1 /*
2 * Copyright 2014 MongoDB, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <bson/bson.h>
18
19 #include "mongoc-client-private.h"
20 #include "mongoc-client-session-private.h"
21 #include "mongoc-client-side-encryption-private.h"
22 #include "mongoc-error.h"
23 #include "mongoc-error-private.h"
24 #include "mongoc-trace-private.h"
25 #include "mongoc-write-command-private.h"
26 #include "mongoc-write-command-legacy-private.h"
27 #include "mongoc-write-concern-private.h"
28 #include "mongoc-util-private.h"
29 #include "mongoc-opts-private.h"
30
31
32 /*
33 * TODO:
34 *
35 * - Remove error parameter to ops, favor result->error.
36 */
37
/*
 * Function-pointer type for the per-command write implementations stored in
 * gLegacyWriteOps below.
 */
typedef void (*mongoc_write_op_t) (mongoc_write_command_t *command,
                                   mongoc_client_t *client,
                                   mongoc_server_stream_t *server_stream,
                                   const char *database,
                                   const char *collection,
                                   uint32_t offset,
                                   mongoc_write_result_t *result,
                                   bson_error_t *error);


/* indexed by MONGOC_WRITE_COMMAND_DELETE, INSERT, UPDATE */
/* command name placed as the first key of the command document */
static const char *gCommandNames[] = {"delete", "insert", "update"};
/* name of the field / payload identifier that carries the statements */
static const char *gCommandFields[] = {"deletes", "documents", "updates"};
/* string length of each gCommandFields entry, in the same order */
static const uint32_t gCommandFieldLens[] = {7, 9, 7};

/* legacy implementations, listed in the same delete/insert/update order;
 * _mongoc_write_command_execute_idl dispatches to these by command type */
static mongoc_write_op_t gLegacyWriteOps[3] = {
   _mongoc_write_command_delete_legacy,
   _mongoc_write_command_insert_legacy,
   _mongoc_write_command_update_legacy};
