1 /*
2  * Copyright 2014 MongoDB, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <bson/bson.h>
18 
19 #include "mongoc-client-private.h"
20 #include "mongoc-client-session-private.h"
21 #include "mongoc-client-side-encryption-private.h"
22 #include "mongoc-error.h"
23 #include "mongoc-error-private.h"
24 #include "mongoc-trace-private.h"
25 #include "mongoc-write-command-private.h"
26 #include "mongoc-write-command-legacy-private.h"
27 #include "mongoc-write-concern-private.h"
28 #include "mongoc-util-private.h"
29 #include "mongoc-opts-private.h"
30 
31 
32 /*
33  * TODO:
34  *
35  *    - Remove error parameter to ops, favor result->error.
36  */
37 
38 typedef void (*mongoc_write_op_t) (mongoc_write_command_t *command,
39                                    mongoc_client_t *client,
40                                    mongoc_server_stream_t *server_stream,
41                                    const char *database,
42                                    const char *collection,
43                                    uint32_t offset,
44                                    mongoc_write_result_t *result,
45                                    bson_error_t *error);
46 
47 
48 /* indexed by MONGOC_WRITE_COMMAND_DELETE, INSERT, UPDATE */
49 static const char *gCommandNames[] = {"delete", "insert", "update"};
50 static const char *gCommandFields[] = {"deletes", "documents", "updates"};
51 static const uint32_t gCommandFieldLens[] = {7, 9, 7};
52 
53 static mongoc_write_op_t gLegacyWriteOps[3] = {
54    _mongoc_write_command_delete_legacy,
55    _mongoc_write_command_insert_legacy,
56    _mongoc_write_command_update_legacy};
57 
58 
59 const char *
_mongoc_command_type_to_name(int command_type)60 _mongoc_command_type_to_name (int command_type)
61 {
62    return gCommandNames[command_type];
63 }
64 
65 const char *
_mongoc_command_type_to_field_name(int command_type)66 _mongoc_command_type_to_field_name (int command_type)
67 {
68    return gCommandFields[command_type];
69 }
70 
71 void
_mongoc_write_command_insert_append(mongoc_write_command_t * command,const bson_t * document)72 _mongoc_write_command_insert_append (mongoc_write_command_t *command,
73                                      const bson_t *document)
74 {
75    bson_iter_t iter;
76    bson_oid_t oid;
77    bson_t tmp;
78 
79    ENTRY;
80 
81    BSON_ASSERT (command);
82    BSON_ASSERT (command->type == MONGOC_WRITE_COMMAND_INSERT);
83    BSON_ASSERT (document);
84    BSON_ASSERT (document->len >= 5);
85 
86    /*
87     * If the document does not contain an "_id" field, we need to generate
88     * a new oid for "_id".
89     */
90    if (!bson_iter_init_find (&iter, document, "_id")) {
91       bson_init (&tmp);
92       bson_oid_init (&oid, NULL);
93       BSON_APPEND_OID (&tmp, "_id", &oid);
94       bson_concat (&tmp, document);
95       _mongoc_buffer_append (&command->payload, bson_get_data (&tmp), tmp.len);
96       bson_destroy (&tmp);
97    } else {
98       _mongoc_buffer_append (
99          &command->payload, bson_get_data (document), document->len);
100    }
101 
102    command->n_documents++;
103 
104    EXIT;
105 }
106 
107 void
_mongoc_write_command_update_append(mongoc_write_command_t * command,const bson_t * selector,const bson_t * update,const bson_t * opts)108 _mongoc_write_command_update_append (mongoc_write_command_t *command,
109                                      const bson_t *selector,
110                                      const bson_t *update,
111                                      const bson_t *opts)
112 {
113    bson_t document;
114 
115    ENTRY;
116 
117    BSON_ASSERT (command);
118    BSON_ASSERT (command->type == MONGOC_WRITE_COMMAND_UPDATE);
119    BSON_ASSERT (selector && update);
120 
121    bson_init (&document);
122    BSON_APPEND_DOCUMENT (&document, "q", selector);
123    if (_mongoc_document_is_pipeline (update)) {
124       BSON_APPEND_ARRAY (&document, "u", update);
125    } else {
126       BSON_APPEND_DOCUMENT (&document, "u", update);
127    }
128    if (opts) {
129       bson_concat (&document, opts);
130    }
131 
132    _mongoc_buffer_append (
133       &command->payload, bson_get_data (&document), document.len);
134    command->n_documents++;
135 
136    bson_destroy (&document);
137 
138    EXIT;
139 }
140 
141 void
_mongoc_write_command_delete_append(mongoc_write_command_t * command,const bson_t * selector,const bson_t * opts)142 _mongoc_write_command_delete_append (mongoc_write_command_t *command,
143                                      const bson_t *selector,
144                                      const bson_t *opts)
145 {
146    bson_t document;
147 
148    ENTRY;
149 
150    BSON_ASSERT (command);
151    BSON_ASSERT (command->type == MONGOC_WRITE_COMMAND_DELETE);
152    BSON_ASSERT (selector);
153 
154    BSON_ASSERT (selector->len >= 5);
155 
156    bson_init (&document);
157    BSON_APPEND_DOCUMENT (&document, "q", selector);
158    if (opts) {
159       bson_concat (&document, opts);
160    }
161 
162    _mongoc_buffer_append (
163       &command->payload, bson_get_data (&document), document.len);
164    command->n_documents++;
165 
166    bson_destroy (&document);
167 
168    EXIT;
169 }
170 
171 void
_mongoc_write_command_init_bulk(mongoc_write_command_t * command,int type,mongoc_bulk_write_flags_t flags,int64_t operation_id,const bson_t * opts)172 _mongoc_write_command_init_bulk (mongoc_write_command_t *command,
173                                  int type,
174                                  mongoc_bulk_write_flags_t flags,
175                                  int64_t operation_id,
176                                  const bson_t *opts)
177 {
178    ENTRY;
179 
180    BSON_ASSERT (command);
181 
182    command->type = type;
183    command->flags = flags;
184    command->operation_id = operation_id;
185    if (!bson_empty0 (opts)) {
186       bson_copy_to (opts, &command->cmd_opts);
187    } else {
188       bson_init (&command->cmd_opts);
189    }
190 
191    _mongoc_buffer_init (&command->payload, NULL, 0, NULL, NULL);
192    command->n_documents = 0;
193 
194    EXIT;
195 }
196 
197 
198 void
_mongoc_write_command_init_insert(mongoc_write_command_t * command,const bson_t * document,const bson_t * cmd_opts,mongoc_bulk_write_flags_t flags,int64_t operation_id)199 _mongoc_write_command_init_insert (mongoc_write_command_t *command, /* IN */
200                                    const bson_t *document,          /* IN */
201                                    const bson_t *cmd_opts,          /* IN */
202                                    mongoc_bulk_write_flags_t flags, /* IN */
203                                    int64_t operation_id)            /* IN */
204 {
205    ENTRY;
206 
207    BSON_ASSERT (command);
208 
209    _mongoc_write_command_init_bulk (
210       command, MONGOC_WRITE_COMMAND_INSERT, flags, operation_id, cmd_opts);
211 
212    /* must handle NULL document from mongoc_collection_insert_bulk */
213    if (document) {
214       _mongoc_write_command_insert_append (command, document);
215    }
216 
217    EXIT;
218 }
219 
220 
221 void
_mongoc_write_command_init_insert_idl(mongoc_write_command_t * command,const bson_t * document,const bson_t * cmd_opts,int64_t operation_id)222 _mongoc_write_command_init_insert_idl (mongoc_write_command_t *command,
223                                        const bson_t *document,
224                                        const bson_t *cmd_opts,
225                                        int64_t operation_id)
226 {
227    mongoc_bulk_write_flags_t flags = MONGOC_BULK_WRITE_FLAGS_INIT;
228 
229    ENTRY;
230 
231    BSON_ASSERT (command);
232 
233    _mongoc_write_command_init_bulk (
234       command, MONGOC_WRITE_COMMAND_INSERT, flags, operation_id, cmd_opts);
235 
236    /* must handle NULL document from mongoc_collection_insert_bulk */
237    if (document) {
238       _mongoc_write_command_insert_append (command, document);
239    }
240 
241    EXIT;
242 }
243 
244 
245 void
_mongoc_write_command_init_delete(mongoc_write_command_t * command,const bson_t * selector,const bson_t * cmd_opts,const bson_t * opts,mongoc_bulk_write_flags_t flags,int64_t operation_id)246 _mongoc_write_command_init_delete (mongoc_write_command_t *command, /* IN */
247                                    const bson_t *selector,          /* IN */
248                                    const bson_t *cmd_opts,          /* IN */
249                                    const bson_t *opts,              /* IN */
250                                    mongoc_bulk_write_flags_t flags, /* IN */
251                                    int64_t operation_id)            /* IN */
252 {
253    ENTRY;
254 
255    BSON_ASSERT (command);
256    BSON_ASSERT (selector);
257 
258    _mongoc_write_command_init_bulk (
259       command, MONGOC_WRITE_COMMAND_DELETE, flags, operation_id, cmd_opts);
260    _mongoc_write_command_delete_append (command, selector, opts);
261 
262    EXIT;
263 }
264 
265 
266 void
_mongoc_write_command_init_delete_idl(mongoc_write_command_t * command,const bson_t * selector,const bson_t * cmd_opts,const bson_t * opts,int64_t operation_id)267 _mongoc_write_command_init_delete_idl (mongoc_write_command_t *command,
268                                        const bson_t *selector,
269                                        const bson_t *cmd_opts,
270                                        const bson_t *opts,
271                                        int64_t operation_id)
272 {
273    mongoc_bulk_write_flags_t flags = MONGOC_BULK_WRITE_FLAGS_INIT;
274 
275    ENTRY;
276 
277    BSON_ASSERT (command);
278    BSON_ASSERT (selector);
279 
280    _mongoc_write_command_init_bulk (
281       command, MONGOC_WRITE_COMMAND_DELETE, flags, operation_id, cmd_opts);
282 
283    _mongoc_write_command_delete_append (command, selector, opts);
284 
285    EXIT;
286 }
287 
288 
289 void
_mongoc_write_command_init_update(mongoc_write_command_t * command,const bson_t * selector,const bson_t * update,const bson_t * opts,mongoc_bulk_write_flags_t flags,int64_t operation_id)290 _mongoc_write_command_init_update (mongoc_write_command_t *command, /* IN */
291                                    const bson_t *selector,          /* IN */
292                                    const bson_t *update,            /* IN */
293                                    const bson_t *opts,              /* IN */
294                                    mongoc_bulk_write_flags_t flags, /* IN */
295                                    int64_t operation_id)            /* IN */
296 {
297    ENTRY;
298 
299    BSON_ASSERT (command);
300    BSON_ASSERT (selector);
301    BSON_ASSERT (update);
302 
303    _mongoc_write_command_init_bulk (
304       command, MONGOC_WRITE_COMMAND_UPDATE, flags, operation_id, NULL);
305    _mongoc_write_command_update_append (command, selector, update, opts);
306 
307    EXIT;
308 }
309 
310 
311 void
_mongoc_write_command_init_update_idl(mongoc_write_command_t * command,const bson_t * selector,const bson_t * update,const bson_t * opts,int64_t operation_id)312 _mongoc_write_command_init_update_idl (mongoc_write_command_t *command,
313                                        const bson_t *selector,
314                                        const bson_t *update,
315                                        const bson_t *opts,
316                                        int64_t operation_id)
317 {
318    mongoc_bulk_write_flags_t flags = MONGOC_BULK_WRITE_FLAGS_INIT;
319 
320    ENTRY;
321 
322    BSON_ASSERT (command);
323 
324    _mongoc_write_command_init_bulk (
325       command, MONGOC_WRITE_COMMAND_UPDATE, flags, operation_id, NULL);
326    _mongoc_write_command_update_append (command, selector, update, opts);
327 
328    EXIT;
329 }
330 
331 
332 /* takes initialized bson_t *doc and begins formatting a write command */
333 void
_mongoc_write_command_init(bson_t * doc,mongoc_write_command_t * command,const char * collection)334 _mongoc_write_command_init (bson_t *doc,
335                             mongoc_write_command_t *command,
336                             const char *collection)
337 {
338    ENTRY;
339 
340    if (!command->n_documents) {
341       EXIT;
342    }
343 
344    BSON_APPEND_UTF8 (doc, gCommandNames[command->type], collection);
345    BSON_APPEND_BOOL (doc, "ordered", command->flags.ordered);
346 
347    if (command->flags.bypass_document_validation) {
348       BSON_APPEND_BOOL (doc,
349                         "bypassDocumentValidation",
350                         command->flags.bypass_document_validation);
351    }
352 
353    EXIT;
354 }
355 
356 
357 /*
358  *-------------------------------------------------------------------------
359  *
360  * _mongoc_write_command_too_large_error --
361  *
362  *       Fill a bson_error_t and optional bson_t with error info after
363  *       receiving a document for bulk insert, update, or remove that is
364  *       larger than max_bson_size.
365  *
366  *       "err_doc" should be NULL or an empty initialized bson_t.
367  *
368  * Returns:
369  *       None.
370  *
371  * Side effects:
372  *       "error" and optionally "err_doc" are filled out.
373  *
374  *-------------------------------------------------------------------------
375  */
376 
377 void
_mongoc_write_command_too_large_error(bson_error_t * error,int32_t idx,int32_t len,int32_t max_bson_size)378 _mongoc_write_command_too_large_error (bson_error_t *error,
379                                        int32_t idx,
380                                        int32_t len,
381                                        int32_t max_bson_size)
382 {
383    bson_set_error (error,
384                    MONGOC_ERROR_BSON,
385                    MONGOC_ERROR_BSON_INVALID,
386                    "Document %u is too large for the cluster. "
387                    "Document is %u bytes, max is %d.",
388                    idx,
389                    len,
390                    max_bson_size);
391 }
392 
393 
394 void
_empty_error(mongoc_write_command_t * command,bson_error_t * error)395 _empty_error (mongoc_write_command_t *command, bson_error_t *error)
396 {
397    static const uint32_t codes[] = {MONGOC_ERROR_COLLECTION_DELETE_FAILED,
398                                     MONGOC_ERROR_COLLECTION_INSERT_FAILED,
399                                     MONGOC_ERROR_COLLECTION_UPDATE_FAILED};
400 
401    bson_set_error (error,
402                    MONGOC_ERROR_COLLECTION,
403                    codes[command->type],
404                    "Cannot do an empty %s",
405                    gCommandNames[command->type]);
406 }
407 
408 
409 bool
_mongoc_write_command_will_overflow(uint32_t len_so_far,uint32_t document_len,uint32_t n_documents_written,int32_t max_bson_size,int32_t max_write_batch_size)410 _mongoc_write_command_will_overflow (uint32_t len_so_far,
411                                      uint32_t document_len,
412                                      uint32_t n_documents_written,
413                                      int32_t max_bson_size,
414                                      int32_t max_write_batch_size)
415 {
416    /* max BSON object size + 16k bytes.
417     * server guarantees there is enough room: SERVER-10643
418     */
419    int32_t max_cmd_size = max_bson_size + BSON_OBJECT_ALLOWANCE;
420 
421    BSON_ASSERT (max_bson_size);
422 
423    if (len_so_far + document_len > max_cmd_size) {
424       return true;
425    } else if (max_write_batch_size > 0 &&
426               n_documents_written >= max_write_batch_size) {
427       return true;
428    }
429 
430    return false;
431 }
432 
433 
434 static void
_mongoc_write_opmsg(mongoc_write_command_t * command,mongoc_client_t * client,mongoc_server_stream_t * server_stream,const char * database,const char * collection,const mongoc_write_concern_t * write_concern,uint32_t index_offset,mongoc_client_session_t * cs,mongoc_write_result_t * result,bson_error_t * error)435 _mongoc_write_opmsg (mongoc_write_command_t *command,
436                      mongoc_client_t *client,
437                      mongoc_server_stream_t *server_stream,
438                      const char *database,
439                      const char *collection,
440                      const mongoc_write_concern_t *write_concern,
441                      uint32_t index_offset,
442                      mongoc_client_session_t *cs,
443                      mongoc_write_result_t *result,
444                      bson_error_t *error)
445 {
446    mongoc_cmd_parts_t parts;
447    bson_iter_t iter;
448    bson_t cmd;
449    bson_t reply;
450    bool ret = false;
451    int32_t max_msg_size;
452    int32_t max_bson_obj_size;
453    int32_t max_document_count;
454    uint32_t header;
455    uint32_t payload_batch_size = 0;
456    uint32_t payload_total_offset = 0;
457    bool ship_it = false;
458    int document_count = 0;
459    int32_t len;
460    mongoc_server_stream_t *retry_server_stream = NULL;
461 
462    ENTRY;
463 
464    BSON_ASSERT (command);
465    BSON_ASSERT (client);
466    BSON_ASSERT (database);
467    BSON_ASSERT (server_stream);
468    BSON_ASSERT (collection);
469 
470    max_bson_obj_size = mongoc_server_stream_max_bson_obj_size (server_stream);
471    max_msg_size = mongoc_server_stream_max_msg_size (server_stream);
472    if (_mongoc_cse_is_enabled (client)) {
473       max_msg_size = MONGOC_REDUCED_MAX_MSG_SIZE_FOR_FLE;
474    }
475    max_document_count =
476       mongoc_server_stream_max_write_batch_size (server_stream);
477 
478    bson_init (&cmd);
479    _mongoc_write_command_init (&cmd, command, collection);
480    mongoc_cmd_parts_init (&parts, client, database, MONGOC_QUERY_NONE, &cmd);
481    parts.assembled.operation_id = command->operation_id;
482    parts.is_write_command = true;
483    if (!mongoc_cmd_parts_set_write_concern (
484           &parts, write_concern, server_stream->sd->max_wire_version, error)) {
485       bson_destroy (&cmd);
486       mongoc_cmd_parts_cleanup (&parts);
487       EXIT;
488    }
489 
490    if (parts.assembled.is_acknowledged) {
491       mongoc_cmd_parts_set_session (&parts, cs);
492    }
493 
494    /* Write commands that include multi-document operations are not retryable.
495     * Set this explicitly so that mongoc_cmd_parts_assemble does not need to
496     * inspect the command body later. */
497    parts.allow_txn_number =
498       (command->flags.has_multi_write || !parts.assembled.is_acknowledged)
499          ? MONGOC_CMD_PARTS_ALLOW_TXN_NUMBER_NO
500          : MONGOC_CMD_PARTS_ALLOW_TXN_NUMBER_YES;
501 
502    BSON_ASSERT (bson_iter_init (&iter, &command->cmd_opts));
503    if (!mongoc_cmd_parts_append_opts (
504           &parts, &iter, server_stream->sd->max_wire_version, error)) {
505       bson_destroy (&cmd);
506       mongoc_cmd_parts_cleanup (&parts);
507       EXIT;
508    }
509 
510    if (!mongoc_cmd_parts_assemble (&parts, server_stream, error)) {
511       bson_destroy (&cmd);
512       mongoc_cmd_parts_cleanup (&parts);
513       EXIT;
514    }
515 
516    /*
517     * OP_MSG header == 16 byte
518     * + 4 bytes flagBits
519     * + 1 byte payload type = 1
520     * + 1 byte payload type = 2
521     * + 4 byte size of payload
522     * == 26 bytes opcode overhead
523     * + X Full command document {insert: "test", writeConcern: {...}}
524     * + Y command identifier ("documents", "deletes", "updates") ( + \0)
525     */
526 
527    header =
528       26 + parts.assembled.command->len + gCommandFieldLens[command->type] + 1;
529 
530    do {
531       memcpy (&len,
532               command->payload.data + payload_batch_size + payload_total_offset,
533               4);
534       len = BSON_UINT32_FROM_LE (len);
535 
536       if (len > max_bson_obj_size + BSON_OBJECT_ALLOWANCE) {
537          /* Quit if the document is too large */
538          _mongoc_write_command_too_large_error (
539             error, index_offset, len, max_bson_obj_size);
540          result->failed = true;
541          break;
542 
543       } else if ((payload_batch_size + header) + len <= max_msg_size ||
544                  document_count == 0) {
545          /* The current batch is still under max batch size in bytes */
546          payload_batch_size += len;
547 
548          /* If this document filled the maximum document count */
549          if (++document_count == max_document_count) {
550             ship_it = true;
551             /* If this document is the last document we have */
552          } else if (payload_batch_size + payload_total_offset ==
553                     command->payload.len) {
554             ship_it = true;
555          } else {
556             ship_it = false;
557          }
558       } else {
559          ship_it = true;
560       }
561 
562       if (ship_it) {
563          bool is_retryable = parts.is_retryable_write;
564          mongoc_write_err_type_t error_type;
565 
566          /* Seek past the document offset we have already sent */
567          parts.assembled.payload = command->payload.data + payload_total_offset;
568          /* Only send the documents up to this size */
569          parts.assembled.payload_size = payload_batch_size;
570          parts.assembled.payload_identifier = gCommandFields[command->type];
571 
572          /* increment the transaction number for the first attempt of each
573           * retryable write command */
574          if (is_retryable) {
575             bson_iter_t txn_number_iter;
576             BSON_ASSERT (bson_iter_init_find (
577                &txn_number_iter, parts.assembled.command, "txnNumber"));
578             bson_iter_overwrite_int64 (
579                &txn_number_iter,
580                ++parts.assembled.session->server_session->txn_number);
581          }
582       retry:
583          ret = mongoc_cluster_run_command_monitored (
584             &client->cluster, &parts.assembled, &reply, error);
585 
586          if (parts.is_retryable_write) {
587             _mongoc_write_error_handle_labels (
588                ret, error, &reply, server_stream->sd->max_wire_version);
589          }
590 
591          /* Add this batch size so we skip these documents next time */
592          payload_total_offset += payload_batch_size;
593          payload_batch_size = 0;
594 
595          /* If a retryable error is encountered and the write is retryable,
596           * select a new writable stream and retry. If server selection fails or
597           * the selected server does not support retryable writes, fall through
598           * and allow the original error to be reported. */
599          error_type = _mongoc_write_error_get_type (&reply);
600          if (is_retryable) {
601             _mongoc_write_error_update_if_unsupported_storage_engine (
602                ret, error, &reply);
603          }
604          if (is_retryable && error_type == MONGOC_WRITE_ERR_RETRY) {
605             bson_error_t ignored_error;
606 
607             /* each write command may be retried at most once */
608             is_retryable = false;
609 
610             if (retry_server_stream) {
611                mongoc_server_stream_cleanup (retry_server_stream);
612             }
613 
614             retry_server_stream = mongoc_cluster_stream_for_writes (
615                &client->cluster, cs, NULL, &ignored_error);
616 
617             if (retry_server_stream &&
618                 retry_server_stream->sd->max_wire_version >=
619                    WIRE_VERSION_RETRY_WRITES) {
620                parts.assembled.server_stream = retry_server_stream;
621                bson_destroy (&reply);
622                GOTO (retry);
623             }
624          }
625 
626          if (!ret) {
627             result->failed = true;
628             /* Stop for ordered bulk writes or when the server stream has been
629              * properly invalidated (e.g., due to a network error). */
630             if (command->flags.ordered || !mongoc_cluster_stream_valid (
631                                              &client->cluster, server_stream)) {
632                result->must_stop = true;
633             }
634          }
635 
636          /* Result merge needs to know the absolute index for a document
637           * so it can rewrite the error message which contains the relative
638           * document index per batch
639           */
640          _mongoc_write_result_merge (result, command, &reply, index_offset);
641          index_offset += document_count;
642          document_count = 0;
643          bson_destroy (&reply);
644       }
645       /* While we have more documents to write */
646    } while (payload_total_offset < command->payload.len && !result->must_stop);
647 
648    bson_destroy (&cmd);
649    mongoc_cmd_parts_cleanup (&parts);
650 
651    if (retry_server_stream) {
652       if (ret) {
653          /* if a retry succeeded, report that in the result so bulk write can
654           * use the newly selected server. */
655          result->retry_server_id =
656             mongoc_server_description_id (retry_server_stream->sd);
657       }
658       mongoc_server_stream_cleanup (retry_server_stream);
659    }
660 
661    if (ret) {
662       /* if a retry succeeded, clear the initial error */
663       memset (&result->error, 0, sizeof (bson_error_t));
664    }
665 
666    EXIT;
667 }
668 
669 
670 void
_append_array_from_command(mongoc_write_command_t * command,bson_t * bson)671 _append_array_from_command (mongoc_write_command_t *command, bson_t *bson)
672 {
673    bson_t ar;
674    bson_reader_t *reader;
675    char str[16];
676    uint32_t i = 0;
677    const char *key;
678    bool eof;
679    const bson_t *current;
680 
681 
682    reader =
683       bson_reader_new_from_data (command->payload.data, command->payload.len);
684 
685    bson_append_array_begin (bson,
686                             gCommandFields[command->type],
687                             gCommandFieldLens[command->type],
688                             &ar);
689 
690    while ((current = bson_reader_read (reader, &eof))) {
691       bson_uint32_to_string (i, &key, str, sizeof str);
692       BSON_APPEND_DOCUMENT (&ar, key, current);
693       i++;
694    }
695 
696    bson_append_array_end (bson, &ar);
697    bson_reader_destroy (reader);
698 }
699 
700 /* Assemble the base @cmd with all of the command options.
701  * @parts is always initialized, even on error.
702  * This is called twice in _mongoc_write_opquery.
703  * Once with no payload documents, to determine the total size. And once with
704  * payload documents, to send the final command. */
705 static bool
_assemble_cmd(bson_t * cmd,mongoc_write_command_t * command,mongoc_client_t * client,mongoc_server_stream_t * server_stream,const char * database,const mongoc_write_concern_t * write_concern,mongoc_cmd_parts_t * parts,bson_error_t * error)706 _assemble_cmd (bson_t *cmd,
707                mongoc_write_command_t *command,
708                mongoc_client_t *client,
709                mongoc_server_stream_t *server_stream,
710                const char *database,
711                const mongoc_write_concern_t *write_concern,
712                mongoc_cmd_parts_t *parts,
713                bson_error_t *error)
714 {
715    bool ret;
716    bson_iter_t iter;
717 
718    mongoc_cmd_parts_init (parts, client, database, MONGOC_QUERY_NONE, cmd);
719    parts->is_write_command = true;
720    parts->assembled.operation_id = command->operation_id;
721 
722    ret = mongoc_cmd_parts_set_write_concern (
723       parts, write_concern, server_stream->sd->max_wire_version, error);
724    if (ret) {
725       BSON_ASSERT (bson_iter_init (&iter, &command->cmd_opts));
726       ret = mongoc_cmd_parts_append_opts (
727          parts, &iter, server_stream->sd->max_wire_version, error);
728    }
729    if (ret) {
730       ret = mongoc_cmd_parts_assemble (parts, server_stream, error);
731    }
732    return ret;
733 }
734 
735 static void
_mongoc_write_opquery(mongoc_write_command_t * command,mongoc_client_t * client,mongoc_server_stream_t * server_stream,const char * database,const char * collection,const mongoc_write_concern_t * write_concern,uint32_t offset,mongoc_write_result_t * result,bson_error_t * error)736 _mongoc_write_opquery (mongoc_write_command_t *command,
737                        mongoc_client_t *client,
738                        mongoc_server_stream_t *server_stream,
739                        const char *database,
740                        const char *collection,
741                        const mongoc_write_concern_t *write_concern,
742                        uint32_t offset,
743                        mongoc_write_result_t *result,
744                        bson_error_t *error)
745 {
746    mongoc_cmd_parts_t parts;
747    const char *key;
748    uint32_t len = 0;
749    bson_t ar;
750    bson_t cmd;
751    char str[16];
752    bool has_more;
753    bool ret = false;
754    uint32_t i;
755    int32_t max_bson_obj_size;
756    int32_t max_write_batch_size;
757    uint32_t overhead;
758    uint32_t key_len;
759    int data_offset = 0;
760    bson_reader_t *reader;
761    const bson_t *bson;
762    bool eof;
763 
764    ENTRY;
765 
766    BSON_ASSERT (command);
767    BSON_ASSERT (client);
768    BSON_ASSERT (database);
769    BSON_ASSERT (server_stream);
770    BSON_ASSERT (collection);
771 
772    bson_init (&cmd);
773    max_bson_obj_size = mongoc_server_stream_max_bson_obj_size (server_stream);
774    max_write_batch_size =
775       mongoc_server_stream_max_write_batch_size (server_stream);
776 
777 again:
778    has_more = false;
779    i = 0;
780 
781    _mongoc_write_command_init (&cmd, command, collection);
782    /* If any part of assembling failed, return with failure. */
783    if (!_assemble_cmd (&cmd,
784                        command,
785                        client,
786                        server_stream,
787                        database,
788                        write_concern,
789                        &parts,
790                        error)) {
791       result->failed = true;
792       bson_destroy (&cmd);
793       mongoc_cmd_parts_cleanup (&parts);
794       EXIT;
795    }
796 
797    /* Use the assembled command to compute the overhead, since it may be a new
798     * BSON document with options applied. If no options were applied, then
799     * parts.assembled.command points to cmd. The constant 2 is due to 1 byte to
800     * specify array type and 1 byte for field name's null terminator. */
801    overhead =
802       parts.assembled.command->len + 2 + gCommandFieldLens[command->type];
803    /* Toss out the assembled command, we'll assemble again after adding all of
804     * the payload documents. */
805    mongoc_cmd_parts_cleanup (&parts);
806 
807    reader = bson_reader_new_from_data (command->payload.data + data_offset,
808                                        command->payload.len - data_offset);
809 
810    bson_append_array_begin (&cmd,
811                             gCommandFields[command->type],
812                             gCommandFieldLens[command->type],
813                             &ar);
814 
815    while ((bson = bson_reader_read (reader, &eof))) {
816       key_len = (uint32_t) bson_uint32_to_string (i, &key, str, sizeof str);
817       len = bson->len;
818       /* 1 byte to specify document type, 1 byte for key's null terminator */
819       if (_mongoc_write_command_will_overflow (overhead,
820                                                key_len + len + 2 + ar.len,
821                                                i,
822                                                max_bson_obj_size,
823                                                max_write_batch_size)) {
824          has_more = true;
825          break;
826       }
827       BSON_APPEND_DOCUMENT (&ar, key, bson);
828       data_offset += len;
829       i++;
830    }
831 
832    bson_append_array_end (&cmd, &ar);
833 
834    if (!i) {
835       _mongoc_write_command_too_large_error (error, i, len, max_bson_obj_size);
836       result->failed = true;
837       result->must_stop = true;
838       ret = false;
839       if (bson) {
840          data_offset += len;
841       }
842    } else {
843       bson_t reply;
844 
845       ret = _assemble_cmd (&cmd,
846                            command,
847                            client,
848                            server_stream,
849                            database,
850                            write_concern,
851                            &parts,
852                            error);
853       if (ret) {
854          ret = mongoc_cluster_run_command_monitored (
855             &client->cluster, &parts.assembled, &reply, error);
856       } else {
857          bson_init (&reply);
858       }
859 
860       if (!ret) {
861          result->failed = true;
862          if (bson_empty (&reply) ||
863              !mongoc_cluster_stream_valid (&client->cluster, server_stream)) {
864             /* assembling failed, or a network error running the command */
865             result->must_stop = true;
866          }
867       }
868 
869       _mongoc_write_result_merge (result, command, &reply, offset);
870       offset += i;
871       bson_destroy (&reply);
872       mongoc_cmd_parts_cleanup (&parts);
873    }
874    bson_reader_destroy (reader);
875 
876    if (has_more && (ret || !command->flags.ordered) && !result->must_stop) {
877       bson_reinit (&cmd);
878       GOTO (again);
879    }
880 
881    bson_destroy (&cmd);
882    EXIT;
883 }
884 
885 
886 void
_mongoc_write_command_execute(mongoc_write_command_t * command,mongoc_client_t * client,mongoc_server_stream_t * server_stream,const char * database,const char * collection,const mongoc_write_concern_t * write_concern,uint32_t offset,mongoc_client_session_t * cs,mongoc_write_result_t * result)887 _mongoc_write_command_execute (
888    mongoc_write_command_t *command,             /* IN */
889    mongoc_client_t *client,                     /* IN */
890    mongoc_server_stream_t *server_stream,       /* IN */
891    const char *database,                        /* IN */
892    const char *collection,                      /* IN */
893    const mongoc_write_concern_t *write_concern, /* IN */
894    uint32_t offset,                             /* IN */
895    mongoc_client_session_t *cs,                 /* IN */
896    mongoc_write_result_t *result)               /* OUT */
897 {
898    mongoc_crud_opts_t crud = {0};
899 
900    ENTRY;
901 
902    BSON_ASSERT (command);
903    BSON_ASSERT (client);
904    BSON_ASSERT (server_stream);
905    BSON_ASSERT (database);
906    BSON_ASSERT (collection);
907    BSON_ASSERT (result);
908 
909    if (!write_concern) {
910       write_concern = client->write_concern;
911    }
912 
913    if (!mongoc_write_concern_is_valid (write_concern)) {
914       bson_set_error (&result->error,
915                       MONGOC_ERROR_COMMAND,
916                       MONGOC_ERROR_COMMAND_INVALID_ARG,
917                       "The write concern is invalid.");
918       result->failed = true;
919       EXIT;
920    }
921 
922    crud.client_session = cs;
923    crud.writeConcern = (mongoc_write_concern_t *) write_concern;
924 
925    _mongoc_write_command_execute_idl (command,
926                                       client,
927                                       server_stream,
928                                       database,
929                                       collection,
930                                       offset,
931                                       &crud,
932                                       result);
933    EXIT;
934 }
935 
936 void
_mongoc_write_command_execute_idl(mongoc_write_command_t * command,mongoc_client_t * client,mongoc_server_stream_t * server_stream,const char * database,const char * collection,uint32_t offset,const mongoc_crud_opts_t * crud,mongoc_write_result_t * result)937 _mongoc_write_command_execute_idl (mongoc_write_command_t *command,
938                                    mongoc_client_t *client,
939                                    mongoc_server_stream_t *server_stream,
940                                    const char *database,
941                                    const char *collection,
942                                    uint32_t offset,
943                                    const mongoc_crud_opts_t *crud,
944                                    mongoc_write_result_t *result)
945 {
946    ENTRY;
947 
948    BSON_ASSERT (command);
949    BSON_ASSERT (client);
950    BSON_ASSERT (server_stream);
951    BSON_ASSERT (database);
952    BSON_ASSERT (collection);
953    BSON_ASSERT (result);
954 
955    if (command->flags.has_collation) {
956       if (!mongoc_write_concern_is_acknowledged (crud->writeConcern)) {
957          result->failed = true;
958          bson_set_error (&result->error,
959                          MONGOC_ERROR_COMMAND,
960                          MONGOC_ERROR_COMMAND_INVALID_ARG,
961                          "Cannot set collation for unacknowledged writes");
962          EXIT;
963       }
964 
965       if (server_stream->sd->max_wire_version < WIRE_VERSION_COLLATION) {
966          bson_set_error (&result->error,
967                          MONGOC_ERROR_COMMAND,
968                          MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
969                          "The selected server does not support collation");
970          result->failed = true;
971          EXIT;
972       }
973    }
974 
975    if (command->flags.has_array_filters) {
976       if (!mongoc_write_concern_is_acknowledged (crud->writeConcern)) {
977          result->failed = true;
978          bson_set_error (&result->error,
979                          MONGOC_ERROR_COMMAND,
980                          MONGOC_ERROR_COMMAND_INVALID_ARG,
981                          "Cannot use array filters with unacknowledged writes");
982          EXIT;
983       }
984 
985       if (server_stream->sd->max_wire_version < WIRE_VERSION_ARRAY_FILTERS) {
986          bson_set_error (&result->error,
987                          MONGOC_ERROR_COMMAND,
988                          MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
989                          "The selected server does not support array filters");
990          result->failed = true;
991          EXIT;
992       }
993    }
994 
995    if (command->flags.has_update_hint) {
996       if (server_stream->sd->max_wire_version <
997              WIRE_VERSION_HINT_SERVER_SIDE_ERROR ||
998           (server_stream->sd->max_wire_version < WIRE_VERSION_UPDATE_HINT &&
999            !mongoc_write_concern_is_acknowledged (crud->writeConcern))) {
1000          bson_set_error (
1001             &result->error,
1002             MONGOC_ERROR_COMMAND,
1003             MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
1004             "The selected server does not support hint for update");
1005          result->failed = true;
1006          EXIT;
1007       }
1008    }
1009 
1010    if (command->flags.has_delete_hint) {
1011       if (server_stream->sd->max_wire_version <
1012              WIRE_VERSION_HINT_SERVER_SIDE_ERROR ||
1013           (server_stream->sd->max_wire_version < WIRE_VERSION_DELETE_HINT &&
1014            !mongoc_write_concern_is_acknowledged (crud->writeConcern))) {
1015          bson_set_error (
1016             &result->error,
1017             MONGOC_ERROR_COMMAND,
1018             MONGOC_ERROR_COMMAND_INVALID_ARG,
1019             "The selected server does not support hint for delete");
1020          result->failed = true;
1021          EXIT;
1022       }
1023    }
1024 
1025    if (command->flags.bypass_document_validation) {
1026       if (!mongoc_write_concern_is_acknowledged (crud->writeConcern)) {
1027          result->failed = true;
1028          bson_set_error (
1029             &result->error,
1030             MONGOC_ERROR_COMMAND,
1031             MONGOC_ERROR_COMMAND_INVALID_ARG,
1032             "Cannot set bypassDocumentValidation for unacknowledged writes");
1033          EXIT;
1034       }
1035    }
1036 
1037    if (crud->client_session &&
1038        !mongoc_write_concern_is_acknowledged (crud->writeConcern)) {
1039       result->failed = true;
1040       bson_set_error (&result->error,
1041                       MONGOC_ERROR_COMMAND,
1042                       MONGOC_ERROR_COMMAND_INVALID_ARG,
1043                       "Cannot use client session with unacknowledged writes");
1044       EXIT;
1045    }
1046 
1047    if (command->payload.len == 0) {
1048       _empty_error (command, &result->error);
1049       EXIT;
1050    }
1051 
1052    if (server_stream->sd->max_wire_version >= WIRE_VERSION_OP_MSG) {
1053       _mongoc_write_opmsg (command,
1054                            client,
1055                            server_stream,
1056                            database,
1057                            collection,
1058                            crud->writeConcern,
1059                            offset,
1060                            crud->client_session,
1061                            result,
1062                            &result->error);
1063    } else {
1064       if (mongoc_write_concern_is_acknowledged (crud->writeConcern)) {
1065          _mongoc_write_opquery (command,
1066                                 client,
1067                                 server_stream,
1068                                 database,
1069                                 collection,
1070                                 crud->writeConcern,
1071                                 offset,
1072                                 result,
1073                                 &result->error);
1074       } else {
1075          gLegacyWriteOps[command->type](command,
1076                                         client,
1077                                         server_stream,
1078                                         database,
1079                                         collection,
1080                                         offset,
1081                                         result,
1082                                         &result->error);
1083       }
1084    }
1085 
1086    EXIT;
1087 }
1088 
1089 
1090 void
_mongoc_write_command_destroy(mongoc_write_command_t * command)1091 _mongoc_write_command_destroy (mongoc_write_command_t *command)
1092 {
1093    ENTRY;
1094 
1095    if (command) {
1096       bson_destroy (&command->cmd_opts);
1097       _mongoc_buffer_destroy (&command->payload);
1098    }
1099 
1100    EXIT;
1101 }
1102 
1103 
1104 void
_mongoc_write_result_init(mongoc_write_result_t * result)1105 _mongoc_write_result_init (mongoc_write_result_t *result) /* IN */
1106 {
1107    ENTRY;
1108 
1109    BSON_ASSERT (result);
1110 
1111    memset (result, 0, sizeof *result);
1112 
1113    bson_init (&result->upserted);
1114    bson_init (&result->writeConcernErrors);
1115    bson_init (&result->writeErrors);
1116    bson_init (&result->errorLabels);
1117 
1118    EXIT;
1119 }
1120 
1121 
1122 void
_mongoc_write_result_destroy(mongoc_write_result_t * result)1123 _mongoc_write_result_destroy (mongoc_write_result_t *result)
1124 {
1125    ENTRY;
1126 
1127    BSON_ASSERT (result);
1128 
1129    bson_destroy (&result->upserted);
1130    bson_destroy (&result->writeConcernErrors);
1131    bson_destroy (&result->writeErrors);
1132    bson_destroy (&result->errorLabels);
1133 
1134    EXIT;
1135 }
1136 
1137 
1138 void
_mongoc_write_result_append_upsert(mongoc_write_result_t * result,int32_t idx,const bson_value_t * value)1139 _mongoc_write_result_append_upsert (mongoc_write_result_t *result,
1140                                     int32_t idx,
1141                                     const bson_value_t *value)
1142 {
1143    bson_t child;
1144    const char *keyptr = NULL;
1145    char key[12];
1146    int len;
1147 
1148    BSON_ASSERT (result);
1149    BSON_ASSERT (value);
1150 
1151    len = (int) bson_uint32_to_string (
1152       result->upsert_append_count, &keyptr, key, sizeof key);
1153 
1154    bson_append_document_begin (&result->upserted, keyptr, len, &child);
1155    BSON_APPEND_INT32 (&child, "index", idx);
1156    BSON_APPEND_VALUE (&child, "_id", value);
1157    bson_append_document_end (&result->upserted, &child);
1158 
1159    result->upsert_append_count++;
1160 }
1161 
1162 
1163 int32_t
_mongoc_write_result_merge_arrays(uint32_t offset,mongoc_write_result_t * result,bson_t * dest,bson_iter_t * iter)1164 _mongoc_write_result_merge_arrays (uint32_t offset,
1165                                    mongoc_write_result_t *result, /* IN */
1166                                    bson_t *dest,                  /* IN */
1167                                    bson_iter_t *iter)             /* IN */
1168 {
1169    const bson_value_t *value;
1170    bson_iter_t ar;
1171    bson_iter_t citer;
1172    int32_t idx;
1173    int32_t count = 0;
1174    int32_t aridx;
1175    bson_t child;
1176    const char *keyptr = NULL;
1177    char key[12];
1178    int len;
1179 
1180    ENTRY;
1181 
1182    BSON_ASSERT (result);
1183    BSON_ASSERT (dest);
1184    BSON_ASSERT (iter);
1185    BSON_ASSERT (BSON_ITER_HOLDS_ARRAY (iter));
1186 
1187    aridx = bson_count_keys (dest);
1188 
1189    if (bson_iter_recurse (iter, &ar)) {
1190       while (bson_iter_next (&ar)) {
1191          if (BSON_ITER_HOLDS_DOCUMENT (&ar) &&
1192              bson_iter_recurse (&ar, &citer)) {
1193             len =
1194                (int) bson_uint32_to_string (aridx++, &keyptr, key, sizeof key);
1195             bson_append_document_begin (dest, keyptr, len, &child);
1196             while (bson_iter_next (&citer)) {
1197                if (BSON_ITER_IS_KEY (&citer, "index")) {
1198                   idx = bson_iter_int32 (&citer) + offset;
1199                   BSON_APPEND_INT32 (&child, "index", idx);
1200                } else {
1201                   value = bson_iter_value (&citer);
1202                   BSON_APPEND_VALUE (&child, bson_iter_key (&citer), value);
1203                }
1204             }
1205             bson_append_document_end (dest, &child);
1206             count++;
1207          }
1208       }
1209    }
1210 
1211    RETURN (count);
1212 }
1213 
1214 
1215 void
_mongoc_write_result_merge(mongoc_write_result_t * result,mongoc_write_command_t * command,const bson_t * reply,uint32_t offset)1216 _mongoc_write_result_merge (mongoc_write_result_t *result,   /* IN */
1217                             mongoc_write_command_t *command, /* IN */
1218                             const bson_t *reply,             /* IN */
1219                             uint32_t offset)
1220 {
1221    int32_t server_index = 0;
1222    const bson_value_t *value;
1223    bson_iter_t iter;
1224    bson_iter_t citer;
1225    bson_iter_t ar;
1226    int32_t n_upserted = 0;
1227    int32_t affected = 0;
1228 
1229    ENTRY;
1230 
1231    BSON_ASSERT (result);
1232    BSON_ASSERT (reply);
1233 
1234    if (bson_iter_init_find (&iter, reply, "n") &&
1235        BSON_ITER_HOLDS_INT32 (&iter)) {
1236       affected = bson_iter_int32 (&iter);
1237    }
1238 
1239    if (bson_iter_init_find (&iter, reply, "writeErrors") &&
1240        BSON_ITER_HOLDS_ARRAY (&iter) && bson_iter_recurse (&iter, &citer) &&
1241        bson_iter_next (&citer)) {
1242       result->failed = true;
1243    }
1244 
1245    switch (command->type) {
1246    case MONGOC_WRITE_COMMAND_INSERT:
1247       result->nInserted += affected;
1248       break;
1249    case MONGOC_WRITE_COMMAND_DELETE:
1250       result->nRemoved += affected;
1251       break;
1252    case MONGOC_WRITE_COMMAND_UPDATE:
1253 
1254       /* server returns each upserted _id with its index into this batch
1255        * look for "upserted": [{"index": 4, "_id": ObjectId()}, ...] */
1256       if (bson_iter_init_find (&iter, reply, "upserted")) {
1257          if (BSON_ITER_HOLDS_ARRAY (&iter) &&
1258              (bson_iter_recurse (&iter, &ar))) {
1259             while (bson_iter_next (&ar)) {
1260                if (BSON_ITER_HOLDS_DOCUMENT (&ar) &&
1261                    bson_iter_recurse (&ar, &citer) &&
1262                    bson_iter_find (&citer, "index") &&
1263                    BSON_ITER_HOLDS_INT32 (&citer)) {
1264                   server_index = bson_iter_int32 (&citer);
1265 
1266                   if (bson_iter_recurse (&ar, &citer) &&
1267                       bson_iter_find (&citer, "_id")) {
1268                      value = bson_iter_value (&citer);
1269                      _mongoc_write_result_append_upsert (
1270                         result, offset + server_index, value);
1271                      n_upserted++;
1272                   }
1273                }
1274             }
1275          }
1276          result->nUpserted += n_upserted;
1277          /*
1278           * XXX: The following addition to nMatched needs some checking.
1279           *      I'm highly skeptical of it.
1280           */
1281          result->nMatched += BSON_MAX (0, (affected - n_upserted));
1282       } else {
1283          result->nMatched += affected;
1284       }
1285       if (bson_iter_init_find (&iter, reply, "nModified") &&
1286           BSON_ITER_HOLDS_INT32 (&iter)) {
1287          result->nModified += bson_iter_int32 (&iter);
1288       }
1289       break;
1290    default:
1291       BSON_ASSERT (false);
1292       break;
1293    }
1294 
1295    if (bson_iter_init_find (&iter, reply, "writeErrors") &&
1296        BSON_ITER_HOLDS_ARRAY (&iter)) {
1297       _mongoc_write_result_merge_arrays (
1298          offset, result, &result->writeErrors, &iter);
1299    }
1300 
1301    if (bson_iter_init_find (&iter, reply, "writeConcernError") &&
1302        BSON_ITER_HOLDS_DOCUMENT (&iter)) {
1303       uint32_t len;
1304       const uint8_t *data;
1305       bson_t write_concern_error;
1306       char str[16];
1307       const char *key;
1308 
1309       /* writeConcernError is a subdocument in the server response
1310        * append it to the result->writeConcernErrors array */
1311       bson_iter_document (&iter, &len, &data);
1312       BSON_ASSERT (bson_init_static (&write_concern_error, data, len));
1313 
1314       bson_uint32_to_string (
1315          result->n_writeConcernErrors, &key, str, sizeof str);
1316 
1317       if (!bson_append_document (
1318              &result->writeConcernErrors, key, -1, &write_concern_error)) {
1319          MONGOC_ERROR ("Error adding \"%s\" to writeConcernErrors.\n", key);
1320       }
1321 
1322       result->n_writeConcernErrors++;
1323    }
1324 
1325    /* inefficient if there are ever large numbers: for each label in each err,
1326     * we linear-search result->errorLabels to see if it's included yet */
1327    _mongoc_bson_array_copy_labels_to (reply, &result->errorLabels);
1328 
1329    EXIT;
1330 }
1331 
1332 
1333 /*
1334  * If error is not set, set code from first document in array like
1335  * [{"code": 64, "errmsg": "duplicate"}, ...]. Format the error message
1336  * from all errors in array.
1337  */
1338 static void
_set_error_from_response(bson_t * bson_array,mongoc_error_domain_t domain,const char * error_type,bson_error_t * error)1339 _set_error_from_response (bson_t *bson_array,
1340                           mongoc_error_domain_t domain,
1341                           const char *error_type,
1342                           bson_error_t *error /* OUT */)
1343 {
1344    bson_iter_t array_iter;
1345    bson_iter_t doc_iter;
1346    bson_string_t *compound_err;
1347    const char *errmsg = NULL;
1348    int32_t code = 0;
1349    uint32_t n_keys, i;
1350 
1351    compound_err = bson_string_new (NULL);
1352    n_keys = bson_count_keys (bson_array);
1353    if (n_keys > 1) {
1354       bson_string_append_printf (
1355          compound_err, "Multiple %s errors: ", error_type);
1356    }
1357 
1358    if (!bson_empty0 (bson_array) && bson_iter_init (&array_iter, bson_array)) {
1359       /* get first code and all error messages */
1360       i = 0;
1361 
1362       while (bson_iter_next (&array_iter)) {
1363          if (BSON_ITER_HOLDS_DOCUMENT (&array_iter) &&
1364              bson_iter_recurse (&array_iter, &doc_iter)) {
1365             /* parse doc, which is like {"code": 64, "errmsg": "duplicate"} */
1366             while (bson_iter_next (&doc_iter)) {
1367                /* use the first error code we find */
1368                if (BSON_ITER_IS_KEY (&doc_iter, "code") && code == 0) {
1369                   code = (uint32_t) bson_iter_as_int64 (&doc_iter);
1370                } else if (BSON_ITER_IS_KEY (&doc_iter, "errmsg")) {
1371                   errmsg = bson_iter_utf8 (&doc_iter, NULL);
1372 
1373                   /* build message like 'Multiple write errors: "foo", "bar"' */
1374                   if (n_keys > 1) {
1375                      bson_string_append_printf (compound_err, "\"%s\"", errmsg);
1376                      if (i < n_keys - 1) {
1377                         bson_string_append (compound_err, ", ");
1378                      }
1379                   } else {
1380                      /* single error message */
1381                      bson_string_append (compound_err, errmsg);
1382                   }
1383                }
1384             }
1385 
1386             i++;
1387          }
1388       }
1389 
1390       if (code && compound_err->len) {
1391          bson_set_error (
1392             error, domain, (uint32_t) code, "%s", compound_err->str);
1393       }
1394    }
1395 
1396    bson_string_free (compound_err, true);
1397 }
1398 
1399 
1400 /* complete a write result, including only certain fields */
1401 bool
_mongoc_write_result_complete(mongoc_write_result_t * result,int32_t error_api_version,const mongoc_write_concern_t * wc,mongoc_error_domain_t err_domain_override,bson_t * bson,bson_error_t * error,...)1402 _mongoc_write_result_complete (
1403    mongoc_write_result_t *result,             /* IN */
1404    int32_t error_api_version,                 /* IN */
1405    const mongoc_write_concern_t *wc,          /* IN */
1406    mongoc_error_domain_t err_domain_override, /* IN */
1407    bson_t *bson,                              /* OUT */
1408    bson_error_t *error,                       /* OUT */
1409    ...)
1410 {
1411    mongoc_error_domain_t domain;
1412    va_list args;
1413    const char *field;
1414    int n_args;
1415    bson_iter_t iter;
1416    bson_iter_t child;
1417 
1418    ENTRY;
1419 
1420    BSON_ASSERT (result);
1421 
1422    if (error_api_version >= MONGOC_ERROR_API_VERSION_2) {
1423       domain = MONGOC_ERROR_SERVER;
1424    } else if (err_domain_override) {
1425       domain = err_domain_override;
1426    } else if (result->error.domain) {
1427       domain = (mongoc_error_domain_t) result->error.domain;
1428    } else {
1429       domain = MONGOC_ERROR_COLLECTION;
1430    }
1431 
1432    /* produce either old fields like nModified from the deprecated Bulk API Spec
1433     * or new fields like modifiedCount from the CRUD Spec, which we partly obey
1434     */
1435    if (bson && mongoc_write_concern_is_acknowledged (wc)) {
1436       n_args = 0;
1437       va_start (args, error);
1438       while ((field = va_arg (args, const char *))) {
1439          n_args++;
1440 
1441          if (!strcmp (field, "nInserted")) {
1442             BSON_APPEND_INT32 (bson, field, result->nInserted);
1443          } else if (!strcmp (field, "insertedCount")) {
1444             BSON_APPEND_INT32 (bson, field, result->nInserted);
1445          } else if (!strcmp (field, "nMatched")) {
1446             BSON_APPEND_INT32 (bson, field, result->nMatched);
1447          } else if (!strcmp (field, "matchedCount")) {
1448             BSON_APPEND_INT32 (bson, field, result->nMatched);
1449          } else if (!strcmp (field, "nModified")) {
1450             BSON_APPEND_INT32 (bson, field, result->nModified);
1451          } else if (!strcmp (field, "modifiedCount")) {
1452             BSON_APPEND_INT32 (bson, field, result->nModified);
1453          } else if (!strcmp (field, "nRemoved")) {
1454             BSON_APPEND_INT32 (bson, field, result->nRemoved);
1455          } else if (!strcmp (field, "deletedCount")) {
1456             BSON_APPEND_INT32 (bson, field, result->nRemoved);
1457          } else if (!strcmp (field, "nUpserted")) {
1458             BSON_APPEND_INT32 (bson, field, result->nUpserted);
1459          } else if (!strcmp (field, "upsertedCount")) {
1460             BSON_APPEND_INT32 (bson, field, result->nUpserted);
1461          } else if (!strcmp (field, "upserted") &&
1462                     !bson_empty0 (&result->upserted)) {
1463             BSON_APPEND_ARRAY (bson, field, &result->upserted);
1464          } else if (!strcmp (field, "upsertedId") &&
1465                     !bson_empty0 (&result->upserted) &&
1466                     bson_iter_init_find (&iter, &result->upserted, "0") &&
1467                     bson_iter_recurse (&iter, &child) &&
1468                     bson_iter_find (&child, "_id")) {
1469             /* "upsertedId", singular, for update_one() */
1470             BSON_APPEND_VALUE (bson, "upsertedId", bson_iter_value (&child));
1471          }
1472       }
1473 
1474       va_end (args);
1475 
1476       /* default: a standard result includes all Bulk API fields */
1477       if (!n_args) {
1478          BSON_APPEND_INT32 (bson, "nInserted", result->nInserted);
1479          BSON_APPEND_INT32 (bson, "nMatched", result->nMatched);
1480          BSON_APPEND_INT32 (bson, "nModified", result->nModified);
1481          BSON_APPEND_INT32 (bson, "nRemoved", result->nRemoved);
1482          BSON_APPEND_INT32 (bson, "nUpserted", result->nUpserted);
1483          if (!bson_empty0 (&result->upserted)) {
1484             BSON_APPEND_ARRAY (bson, "upserted", &result->upserted);
1485          }
1486       }
1487 
1488       /* always append errors if there are any */
1489       if (!n_args || !bson_empty (&result->writeErrors)) {
1490          BSON_APPEND_ARRAY (bson, "writeErrors", &result->writeErrors);
1491       }
1492 
1493       if (result->n_writeConcernErrors) {
1494          BSON_APPEND_ARRAY (
1495             bson, "writeConcernErrors", &result->writeConcernErrors);
1496       }
1497    }
1498 
1499    /* set bson_error_t from first write error or write concern error */
1500    _set_error_from_response (
1501       &result->writeErrors, domain, "write", &result->error);
1502 
1503    if (!result->error.code) {
1504       _set_error_from_response (&result->writeConcernErrors,
1505                                 MONGOC_ERROR_WRITE_CONCERN,
1506                                 "write concern",
1507                                 &result->error);
1508    }
1509 
1510    if (bson && !bson_empty (&result->errorLabels)) {
1511       BSON_APPEND_ARRAY (bson, "errorLabels", &result->errorLabels);
1512    }
1513 
1514    if (error) {
1515       memcpy (error, &result->error, sizeof *error);
1516    }
1517 
1518    RETURN (!result->failed && result->error.code == 0);
1519 }
1520 
1521 
1522 /*--------------------------------------------------------------------------
1523  *
1524  * _mongoc_write_error_get_type --
1525  *
1526  *       Checks if the error or reply from a write command is considered
1527  *       retryable according to the retryable writes spec. Checks both
1528  *       for a client error (a network exception) and a server error in
1529  *       the reply. @cmd_ret and @cmd_err come from the result of a
1530  *       write_command function. This function should be called after
1531  *       error labels are appended in _mongoc_write_error_handle_labels,
1532  *       which should be called after mongoc_cluster_run_command_monitored.
1533  *
1534  *
1535  * Return:
1536  *       A mongoc_write_error_type_t indicating the type of error (if any).
1537  *
1538  *--------------------------------------------------------------------------
1539  */
1540 mongoc_write_err_type_t
_mongoc_write_error_get_type(bson_t * reply)1541 _mongoc_write_error_get_type (bson_t *reply)
1542 {
1543    bson_error_t error;
1544 
1545    if (mongoc_error_has_label (reply, RETRYABLE_WRITE_ERROR)) {
1546       return MONGOC_WRITE_ERR_RETRY;
1547    }
1548 
1549    /* check for a server error. */
1550    if (_mongoc_cmd_check_ok_no_wce (
1551           reply, MONGOC_ERROR_API_VERSION_2, &error)) {
1552       return MONGOC_WRITE_ERR_NONE;
1553    }
1554 
1555    switch (error.code) {
1556    case 64: /* WriteConcernFailed */
1557       return MONGOC_WRITE_ERR_WRITE_CONCERN;
1558    default:
1559       return MONGOC_WRITE_ERR_OTHER;
1560    }
1561 }
1562 
1563 /* Returns true and modifies reply and cmd_err. */
1564 bool
_mongoc_write_error_update_if_unsupported_storage_engine(bool cmd_ret,bson_error_t * cmd_err,bson_t * reply)1565 _mongoc_write_error_update_if_unsupported_storage_engine (bool cmd_ret,
1566                                                           bson_error_t *cmd_err,
1567                                                           bson_t *reply)
1568 {
1569    bson_error_t server_error;
1570 
1571    if (cmd_ret) {
1572       return false;
1573    }
1574 
1575    if (_mongoc_cmd_check_ok_no_wce (
1576           reply, MONGOC_ERROR_API_VERSION_2, &server_error)) {
1577       return false;
1578    }
1579 
1580    if (server_error.code == 20 &&
1581        strstr (server_error.message, "Transaction numbers") ==
1582           server_error.message) {
1583       const char *replacement = "This MongoDB deployment does not support "
1584                                 "retryable writes. Please add "
1585                                 "retryWrites=false to your connection string.";
1586 
1587       strcpy (cmd_err->message, replacement);
1588 
1589       if (reply) {
1590          bson_t *new_reply = bson_new ();
1591          bson_copy_to_excluding_noinit (reply, new_reply, "errmsg", NULL);
1592          BSON_APPEND_UTF8 (new_reply, "errmsg", replacement);
1593          bson_destroy (reply);
1594          bson_steal (reply, new_reply);
1595       }
1596       return true;
1597    }
1598    return false;
1599 }
1600