57
58
59 const char *
_mongoc_command_type_to_name(int command_type)60 _mongoc_command_type_to_name (int command_type)
61 {
62 return gCommandNames[command_type];
63 }
64
65 const char *
_mongoc_command_type_to_field_name(int command_type)66 _mongoc_command_type_to_field_name (int command_type)
67 {
68 return gCommandFields[command_type];
69 }
70
71 void
_mongoc_write_command_insert_append(mongoc_write_command_t * command,const bson_t * document)72 _mongoc_write_command_insert_append (mongoc_write_command_t *command,
73 const bson_t *document)
74 {
75 bson_iter_t iter;
76 bson_oid_t oid;
77 bson_t tmp;
78
79 ENTRY;
80
81 BSON_ASSERT (command);
82 BSON_ASSERT (command->type == MONGOC_WRITE_COMMAND_INSERT);
83 BSON_ASSERT (document);
84 BSON_ASSERT (document->len >= 5);
85
86 /*
87 * If the document does not contain an "_id" field, we need to generate
88 * a new oid for "_id".
89 */
90 if (!bson_iter_init_find (&iter, document, "_id")) {
91 bson_init (&tmp);
92 bson_oid_init (&oid, NULL);
93 BSON_APPEND_OID (&tmp, "_id", &oid);
94 bson_concat (&tmp, document);
95 _mongoc_buffer_append (&command->payload, bson_get_data (&tmp), tmp.len);
96 bson_destroy (&tmp);
97 } else {
98 _mongoc_buffer_append (
99 &command->payload, bson_get_data (document), document->len);
100 }
101
102 command->n_documents++;
103
104 EXIT;
105 }
106
107 void
_mongoc_write_command_update_append(mongoc_write_command_t * command,const bson_t * selector,const bson_t * update,const bson_t * opts)108 _mongoc_write_command_update_append (mongoc_write_command_t *command,
109 const bson_t *selector,
110 const bson_t *update,
111 const bson_t *opts)
112 {
113 bson_t document;
114
115 ENTRY;
116
117 BSON_ASSERT (command);
118 BSON_ASSERT (command->type == MONGOC_WRITE_COMMAND_UPDATE);
119 BSON_ASSERT (selector && update);
120
121 bson_init (&document);
122 BSON_APPEND_DOCUMENT (&document, "q", selector);
123 if (_mongoc_document_is_pipeline (update)) {
124 BSON_APPEND_ARRAY (&document, "u", update);
125 } else {
126 BSON_APPEND_DOCUMENT (&document, "u", update);
127 }
128 if (opts) {
129 bson_concat (&document, opts);
130 }
131
132 _mongoc_buffer_append (
133 &command->payload, bson_get_data (&document), document.len);
134 command->n_documents++;
135
136 bson_destroy (&document);
137
138 EXIT;
139 }
140
141 void
_mongoc_write_command_delete_append(mongoc_write_command_t * command,const bson_t * selector,const bson_t * opts)142 _mongoc_write_command_delete_append (mongoc_write_command_t *command,
143 const bson_t *selector,
144 const bson_t *opts)
145 {
146 bson_t document;
147
148 ENTRY;
149
150 BSON_ASSERT (command);
151 BSON_ASSERT (command->type == MONGOC_WRITE_COMMAND_DELETE);
152 BSON_ASSERT (selector);
153
154 BSON_ASSERT (selector->len >= 5);
155
156 bson_init (&document);
157 BSON_APPEND_DOCUMENT (&document, "q", selector);
158 if (opts) {
159 bson_concat (&document, opts);
160 }
161
162 _mongoc_buffer_append (
163 &command->payload, bson_get_data (&document), document.len);
164 command->n_documents++;
165
166 bson_destroy (&document);
167
168 EXIT;
169 }
170
171 void
_mongoc_write_command_init_bulk(mongoc_write_command_t * command,int type,mongoc_bulk_write_flags_t flags,int64_t operation_id,const bson_t * opts)172 _mongoc_write_command_init_bulk (mongoc_write_command_t *command,
173 int type,
174 mongoc_bulk_write_flags_t flags,
175 int64_t operation_id,
176 const bson_t *opts)
177 {
178 ENTRY;
179
180 BSON_ASSERT (command);
181
182 command->type = type;
183 command->flags = flags;
184 command->operation_id = operation_id;
185 if (!bson_empty0 (opts)) {
186 bson_copy_to (opts, &command->cmd_opts);
187 } else {
188 bson_init (&command->cmd_opts);
189 }
190
191 _mongoc_buffer_init (&command->payload, NULL, 0, NULL, NULL);
192 command->n_documents = 0;
193
194 EXIT;
195 }
196
197
198 void
_mongoc_write_command_init_insert(mongoc_write_command_t * command,const bson_t * document,const bson_t * cmd_opts,mongoc_bulk_write_flags_t flags,int64_t operation_id)199 _mongoc_write_command_init_insert (mongoc_write_command_t *command, /* IN */
200 const bson_t *document, /* IN */
201 const bson_t *cmd_opts, /* IN */
202 mongoc_bulk_write_flags_t flags, /* IN */
203 int64_t operation_id) /* IN */
204 {
205 ENTRY;
206
207 BSON_ASSERT (command);
208
209 _mongoc_write_command_init_bulk (
210 command, MONGOC_WRITE_COMMAND_INSERT, flags, operation_id, cmd_opts);
211
212 /* must handle NULL document from mongoc_collection_insert_bulk */
213 if (document) {
214 _mongoc_write_command_insert_append (command, document);
215 }
216
217 EXIT;
218 }
219
220
221 void
_mongoc_write_command_init_insert_idl(mongoc_write_command_t * command,const bson_t * document,const bson_t * cmd_opts,int64_t operation_id)222 _mongoc_write_command_init_insert_idl (mongoc_write_command_t *command,
223 const bson_t *document,
224 const bson_t *cmd_opts,
225 int64_t operation_id)
226 {
227 mongoc_bulk_write_flags_t flags = MONGOC_BULK_WRITE_FLAGS_INIT;
228
229 ENTRY;
230
231 BSON_ASSERT (command);
232
233 _mongoc_write_command_init_bulk (
234 command, MONGOC_WRITE_COMMAND_INSERT, flags, operation_id, cmd_opts);
235
236 /* must handle NULL document from mongoc_collection_insert_bulk */
237 if (document) {
238 _mongoc_write_command_insert_append (command, document);
239 }
240
241 EXIT;
242 }
243
244
245 void
_mongoc_write_command_init_delete(mongoc_write_command_t * command,const bson_t * selector,const bson_t * cmd_opts,const bson_t * opts,mongoc_bulk_write_flags_t flags,int64_t operation_id)246 _mongoc_write_command_init_delete (mongoc_write_command_t *command, /* IN */
247 const bson_t *selector, /* IN */
248 const bson_t *cmd_opts, /* IN */
249 const bson_t *opts, /* IN */
250 mongoc_bulk_write_flags_t flags, /* IN */
251 int64_t operation_id) /* IN */
252 {
253 ENTRY;
254
255 BSON_ASSERT (command);
256 BSON_ASSERT (selector);
257
258 _mongoc_write_command_init_bulk (
259 command, MONGOC_WRITE_COMMAND_DELETE, flags, operation_id, cmd_opts);
260 _mongoc_write_command_delete_append (command, selector, opts);
261
262 EXIT;
263 }
264
265
266 void
_mongoc_write_command_init_delete_idl(mongoc_write_command_t * command,const bson_t * selector,const bson_t * cmd_opts,const bson_t * opts,int64_t operation_id)267 _mongoc_write_command_init_delete_idl (mongoc_write_command_t *command,
268 const bson_t *selector,
269 const bson_t *cmd_opts,
270 const bson_t *opts,
271 int64_t operation_id)
272 {
273 mongoc_bulk_write_flags_t flags = MONGOC_BULK_WRITE_FLAGS_INIT;
274
275 ENTRY;
276
277 BSON_ASSERT (command);
278 BSON_ASSERT (selector);
279
280 _mongoc_write_command_init_bulk (
281 command, MONGOC_WRITE_COMMAND_DELETE, flags, operation_id, cmd_opts);
282
283 _mongoc_write_command_delete_append (command, selector, opts);
284
285 EXIT;
286 }
287
288
289 void
_mongoc_write_command_init_update(mongoc_write_command_t * command,const bson_t * selector,const bson_t * update,const bson_t * opts,mongoc_bulk_write_flags_t flags,int64_t operation_id)290 _mongoc_write_command_init_update (mongoc_write_command_t *command, /* IN */
291 const bson_t *selector, /* IN */
292 const bson_t *update, /* IN */
293 const bson_t *opts, /* IN */
294 mongoc_bulk_write_flags_t flags, /* IN */
295 int64_t operation_id) /* IN */
296 {
297 ENTRY;
298
299 BSON_ASSERT (command);
300 BSON_ASSERT (selector);
301 BSON_ASSERT (update);
302
303 _mongoc_write_command_init_bulk (
304 command, MONGOC_WRITE_COMMAND_UPDATE, flags, operation_id, NULL);
305 _mongoc_write_command_update_append (command, selector, update, opts);
306
307 EXIT;
308 }
309
310
311 void
_mongoc_write_command_init_update_idl(mongoc_write_command_t * command,const bson_t * selector,const bson_t * update,const bson_t * opts,int64_t operation_id)312 _mongoc_write_command_init_update_idl (mongoc_write_command_t *command,
313 const bson_t *selector,
314 const bson_t *update,
315 const bson_t *opts,
316 int64_t operation_id)
317 {
318 mongoc_bulk_write_flags_t flags = MONGOC_BULK_WRITE_FLAGS_INIT;
319
320 ENTRY;
321
322 BSON_ASSERT (command);
323
324 _mongoc_write_command_init_bulk (
325 command, MONGOC_WRITE_COMMAND_UPDATE, flags, operation_id, NULL);
326 _mongoc_write_command_update_append (command, selector, update, opts);
327
328 EXIT;
329 }
330
331
332 /* takes initialized bson_t *doc and begins formatting a write command */
333 void
_mongoc_write_command_init(bson_t * doc,mongoc_write_command_t * command,const char * collection)334 _mongoc_write_command_init (bson_t *doc,
335 mongoc_write_command_t *command,
336 const char *collection)
337 {
338 ENTRY;
339
340 if (!command->n_documents) {
341 EXIT;
342 }
343
344 BSON_APPEND_UTF8 (doc, gCommandNames[command->type], collection);
345 BSON_APPEND_BOOL (doc, "ordered", command->flags.ordered);
346
347 if (command->flags.bypass_document_validation) {
348 BSON_APPEND_BOOL (doc,
349 "bypassDocumentValidation",
350 command->flags.bypass_document_validation);
351 }
352
353 EXIT;
354 }
355
356
357 /*
358 *-------------------------------------------------------------------------
359 *
360 * _mongoc_write_command_too_large_error --
361 *
362 * Fill a bson_error_t and optional bson_t with error info after
363 * receiving a document for bulk insert, update, or remove that is
364 * larger than max_bson_size.
365 *
366 * "err_doc" should be NULL or an empty initialized bson_t.
367 *
368 * Returns:
369 * None.
370 *
371 * Side effects:
372 * "error" and optionally "err_doc" are filled out.
373 *
374 *-------------------------------------------------------------------------
375 */
376
377 void
_mongoc_write_command_too_large_error(bson_error_t * error,int32_t idx,int32_t len,int32_t max_bson_size)378 _mongoc_write_command_too_large_error (bson_error_t *error,
379 int32_t idx,
380 int32_t len,
381 int32_t max_bson_size)
382 {
383 bson_set_error (error,
384 MONGOC_ERROR_BSON,
385 MONGOC_ERROR_BSON_INVALID,
386 "Document %u is too large for the cluster. "
387 "Document is %u bytes, max is %d.",
388 idx,
389 len,
390 max_bson_size);
391 }
392
393
394 void
_empty_error(mongoc_write_command_t * command,bson_error_t * error)395 _empty_error (mongoc_write_command_t *command, bson_error_t *error)
396 {
397 static const uint32_t codes[] = {MONGOC_ERROR_COLLECTION_DELETE_FAILED,
398 MONGOC_ERROR_COLLECTION_INSERT_FAILED,
399 MONGOC_ERROR_COLLECTION_UPDATE_FAILED};
400
401 bson_set_error (error,
402 MONGOC_ERROR_COLLECTION,
403 codes[command->type],
404 "Cannot do an empty %s",
405 gCommandNames[command->type]);
406 }
407
408
409 bool
_mongoc_write_command_will_overflow(uint32_t len_so_far,uint32_t document_len,uint32_t n_documents_written,int32_t max_bson_size,int32_t max_write_batch_size)410 _mongoc_write_command_will_overflow (uint32_t len_so_far,
411 uint32_t document_len,
412 uint32_t n_documents_written,
413 int32_t max_bson_size,
414 int32_t max_write_batch_size)
415 {
416 /* max BSON object size + 16k bytes.
417 * server guarantees there is enough room: SERVER-10643
418 */
419 int32_t max_cmd_size = max_bson_size + BSON_OBJECT_ALLOWANCE;
420
421 BSON_ASSERT (max_bson_size);
422
423 if (len_so_far + document_len > max_cmd_size) {
424 return true;
425 } else if (max_write_batch_size > 0 &&
426 n_documents_written >= max_write_batch_size) {
427 return true;
428 }
429
430 return false;
431 }
432
433
/*
 * Execute a write command over OP_MSG, sending the queued payload in as
 * many batches as needed.
 *
 * The command document (name, "ordered", write concern, options, session)
 * is assembled once. The payload documents are then walked one by one and
 * grouped into batches; a batch is shipped when the document count reaches
 * the server's max write batch size, when the last queued document has been
 * added, or when the next document would push the message past the max
 * message size.
 *
 * Parameters:
 *       command        the write command whose payload is sent.
 *       client         owner of the cluster used to run the command.
 *       server_stream  stream of the initially selected server.
 *       database       database name passed to mongoc_cmd_parts_init.
 *       collection     collection name written into the command document.
 *       write_concern  write concern applied to the command.
 *       index_offset   absolute index of the first document of this command;
 *                      passed to result merging and advanced per batch.
 *       cs             session, attached only for acknowledged writes.
 *       result         receives merged replies and failure flags.
 *       error          receives the error of a failed step.
 *
 * If setting the write concern, appending options or assembling fails, the
 * function cleans up and returns with only "error" filled in; it does not
 * touch "result" on those paths.
 */
static void
_mongoc_write_opmsg (mongoc_write_command_t *command,
                     mongoc_client_t *client,
                     mongoc_server_stream_t *server_stream,
                     const char *database,
                     const char *collection,
                     const mongoc_write_concern_t *write_concern,
                     uint32_t index_offset,
                     mongoc_client_session_t *cs,
                     mongoc_write_result_t *result,
                     bson_error_t *error)
{
   mongoc_cmd_parts_t parts;
   bson_iter_t iter;
   bson_t cmd;
   bson_t reply;
   bool ret = false;
   int32_t max_msg_size;
   int32_t max_bson_obj_size;
   int32_t max_document_count;
   uint32_t header;
   /* bytes of payload accumulated for the batch being built */
   uint32_t payload_batch_size = 0;
   /* bytes of payload already handed to a shipped batch */
   uint32_t payload_total_offset = 0;
   bool ship_it = false;
   /* documents accumulated for the batch being built */
   int document_count = 0;
   int32_t len;
   /* stream selected for a retry, if any; owned and cleaned up here */
   mongoc_server_stream_t *retry_server_stream = NULL;

   ENTRY;

   BSON_ASSERT (command);
   BSON_ASSERT (client);
   BSON_ASSERT (database);
   BSON_ASSERT (server_stream);
   BSON_ASSERT (collection);

   max_bson_obj_size = mongoc_server_stream_max_bson_obj_size (server_stream);
   max_msg_size = mongoc_server_stream_max_msg_size (server_stream);
   /* with client-side encryption enabled, use the reduced message limit */
   if (_mongoc_cse_is_enabled (client)) {
      max_msg_size = MONGOC_REDUCED_MAX_MSG_SIZE_FOR_FLE;
   }
   max_document_count =
      mongoc_server_stream_max_write_batch_size (server_stream);

   bson_init (&cmd);
   _mongoc_write_command_init (&cmd, command, collection);
   mongoc_cmd_parts_init (&parts, client, database, MONGOC_QUERY_NONE, &cmd);
   parts.assembled.operation_id = command->operation_id;
   parts.is_write_command = true;
   if (!mongoc_cmd_parts_set_write_concern (
          &parts, write_concern, server_stream->sd->max_wire_version, error)) {
      bson_destroy (&cmd);
      mongoc_cmd_parts_cleanup (&parts);
      EXIT;
   }

   /* the session is only attached to acknowledged writes */
   if (parts.assembled.is_acknowledged) {
      mongoc_cmd_parts_set_session (&parts, cs);
   }

   /* Write commands that include multi-document operations are not retryable.
    * Set this explicitly so that mongoc_cmd_parts_assemble does not need to
    * inspect the command body later. */
   parts.allow_txn_number =
      (command->flags.has_multi_write || !parts.assembled.is_acknowledged)
         ? MONGOC_CMD_PARTS_ALLOW_TXN_NUMBER_NO
         : MONGOC_CMD_PARTS_ALLOW_TXN_NUMBER_YES;

   BSON_ASSERT (bson_iter_init (&iter, &command->cmd_opts));
   if (!mongoc_cmd_parts_append_opts (
          &parts, &iter, server_stream->sd->max_wire_version, error)) {
      bson_destroy (&cmd);
      mongoc_cmd_parts_cleanup (&parts);
      EXIT;
   }

   if (!mongoc_cmd_parts_assemble (&parts, server_stream, error)) {
      bson_destroy (&cmd);
      mongoc_cmd_parts_cleanup (&parts);
      EXIT;
   }

   /*
    * OP_MSG header == 16 byte
    * + 4 bytes flagBits
    * + 1 byte payload type = 1
    * + 1 byte payload type = 2
    * + 4 byte size of payload
    * == 26 bytes opcode overhead
    * + X Full command document {insert: "test", writeConcern: {...}}
    * + Y command identifier ("documents", "deletes", "updates") ( + \0)
    */

   header =
      26 + parts.assembled.command->len + gCommandFieldLens[command->type] + 1;

   do {
      /* read the little-endian length prefix of the next queued document */
      memcpy (&len,
              command->payload.data + payload_batch_size + payload_total_offset,
              4);
      len = BSON_UINT32_FROM_LE (len);

      if (len > max_bson_obj_size + BSON_OBJECT_ALLOWANCE) {
         /* Quit if the document is too large */
         _mongoc_write_command_too_large_error (
            error, index_offset, len, max_bson_obj_size);
         result->failed = true;
         break;

      } else if ((payload_batch_size + header) + len <= max_msg_size ||
                 document_count == 0) {
         /* The current batch is still under max batch size in bytes */
         /* (the first document of a batch is always accepted, even when it
          * alone exceeds max_msg_size) */
         payload_batch_size += len;

         /* If this document filled the maximum document count */
         if (++document_count == max_document_count) {
            ship_it = true;
            /* If this document is the last document we have */
         } else if (payload_batch_size + payload_total_offset ==
                    command->payload.len) {
            ship_it = true;
         } else {
            ship_it = false;
         }
      } else {
         /* the document does not fit: ship what we have without it; it is
          * re-read as the first document of the next batch */
         ship_it = true;
      }

      if (ship_it) {
         /* local copy so that each batch may be retried at most once */
         bool is_retryable = parts.is_retryable_write;
         mongoc_write_err_type_t error_type;

         /* Seek past the document offset we have already sent */
         parts.assembled.payload = command->payload.data + payload_total_offset;
         /* Only send the documents up to this size */
         parts.assembled.payload_size = payload_batch_size;
         parts.assembled.payload_identifier = gCommandFields[command->type];

         /* increment the transaction number for the first attempt of each
          * retryable write command */
         if (is_retryable) {
            bson_iter_t txn_number_iter;
            BSON_ASSERT (bson_iter_init_find (
               &txn_number_iter, parts.assembled.command, "txnNumber"));
            bson_iter_overwrite_int64 (
               &txn_number_iter,
               ++parts.assembled.session->server_session->txn_number);
         }
      /* Re-entry point for the single retry. The payload pointer and size
       * set above are left untouched, so the same batch is resent. */
      retry:
         ret = mongoc_cluster_run_command_monitored (
            &client->cluster, &parts.assembled, &reply, error);

         if (parts.is_retryable_write) {
            _mongoc_write_error_handle_labels (
               ret, error, &reply, server_stream->sd->max_wire_version);
         }

         /* Add this batch size so we skip these documents next time */
         /* (on the retry pass payload_batch_size is already 0, so the offset
          * is not advanced a second time) */
         payload_total_offset += payload_batch_size;
         payload_batch_size = 0;

         /* If a retryable error is encountered and the write is retryable,
          * select a new writable stream and retry. If server selection fails or
          * the selected server does not support retryable writes, fall through
          * and allow the original error to be reported. */
         error_type = _mongoc_write_error_get_type (&reply);
         if (is_retryable) {
            _mongoc_write_error_update_if_unsupported_storage_engine (
               ret, error, &reply);
         }
         if (is_retryable && error_type == MONGOC_WRITE_ERR_RETRY) {
            bson_error_t ignored_error;

            /* each write command may be retried at most once */
            is_retryable = false;

            /* release a stream left over from an earlier batch's retry */
            if (retry_server_stream) {
               mongoc_server_stream_cleanup (retry_server_stream);
            }

            retry_server_stream = mongoc_cluster_stream_for_writes (
               &client->cluster, cs, NULL, &ignored_error);

            if (retry_server_stream &&
                retry_server_stream->sd->max_wire_version >=
                   WIRE_VERSION_RETRY_WRITES) {
               parts.assembled.server_stream = retry_server_stream;
               /* discard the failed attempt's reply before running again */
               bson_destroy (&reply);
               GOTO (retry);
            }
         }

         if (!ret) {
            result->failed = true;
            /* Stop for ordered bulk writes or when the server stream has been
             * properly invalidated (e.g., due to a network error). */
            if (command->flags.ordered || !mongoc_cluster_stream_valid (
                                             &client->cluster, server_stream)) {
               result->must_stop = true;
            }
         }

         /* Result merge needs to know the absolute index for a document
          * so it can rewrite the error message which contains the relative
          * document index per batch
          */
         _mongoc_write_result_merge (result, command, &reply, index_offset);
         index_offset += document_count;
         document_count = 0;
         bson_destroy (&reply);
      }
      /* While we have more documents to write */
   } while (payload_total_offset < command->payload.len && !result->must_stop);

   bson_destroy (&cmd);
   mongoc_cmd_parts_cleanup (&parts);

   if (retry_server_stream) {
      if (ret) {
         /* if a retry succeeded, report that in the result so bulk write can
          * use the newly selected server. */
         result->retry_server_id =
            mongoc_server_description_id (retry_server_stream->sd);
      }
      mongoc_server_stream_cleanup (retry_server_stream);
   }

   /* "ret" holds the outcome of the most recent command run */
   if (ret) {
      /* if a retry succeeded, clear the initial error */
      memset (&result->error, 0, sizeof (bson_error_t));
   }

   EXIT;
}
668
669
670 void
_append_array_from_command(mongoc_write_command_t * command,bson_t * bson)671 _append_array_from_command (mongoc_write_command_t *command, bson_t *bson)
672 {
673 bson_t ar;
674 bson_reader_t *reader;
675 char str[16];
676 uint32_t i = 0;
677 const char *key;
678 bool eof;
679 const bson_t *current;
680
681
682 reader =
683 bson_reader_new_from_data (command->payload.data, command->payload.len);
684
685 bson_append_array_begin (bson,
686 gCommandFields[command->type],
687 gCommandFieldLens[command->type],
688 &ar);
689
690 while ((current = bson_reader_read (reader, &eof))) {
691 bson_uint32_to_string (i, &key, str, sizeof str);
692 BSON_APPEND_DOCUMENT (&ar, key, current);
693 i++;
694 }
695
696 bson_append_array_end (bson, &ar);
697 bson_reader_destroy (reader);
698 }
699
700 /* Assemble the base @cmd with all of the command options.
701 * @parts is always initialized, even on error.
702 * This is called twice in _mongoc_write_opquery.
703 * Once with no payload documents, to determine the total size. And once with
704 * payload documents, to send the final command. */
705 static bool
_assemble_cmd(bson_t * cmd,mongoc_write_command_t * command,mongoc_client_t * client,mongoc_server_stream_t * server_stream,const char * database,const mongoc_write_concern_t * write_concern,mongoc_cmd_parts_t * parts,bson_error_t * error)706 _assemble_cmd (bson_t *cmd,
707 mongoc_write_command_t *command,
708 mongoc_client_t *client,
709 mongoc_server_stream_t *server_stream,
710 const char *database,
711 const mongoc_write_concern_t *write_concern,
712 mongoc_cmd_parts_t *parts,
713 bson_error_t *error)
714 {
715 bool ret;
716 bson_iter_t iter;
717
718 mongoc_cmd_parts_init (parts, client, database, MONGOC_QUERY_NONE, cmd);
719 parts->is_write_command = true;
720 parts->assembled.operation_id = command->operation_id;
721
722 ret = mongoc_cmd_parts_set_write_concern (
723 parts, write_concern, server_stream->sd->max_wire_version, error);
724 if (ret) {
725 BSON_ASSERT (bson_iter_init (&iter, &command->cmd_opts));
726 ret = mongoc_cmd_parts_append_opts (
727 parts, &iter, server_stream->sd->max_wire_version, error);
728 }
729 if (ret) {
730 ret = mongoc_cmd_parts_assemble (parts, server_stream, error);
731 }
732 return ret;
733 }
734
/*
 * Execute a write command whose statements are embedded in the command
 * document as an array, sending as many commands as needed.
 *
 * Each pass (label "again") builds one command: the base command is
 * assembled once without documents to measure its size, queued documents
 * are appended to the array until _mongoc_write_command_will_overflow says
 * the next one would not fit, and the command is assembled again and run.
 * Replies are merged into "result" with "offset" as the absolute index of
 * the pass's first document. Another pass starts when documents remain,
 * unless the write is ordered and the pass failed, or result->must_stop is
 * set.
 *
 * Parameters:
 *       command        the write command whose payload is sent.
 *       client         owner of the cluster used to run the command.
 *       server_stream  stream of the selected server.
 *       database       database name passed on to command assembly.
 *       collection     collection name written into the command document.
 *       write_concern  write concern applied to the command.
 *       offset         absolute index of the first document of this command.
 *       result         receives merged replies and failure flags.
 *       error          receives the error of a failed step.
 */
static void
_mongoc_write_opquery (mongoc_write_command_t *command,
                       mongoc_client_t *client,
                       mongoc_server_stream_t *server_stream,
                       const char *database,
                       const char *collection,
                       const mongoc_write_concern_t *write_concern,
                       uint32_t offset,
                       mongoc_write_result_t *result,
                       bson_error_t *error)
{
   mongoc_cmd_parts_t parts;
   const char *key;
   uint32_t len = 0;
   bson_t ar;
   bson_t cmd;
   char str[16];
   /* set when a pass stops because the next document would overflow */
   bool has_more;
   bool ret = false;
   /* number of documents appended during the current pass */
   uint32_t i;
   int32_t max_bson_obj_size;
   int32_t max_write_batch_size;
   uint32_t overhead;
   uint32_t key_len;
   /* bytes of payload consumed by earlier passes */
   int data_offset = 0;
   bson_reader_t *reader;
   const bson_t *bson;
   bool eof;

   ENTRY;

   BSON_ASSERT (command);
   BSON_ASSERT (client);
   BSON_ASSERT (database);
   BSON_ASSERT (server_stream);
   BSON_ASSERT (collection);

   bson_init (&cmd);
   max_bson_obj_size = mongoc_server_stream_max_bson_obj_size (server_stream);
   max_write_batch_size =
      mongoc_server_stream_max_write_batch_size (server_stream);

again:
   has_more = false;
   i = 0;

   _mongoc_write_command_init (&cmd, command, collection);
   /* If any part of assembling failed, return with failure. */
   if (!_assemble_cmd (&cmd,
                       command,
                       client,
                       server_stream,
                       database,
                       write_concern,
                       &parts,
                       error)) {
      result->failed = true;
      bson_destroy (&cmd);
      mongoc_cmd_parts_cleanup (&parts);
      EXIT;
   }

   /* Use the assembled command to compute the overhead, since it may be a new
    * BSON document with options applied. If no options were applied, then
    * parts.assembled.command points to cmd. The constant 2 is due to 1 byte to
    * specify array type and 1 byte for field name's null terminator. */
   overhead =
      parts.assembled.command->len + 2 + gCommandFieldLens[command->type];
   /* Toss out the assembled command, we'll assemble again after adding all of
    * the payload documents. */
   mongoc_cmd_parts_cleanup (&parts);

   /* resume reading where the previous pass stopped */
   reader = bson_reader_new_from_data (command->payload.data + data_offset,
                                       command->payload.len - data_offset);

   bson_append_array_begin (&cmd,
                            gCommandFields[command->type],
                            gCommandFieldLens[command->type],
                            &ar);

   while ((bson = bson_reader_read (reader, &eof))) {
      key_len = (uint32_t) bson_uint32_to_string (i, &key, str, sizeof str);
      len = bson->len;
      /* 1 byte to specify document type, 1 byte for key's null terminator */
      if (_mongoc_write_command_will_overflow (overhead,
                                               key_len + len + 2 + ar.len,
                                               i,
                                               max_bson_obj_size,
                                               max_write_batch_size)) {
         /* leave this document for the next pass */
         has_more = true;
         break;
      }
      BSON_APPEND_DOCUMENT (&ar, key, bson);
      data_offset += len;
      i++;
   }

   bson_append_array_end (&cmd, &ar);

   if (!i) {
      /* nothing was appended in this pass (the first document read did not
       * fit, or the reader produced no document): report it as a too-large
       * document and stop */
      _mongoc_write_command_too_large_error (error, i, len, max_bson_obj_size);
      result->failed = true;
      result->must_stop = true;
      ret = false;
      if (bson) {
         /* skip over the document that could not be sent */
         data_offset += len;
      }
   } else {
      bson_t reply;

      /* second assembly, now with the documents array in place */
      ret = _assemble_cmd (&cmd,
                           command,
                           client,
                           server_stream,
                           database,
                           write_concern,
                           &parts,
                           error);
      if (ret) {
         ret = mongoc_cluster_run_command_monitored (
            &client->cluster, &parts.assembled, &reply, error);
      } else {
         /* give the merge and destroy calls below a valid, empty reply */
         bson_init (&reply);
      }

      if (!ret) {
         result->failed = true;
         if (bson_empty (&reply) ||
             !mongoc_cluster_stream_valid (&client->cluster, server_stream)) {
            /* assembling failed, or a network error running the command */
            result->must_stop = true;
         }
      }

      _mongoc_write_result_merge (result, command, &reply, offset);
      /* the next pass's documents start after the ones just sent */
      offset += i;
      bson_destroy (&reply);
      mongoc_cmd_parts_cleanup (&parts);
   }
   bson_reader_destroy (reader);

   /* go around again only while documents remain; an ordered write stops
    * after a failed pass, and must_stop always ends the loop */
   if (has_more && (ret || !command->flags.ordered) && !result->must_stop) {
      bson_reinit (&cmd);
      GOTO (again);
   }

   bson_destroy (&cmd);
   EXIT;
}
884
885
886 void
_mongoc_write_command_execute(mongoc_write_command_t * command,mongoc_client_t * client,mongoc_server_stream_t * server_stream,const char * database,const char * collection,const mongoc_write_concern_t * write_concern,uint32_t offset,mongoc_client_session_t * cs,mongoc_write_result_t * result)887 _mongoc_write_command_execute (
888 mongoc_write_command_t *command, /* IN */
889 mongoc_client_t *client, /* IN */
890 mongoc_server_stream_t *server_stream, /* IN */
891 const char *database, /* IN */
892 const char *collection, /* IN */
893 const mongoc_write_concern_t *write_concern, /* IN */
894 uint32_t offset, /* IN */
895 mongoc_client_session_t *cs, /* IN */
896 mongoc_write_result_t *result) /* OUT */
897 {
898 mongoc_crud_opts_t crud = {0};
899
900 ENTRY;
901
902 BSON_ASSERT (command);
903 BSON_ASSERT (client);
904 BSON_ASSERT (server_stream);
905 BSON_ASSERT (database);
906 BSON_ASSERT (collection);
907 BSON_ASSERT (result);
908
909 if (!write_concern) {
910 write_concern = client->write_concern;
911 }
912
913 if (!mongoc_write_concern_is_valid (write_concern)) {
914 bson_set_error (&result->error,
915 MONGOC_ERROR_COMMAND,
916 MONGOC_ERROR_COMMAND_INVALID_ARG,
917 "The write concern is invalid.");
918 result->failed = true;
919 EXIT;
920 }
921
922 crud.client_session = cs;
923 crud.writeConcern = (mongoc_write_concern_t *) write_concern;
924
925 _mongoc_write_command_execute_idl (command,
926 client,
927 server_stream,
928 database,
929 collection,
930 offset,
931 &crud,
932 result);
933 EXIT;
934 }
935
936 void
_mongoc_write_command_execute_idl(mongoc_write_command_t * command,mongoc_client_t * client,mongoc_server_stream_t * server_stream,const char * database,const char * collection,uint32_t offset,const mongoc_crud_opts_t * crud,mongoc_write_result_t * result)937 _mongoc_write_command_execute_idl (mongoc_write_command_t *command,
938 mongoc_client_t *client,
939 mongoc_server_stream_t *server_stream,
940 const char *database,
941 const char *collection,
942 uint32_t offset,
943 const mongoc_crud_opts_t *crud,
944 mongoc_write_result_t *result)
945 {
946 ENTRY;
947
948 BSON_ASSERT (command);
949 BSON_ASSERT (client);
950 BSON_ASSERT (server_stream);
951 BSON_ASSERT (database);
952 BSON_ASSERT (collection);
953 BSON_ASSERT (result);
954
955 if (command->flags.has_collation) {
956 if (!mongoc_write_concern_is_acknowledged (crud->writeConcern)) {
957 result->failed = true;
958 bson_set_error (&result->error,
959 MONGOC_ERROR_COMMAND,
960 MONGOC_ERROR_COMMAND_INVALID_ARG,
961 "Cannot set collation for unacknowledged writes");
962 EXIT;
963 }
964
965 if (server_stream->sd->max_wire_version < WIRE_VERSION_COLLATION) {
966 bson_set_error (&result->error,
967 MONGOC_ERROR_COMMAND,
968 MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
969 "The selected server does not support collation");
970 result->failed = true;
971 EXIT;
972 }
973 }
974
975 if (command->flags.has_array_filters) {
976 if (!mongoc_write_concern_is_acknowledged (crud->writeConcern)) {
977 result->failed = true;
978 bson_set_error (&result->error,
979 MONGOC_ERROR_COMMAND,
980 MONGOC_ERROR_COMMAND_INVALID_ARG,
981 "Cannot use array filters with unacknowledged writes");
982 EXIT;
983 }
984
985 if (server_stream->sd->max_wire_version < WIRE_VERSION_ARRAY_FILTERS) {
986 bson_set_error (&result->error,
987 MONGOC_ERROR_COMMAND,
988 MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
989 "The selected server does not support array filters");
990 result->failed = true;
991 EXIT;
992 }
993 }
994
995 if (command->flags.has_update_hint) {
996 if (server_stream->sd->max_wire_version <
997 WIRE_VERSION_HINT_SERVER_SIDE_ERROR ||
998 (server_stream->sd->max_wire_version < WIRE_VERSION_UPDATE_HINT &&
999 !mongoc_write_concern_is_acknowledged (crud->writeConcern))) {
1000 bson_set_error (
1001 &result->error,
1002 MONGOC_ERROR_COMMAND,
1003 MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
1004 "The selected server does not support hint for update");
1005 result->failed = true;
1006 EXIT;
1007 }
1008 }
1009
1010 if (command->flags.has_delete_hint) {
1011 if (server_stream->sd->max_wire_version <
1012 WIRE_VERSION_HINT_SERVER_SIDE_ERROR ||
1013 (server_stream->sd->max_wire_version < WIRE_VERSION_DELETE_HINT &&
1014 !mongoc_write_concern_is_acknowledged (crud->writeConcern))) {
1015 bson_set_error (
1016 &result->error,
1017 MONGOC_ERROR_COMMAND,
1018 MONGOC_ERROR_COMMAND_INVALID_ARG,
1019 "The selected server does not support hint for delete");
1020 result->failed = true;
1021 EXIT;
1022 }
1023 }
1024
1025 if (command->flags.bypass_document_validation) {
1026 if (!mongoc_write_concern_is_acknowledged (crud->writeConcern)) {
1027 result->failed = true;
1028 bson_set_error (
1029 &result->error,
1030 MONGOC_ERROR_COMMAND,
1031 MONGOC_ERROR_COMMAND_INVALID_ARG,
1032 "Cannot set bypassDocumentValidation for unacknowledged writes");
1033 EXIT;
1034 }
1035 }
1036
1037 if (crud->client_session &&
1038 !mongoc_write_concern_is_acknowledged (crud->writeConcern)) {
1039 result->failed = true;
1040 bson_set_error (&result->error,
1041 MONGOC_ERROR_COMMAND,
1042 MONGOC_ERROR_COMMAND_INVALID_ARG,
1043 "Cannot use client session with unacknowledged writes");
1044 EXIT;
1045 }
1046
1047 if (command->payload.len == 0) {
1048 _empty_error (command, &result->error);
1049 EXIT;
1050 }
1051
1052 if (server_stream->sd->max_wire_version >= WIRE_VERSION_OP_MSG) {
1053 _mongoc_write_opmsg (command,
1054 client,
1055 server_stream,
1056 database,
1057 collection,
1058 crud->writeConcern,
1059 offset,
1060 crud->client_session,
1061 result,
1062 &result->error);
1063 } else {
1064 if (mongoc_write_concern_is_acknowledged (crud->writeConcern)) {
1065 _mongoc_write_opquery (command,
1066 client,
1067 server_stream,
1068 database,
1069 collection,
1070 crud->writeConcern,
1071 offset,
1072 result,
1073 &result->error);
1074 } else {
1075 gLegacyWriteOps[command->type](command,
1076 client,
1077 server_stream,
1078 database,
1079 collection,
1080 offset,
1081 result,
1082 &result->error);
1083 }
1084 }
1085
1086 EXIT;
1087 }
1088
1089
1090 void
_mongoc_write_command_destroy(mongoc_write_command_t * command)1091 _mongoc_write_command_destroy (mongoc_write_command_t *command)
1092 {
1093 ENTRY;
1094
1095 if (command) {
1096 bson_destroy (&command->cmd_opts);
1097 _mongoc_buffer_destroy (&command->payload);
1098 }
1099
1100 EXIT;
1101 }
1102
1103
1104 void
_mongoc_write_result_init(mongoc_write_result_t * result)1105 _mongoc_write_result_init (mongoc_write_result_t *result) /* IN */
1106 {
1107 ENTRY;
1108
1109 BSON_ASSERT (result);
1110
1111 memset (result, 0, sizeof *result);
1112
1113 bson_init (&result->upserted);
1114 bson_init (&result->writeConcernErrors);
1115 bson_init (&result->writeErrors);
1116 bson_init (&result->errorLabels);
1117
1118 EXIT;
1119 }
1120
1121
1122 void
_mongoc_write_result_destroy(mongoc_write_result_t * result)1123 _mongoc_write_result_destroy (mongoc_write_result_t *result)
1124 {
1125 ENTRY;
1126
1127 BSON_ASSERT (result);
1128
1129 bson_destroy (&result->upserted);
1130 bson_destroy (&result->writeConcernErrors);
1131 bson_destroy (&result->writeErrors);
1132 bson_destroy (&result->errorLabels);
1133
1134 EXIT;
1135 }
1136
1137
1138 void
_mongoc_write_result_append_upsert(mongoc_write_result_t * result,int32_t idx,const bson_value_t * value)1139 _mongoc_write_result_append_upsert (mongoc_write_result_t *result,
1140 int32_t idx,
1141 const bson_value_t *value)
1142 {
1143 bson_t child;
1144 const char *keyptr = NULL;
1145 char key[12];
1146 int len;
1147
1148 BSON_ASSERT (result);
1149 BSON_ASSERT (value);
1150
1151 len = (int) bson_uint32_to_string (
1152 result->upsert_append_count, &keyptr, key, sizeof key);
1153
1154 bson_append_document_begin (&result->upserted, keyptr, len, &child);
1155 BSON_APPEND_INT32 (&child, "index", idx);
1156 BSON_APPEND_VALUE (&child, "_id", value);
1157 bson_append_document_end (&result->upserted, &child);
1158
1159 result->upsert_append_count++;
1160 }
1161
1162
1163 int32_t
_mongoc_write_result_merge_arrays(uint32_t offset,mongoc_write_result_t * result,bson_t * dest,bson_iter_t * iter)1164 _mongoc_write_result_merge_arrays (uint32_t offset,
1165 mongoc_write_result_t *result, /* IN */
1166 bson_t *dest, /* IN */
1167 bson_iter_t *iter) /* IN */
1168 {
1169 const bson_value_t *value;
1170 bson_iter_t ar;
1171 bson_iter_t citer;
1172 int32_t idx;
1173 int32_t count = 0;
1174 int32_t aridx;
1175 bson_t child;
1176 const char *keyptr = NULL;
1177 char key[12];
1178 int len;
1179
1180 ENTRY;
1181
1182 BSON_ASSERT (result);
1183 BSON_ASSERT (dest);
1184 BSON_ASSERT (iter);
1185 BSON_ASSERT (BSON_ITER_HOLDS_ARRAY (iter));
1186
1187 aridx = bson_count_keys (dest);
1188
1189 if (bson_iter_recurse (iter, &ar)) {
1190 while (bson_iter_next (&ar)) {
1191 if (BSON_ITER_HOLDS_DOCUMENT (&ar) &&
1192 bson_iter_recurse (&ar, &citer)) {
1193 len =
1194 (int) bson_uint32_to_string (aridx++, &keyptr, key, sizeof key);
1195 bson_append_document_begin (dest, keyptr, len, &child);
1196 while (bson_iter_next (&citer)) {
1197 if (BSON_ITER_IS_KEY (&citer, "index")) {
1198 idx = bson_iter_int32 (&citer) + offset;
1199 BSON_APPEND_INT32 (&child, "index", idx);
1200 } else {
1201 value = bson_iter_value (&citer);
1202 BSON_APPEND_VALUE (&child, bson_iter_key (&citer), value);
1203 }
1204 }
1205 bson_append_document_end (dest, &child);
1206 count++;
1207 }
1208 }
1209 }
1210
1211 RETURN (count);
1212 }
1213
1214
1215 void
_mongoc_write_result_merge(mongoc_write_result_t * result,mongoc_write_command_t * command,const bson_t * reply,uint32_t offset)1216 _mongoc_write_result_merge (mongoc_write_result_t *result, /* IN */
1217 mongoc_write_command_t *command, /* IN */
1218 const bson_t *reply, /* IN */
1219 uint32_t offset)
1220 {
1221 int32_t server_index = 0;
1222 const bson_value_t *value;
1223 bson_iter_t iter;
1224 bson_iter_t citer;
1225 bson_iter_t ar;
1226 int32_t n_upserted = 0;
1227 int32_t affected = 0;
1228
1229 ENTRY;
1230
1231 BSON_ASSERT (result);
1232 BSON_ASSERT (reply);
1233
1234 if (bson_iter_init_find (&iter, reply, "n") &&
1235 BSON_ITER_HOLDS_INT32 (&iter)) {
1236 affected = bson_iter_int32 (&iter);
1237 }
1238
1239 if (bson_iter_init_find (&iter, reply, "writeErrors") &&
1240 BSON_ITER_HOLDS_ARRAY (&iter) && bson_iter_recurse (&iter, &citer) &&
1241 bson_iter_next (&citer)) {
1242 result->failed = true;
1243 }
1244
1245 switch (command->type) {
1246 case MONGOC_WRITE_COMMAND_INSERT:
1247 result->nInserted += affected;
1248 break;
1249 case MONGOC_WRITE_COMMAND_DELETE:
1250 result->nRemoved += affected;
1251 break;
1252 case MONGOC_WRITE_COMMAND_UPDATE:
1253
1254 /* server returns each upserted _id with its index into this batch
1255 * look for "upserted": [{"index": 4, "_id": ObjectId()}, ...] */
1256 if (bson_iter_init_find (&iter, reply, "upserted")) {
1257 if (BSON_ITER_HOLDS_ARRAY (&iter) &&
1258 (bson_iter_recurse (&iter, &ar))) {
1259 while (bson_iter_next (&ar)) {
1260 if (BSON_ITER_HOLDS_DOCUMENT (&ar) &&
1261 bson_iter_recurse (&ar, &citer) &&
1262 bson_iter_find (&citer, "index") &&
1263 BSON_ITER_HOLDS_INT32 (&citer)) {
1264 server_index = bson_iter_int32 (&citer);
1265
1266 if (bson_iter_recurse (&ar, &citer) &&
1267 bson_iter_find (&citer, "_id")) {
1268 value = bson_iter_value (&citer);
1269 _mongoc_write_result_append_upsert (
1270 result, offset + server_index, value);
1271 n_upserted++;
1272 }
1273 }
1274 }
1275 }
1276 result->nUpserted += n_upserted;
1277 /*
1278 * XXX: The following addition to nMatched needs some checking.
1279 * I'm highly skeptical of it.
1280 */
1281 result->nMatched += BSON_MAX (0, (affected - n_upserted));
1282 } else {
1283 result->nMatched += affected;
1284 }
1285 if (bson_iter_init_find (&iter, reply, "nModified") &&
1286 BSON_ITER_HOLDS_INT32 (&iter)) {
1287 result->nModified += bson_iter_int32 (&iter);
1288 }
1289 break;
1290 default:
1291 BSON_ASSERT (false);
1292 break;
1293 }
1294
1295 if (bson_iter_init_find (&iter, reply, "writeErrors") &&
1296 BSON_ITER_HOLDS_ARRAY (&iter)) {
1297 _mongoc_write_result_merge_arrays (
1298 offset, result, &result->writeErrors, &iter);
1299 }
1300
1301 if (bson_iter_init_find (&iter, reply, "writeConcernError") &&
1302 BSON_ITER_HOLDS_DOCUMENT (&iter)) {
1303 uint32_t len;
1304 const uint8_t *data;
1305 bson_t write_concern_error;
1306 char str[16];
1307 const char *key;
1308
1309 /* writeConcernError is a subdocument in the server response
1310 * append it to the result->writeConcernErrors array */
1311 bson_iter_document (&iter, &len, &data);
1312 BSON_ASSERT (bson_init_static (&write_concern_error, data, len));
1313
1314 bson_uint32_to_string (
1315 result->n_writeConcernErrors, &key, str, sizeof str);
1316
1317 if (!bson_append_document (
1318 &result->writeConcernErrors, key, -1, &write_concern_error)) {
1319 MONGOC_ERROR ("Error adding \"%s\" to writeConcernErrors.\n", key);
1320 }
1321
1322 result->n_writeConcernErrors++;
1323 }
1324
1325 /* inefficient if there are ever large numbers: for each label in each err,
1326 * we linear-search result->errorLabels to see if it's included yet */
1327 _mongoc_bson_array_copy_labels_to (reply, &result->errorLabels);
1328
1329 EXIT;
1330 }
1331
1332
1333 /*
1334 * If error is not set, set code from first document in array like
1335 * [{"code": 64, "errmsg": "duplicate"}, ...]. Format the error message
1336 * from all errors in array.
1337 */
1338 static void
_set_error_from_response(bson_t * bson_array,mongoc_error_domain_t domain,const char * error_type,bson_error_t * error)1339 _set_error_from_response (bson_t *bson_array,
1340 mongoc_error_domain_t domain,
1341 const char *error_type,
1342 bson_error_t *error /* OUT */)
1343 {
1344 bson_iter_t array_iter;
1345 bson_iter_t doc_iter;
1346 bson_string_t *compound_err;
1347 const char *errmsg = NULL;
1348 int32_t code = 0;
1349 uint32_t n_keys, i;
1350
1351 compound_err = bson_string_new (NULL);
1352 n_keys = bson_count_keys (bson_array);
1353 if (n_keys > 1) {
1354 bson_string_append_printf (
1355 compound_err, "Multiple %s errors: ", error_type);
1356 }
1357
1358 if (!bson_empty0 (bson_array) && bson_iter_init (&array_iter, bson_array)) {
1359 /* get first code and all error messages */
1360 i = 0;
1361
1362 while (bson_iter_next (&array_iter)) {
1363 if (BSON_ITER_HOLDS_DOCUMENT (&array_iter) &&
1364 bson_iter_recurse (&array_iter, &doc_iter)) {
1365 /* parse doc, which is like {"code": 64, "errmsg": "duplicate"} */
1366 while (bson_iter_next (&doc_iter)) {
1367 /* use the first error code we find */
1368 if (BSON_ITER_IS_KEY (&doc_iter, "code") && code == 0) {
1369 code = (uint32_t) bson_iter_as_int64 (&doc_iter);
1370 } else if (BSON_ITER_IS_KEY (&doc_iter, "errmsg")) {
1371 errmsg = bson_iter_utf8 (&doc_iter, NULL);
1372
1373 /* build message like 'Multiple write errors: "foo", "bar"' */
1374 if (n_keys > 1) {
1375 bson_string_append_printf (compound_err, "\"%s\"", errmsg);
1376 if (i < n_keys - 1) {
1377 bson_string_append (compound_err, ", ");
1378 }
1379 } else {
1380 /* single error message */
1381 bson_string_append (compound_err, errmsg);
1382 }
1383 }
1384 }
1385
1386 i++;
1387 }
1388 }
1389
1390 if (code && compound_err->len) {
1391 bson_set_error (
1392 error, domain, (uint32_t) code, "%s", compound_err->str);
1393 }
1394 }
1395
1396 bson_string_free (compound_err, true);
1397 }
1398
1399
1400 /* complete a write result, including only certain fields */
1401 bool
_mongoc_write_result_complete(mongoc_write_result_t * result,int32_t error_api_version,const mongoc_write_concern_t * wc,mongoc_error_domain_t err_domain_override,bson_t * bson,bson_error_t * error,...)1402 _mongoc_write_result_complete (
1403 mongoc_write_result_t *result, /* IN */
1404 int32_t error_api_version, /* IN */
1405 const mongoc_write_concern_t *wc, /* IN */
1406 mongoc_error_domain_t err_domain_override, /* IN */
1407 bson_t *bson, /* OUT */
1408 bson_error_t *error, /* OUT */
1409 ...)
1410 {
1411 mongoc_error_domain_t domain;
1412 va_list args;
1413 const char *field;
1414 int n_args;
1415 bson_iter_t iter;
1416 bson_iter_t child;
1417
1418 ENTRY;
1419
1420 BSON_ASSERT (result);
1421
1422 if (error_api_version >= MONGOC_ERROR_API_VERSION_2) {
1423 domain = MONGOC_ERROR_SERVER;
1424 } else if (err_domain_override) {
1425 domain = err_domain_override;
1426 } else if (result->error.domain) {
1427 domain = (mongoc_error_domain_t) result->error.domain;
1428 } else {
1429 domain = MONGOC_ERROR_COLLECTION;
1430 }
1431
1432 /* produce either old fields like nModified from the deprecated Bulk API Spec
1433 * or new fields like modifiedCount from the CRUD Spec, which we partly obey
1434 */
1435 if (bson && mongoc_write_concern_is_acknowledged (wc)) {
1436 n_args = 0;
1437 va_start (args, error);
1438 while ((field = va_arg (args, const char *))) {
1439 n_args++;
1440
1441 if (!strcmp (field, "nInserted")) {
1442 BSON_APPEND_INT32 (bson, field, result->nInserted);
1443 } else if (!strcmp (field, "insertedCount")) {
1444 BSON_APPEND_INT32 (bson, field, result->nInserted);
1445 } else if (!strcmp (field, "nMatched")) {
1446 BSON_APPEND_INT32 (bson, field, result->nMatched);
1447 } else if (!strcmp (field, "matchedCount")) {
1448 BSON_APPEND_INT32 (bson, field, result->nMatched);
1449 } else if (!strcmp (field, "nModified")) {
1450 BSON_APPEND_INT32 (bson, field, result->nModified);
1451 } else if (!strcmp (field, "modifiedCount")) {
1452 BSON_APPEND_INT32 (bson, field, result->nModified);
1453 } else if (!strcmp (field, "nRemoved")) {
1454 BSON_APPEND_INT32 (bson, field, result->nRemoved);
1455 } else if (!strcmp (field, "deletedCount")) {
1456 BSON_APPEND_INT32 (bson, field, result->nRemoved);
1457 } else if (!strcmp (field, "nUpserted")) {
1458 BSON_APPEND_INT32 (bson, field, result->nUpserted);
1459 } else if (!strcmp (field, "upsertedCount")) {
1460 BSON_APPEND_INT32 (bson, field, result->nUpserted);
1461 } else if (!strcmp (field, "upserted") &&
1462 !bson_empty0 (&result->upserted)) {
1463 BSON_APPEND_ARRAY (bson, field, &result->upserted);
1464 } else if (!strcmp (field, "upsertedId") &&
1465 !bson_empty0 (&result->upserted) &&
1466 bson_iter_init_find (&iter, &result->upserted, "0") &&
1467 bson_iter_recurse (&iter, &child) &&
1468 bson_iter_find (&child, "_id")) {
1469 /* "upsertedId", singular, for update_one() */
1470 BSON_APPEND_VALUE (bson, "upsertedId", bson_iter_value (&child));
1471 }
1472 }
1473
1474 va_end (args);
1475
1476 /* default: a standard result includes all Bulk API fields */
1477 if (!n_args) {
1478 BSON_APPEND_INT32 (bson, "nInserted", result->nInserted);
1479 BSON_APPEND_INT32 (bson, "nMatched", result->nMatched);
1480 BSON_APPEND_INT32 (bson, "nModified", result->nModified);
1481 BSON_APPEND_INT32 (bson, "nRemoved", result->nRemoved);
1482 BSON_APPEND_INT32 (bson, "nUpserted", result->nUpserted);
1483 if (!bson_empty0 (&result->upserted)) {
1484 BSON_APPEND_ARRAY (bson, "upserted", &result->upserted);
1485 }
1486 }
1487
1488 /* always append errors if there are any */
1489 if (!n_args || !bson_empty (&result->writeErrors)) {
1490 BSON_APPEND_ARRAY (bson, "writeErrors", &result->writeErrors);
1491 }
1492
1493 if (result->n_writeConcernErrors) {
1494 BSON_APPEND_ARRAY (
1495 bson, "writeConcernErrors", &result->writeConcernErrors);
1496 }
1497 }
1498
1499 /* set bson_error_t from first write error or write concern error */
1500 _set_error_from_response (
1501 &result->writeErrors, domain, "write", &result->error);
1502
1503 if (!result->error.code) {
1504 _set_error_from_response (&result->writeConcernErrors,
1505 MONGOC_ERROR_WRITE_CONCERN,
1506 "write concern",
1507 &result->error);
1508 }
1509
1510 if (bson && !bson_empty (&result->errorLabels)) {
1511 BSON_APPEND_ARRAY (bson, "errorLabels", &result->errorLabels);
1512 }
1513
1514 if (error) {
1515 memcpy (error, &result->error, sizeof *error);
1516 }
1517
1518 RETURN (!result->failed && result->error.code == 0);
1519 }
1520
1521
1522 /*--------------------------------------------------------------------------
1523 *
1524 * _mongoc_write_error_get_type --
1525 *
1526 * Checks if the error or reply from a write command is considered
1527 * retryable according to the retryable writes spec. Checks both
1528 * for a client error (a network exception) and a server error in
1529 * the reply. @cmd_ret and @cmd_err come from the result of a
1530 * write_command function. This function should be called after
1531 * error labels are appended in _mongoc_write_error_handle_labels,
1532 * which should be called after mongoc_cluster_run_command_monitored.
1533 *
1534 *
1535 * Return:
1536 * A mongoc_write_error_type_t indicating the type of error (if any).
1537 *
1538 *--------------------------------------------------------------------------
1539 */
1540 mongoc_write_err_type_t
_mongoc_write_error_get_type(bson_t * reply)1541 _mongoc_write_error_get_type (bson_t *reply)
1542 {
1543 bson_error_t error;
1544
1545 if (mongoc_error_has_label (reply, RETRYABLE_WRITE_ERROR)) {
1546 return MONGOC_WRITE_ERR_RETRY;
1547 }
1548
1549 /* check for a server error. */
1550 if (_mongoc_cmd_check_ok_no_wce (
1551 reply, MONGOC_ERROR_API_VERSION_2, &error)) {
1552 return MONGOC_WRITE_ERR_NONE;
1553 }
1554
1555 switch (error.code) {
1556 case 64: /* WriteConcernFailed */
1557 return MONGOC_WRITE_ERR_WRITE_CONCERN;
1558 default:
1559 return MONGOC_WRITE_ERR_OTHER;
1560 }
1561 }
1562
1563 /* Returns true and modifies reply and cmd_err. */
1564 bool
_mongoc_write_error_update_if_unsupported_storage_engine(bool cmd_ret,bson_error_t * cmd_err,bson_t * reply)1565 _mongoc_write_error_update_if_unsupported_storage_engine (bool cmd_ret,
1566 bson_error_t *cmd_err,
1567 bson_t *reply)
1568 {
1569 bson_error_t server_error;
1570
1571 if (cmd_ret) {
1572 return false;
1573 }
1574
1575 if (_mongoc_cmd_check_ok_no_wce (
1576 reply, MONGOC_ERROR_API_VERSION_2, &server_error)) {
1577 return false;
1578 }
1579
1580 if (server_error.code == 20 &&
1581 strstr (server_error.message, "Transaction numbers") ==
1582 server_error.message) {
1583 const char *replacement = "This MongoDB deployment does not support "
1584 "retryable writes. Please add "
1585 "retryWrites=false to your connection string.";
1586
1587 strcpy (cmd_err->message, replacement);
1588
1589 if (reply) {
1590 bson_t *new_reply = bson_new ();
1591 bson_copy_to_excluding_noinit (reply, new_reply, "errmsg", NULL);
1592 BSON_APPEND_UTF8 (new_reply, "errmsg", replacement);
1593 bson_destroy (reply);
1594 bson_steal (reply, new_reply);
1595 }
1596 return true;
1597 }
1598 return false;
1599 }
1600