1 /*
2  * Copyright 2013 MongoDB, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 
18 #include "mongoc-cursor.h"
19 #include "mongoc-cursor-private.h"
20 #include "mongoc-client-private.h"
21 #include "mongoc-counters-private.h"
22 #include "mongoc-error.h"
23 #include "mongoc-log.h"
24 #include "mongoc-trace-private.h"
25 #include "mongoc-cursor-cursorid-private.h"
26 #include "mongoc-read-concern-private.h"
27 #include "mongoc-util-private.h"
28 #include "mongoc-write-concern-private.h"
29 
30 
31 #undef MONGOC_LOG_DOMAIN
32 #define MONGOC_LOG_DOMAIN "cursor"
33 
34 
/* True once an error has been recorded on the cursor, i.e. when its
 * error.domain field is non-zero. */
#define CURSOR_FAILED(cursor_) ((cursor_)->error.domain != 0)
36 
/* Forward declarations. The definitions are not shown next to the first
 * callers below; descriptions here are limited to what those call sites
 * demonstrate. */

/* Used by _mongoc_cursor_new to turn a legacy "$"-prefixed modifier such as
 * "$orderby" into an option name ("sort"). On a true return the caller
 * appends the value under *cmd_field with key length *len. */
static bool
_translate_query_opt (const char *query_field,
                      const char **cmd_field,
                      int *len);

/* Called by _mongoc_cursor_initial_query when the find command is not used.
 * NOTE(review): presumably sends a legacy OP_QUERY -- confirm at the
 * definition. */
static const bson_t *
_mongoc_cursor_op_query (mongoc_cursor_t *cursor,
                         mongoc_server_stream_t *server_stream);

/* Fills "command" for the cursor; a false return means cursor->error is set
 * (see _mongoc_cursor_monitor_legacy_query). */
static bool
_mongoc_cursor_prepare_find_command (mongoc_cursor_t *cursor,
                                     bson_t *command,
                                     mongoc_server_stream_t *server_stream);

/* Called by _mongoc_cursor_initial_query when _use_find_command is true. */
static const bson_t *
_mongoc_cursor_find_command (mongoc_cursor_t *cursor,
                             mongoc_server_stream_t *server_stream);
54 
55 
56 static bool
_mongoc_cursor_set_opt_int64(mongoc_cursor_t * cursor,const char * option,int64_t value)57 _mongoc_cursor_set_opt_int64 (mongoc_cursor_t *cursor,
58                               const char *option,
59                               int64_t value)
60 {
61    bson_iter_t iter;
62 
63    if (bson_iter_init_find (&iter, &cursor->opts, option)) {
64       if (!BSON_ITER_HOLDS_INT64 (&iter)) {
65          return false;
66       }
67 
68       bson_iter_overwrite_int64 (&iter, value);
69       return true;
70    }
71 
72    return BSON_APPEND_INT64 (&cursor->opts, option, value);
73 }
74 
75 
76 static int64_t
_mongoc_cursor_get_opt_int64(const mongoc_cursor_t * cursor,const char * option,int64_t default_value)77 _mongoc_cursor_get_opt_int64 (const mongoc_cursor_t *cursor,
78                               const char *option,
79                               int64_t default_value)
80 {
81    bson_iter_t iter;
82 
83    if (bson_iter_init_find (&iter, &cursor->opts, option)) {
84       return bson_iter_as_int64 (&iter);
85    }
86 
87    return default_value;
88 }
89 
90 
91 static bool
_mongoc_cursor_set_opt_bool(mongoc_cursor_t * cursor,const char * option,bool value)92 _mongoc_cursor_set_opt_bool (mongoc_cursor_t *cursor,
93                              const char *option,
94                              bool value)
95 {
96    bson_iter_t iter;
97 
98    if (bson_iter_init_find (&iter, &cursor->opts, option)) {
99       if (!BSON_ITER_HOLDS_BOOL (&iter)) {
100          return false;
101       }
102 
103       bson_iter_overwrite_bool (&iter, value);
104       return true;
105    }
106 
107    return BSON_APPEND_BOOL (&cursor->opts, option, value);
108 }
109 
110 
111 bool
_mongoc_cursor_get_opt_bool(const mongoc_cursor_t * cursor,const char * option)112 _mongoc_cursor_get_opt_bool (const mongoc_cursor_t *cursor, const char *option)
113 {
114    bson_iter_t iter;
115 
116    if (bson_iter_init_find (&iter, &cursor->opts, option)) {
117       return bson_iter_as_bool (&iter);
118    }
119 
120    return false;
121 }
122 
123 
124 int32_t
_mongoc_n_return(mongoc_cursor_t * cursor)125 _mongoc_n_return (mongoc_cursor_t *cursor)
126 {
127    int64_t limit;
128    int64_t batch_size;
129    int64_t n_return;
130 
131    if (cursor->is_command) {
132       /* commands always have n_return of 1 */
133       return 1;
134    }
135 
136    limit = mongoc_cursor_get_limit (cursor);
137    batch_size = mongoc_cursor_get_batch_size (cursor);
138 
139    if (limit < 0) {
140       n_return = limit;
141    } else if (limit) {
142       int64_t remaining = limit - cursor->count;
143       BSON_ASSERT (remaining > 0);
144 
145       if (batch_size) {
146          n_return = BSON_MIN (batch_size, remaining);
147       } else {
148          /* batch_size 0 means accept the default */
149          n_return = remaining;
150       }
151    } else {
152       n_return = batch_size;
153    }
154 
155    if (n_return < INT32_MIN) {
156       return INT32_MIN;
157    } else if (n_return > INT32_MAX) {
158       return INT32_MAX;
159    } else {
160       return (int32_t) n_return;
161    }
162 }
163 
164 
165 void
_mongoc_set_cursor_ns(mongoc_cursor_t * cursor,const char * ns,uint32_t nslen)166 _mongoc_set_cursor_ns (mongoc_cursor_t *cursor, const char *ns, uint32_t nslen)
167 {
168    const char *dot;
169 
170    bson_strncpy (cursor->ns, ns, sizeof cursor->ns);
171    cursor->nslen = BSON_MIN (nslen, sizeof cursor->ns);
172    dot = strstr (cursor->ns, ".");
173 
174    if (dot) {
175       cursor->dblen = (uint32_t) (dot - cursor->ns);
176    } else {
177       /* a database name with no collection name */
178       cursor->dblen = cursor->nslen;
179    }
180 }
181 
182 
183 /* return first key beginning with $, or NULL. precondition: bson is valid. */
184 static const char *
_first_dollar_field(const bson_t * bson)185 _first_dollar_field (const bson_t *bson)
186 {
187    bson_iter_t iter;
188    const char *key;
189 
190    BSON_ASSERT (bson_iter_init (&iter, bson));
191    while (bson_iter_next (&iter)) {
192       key = bson_iter_key (&iter);
193 
194       if (key[0] == '$') {
195          return key;
196       }
197    }
198 
199    return NULL;
200 }
201 
202 
/* Flag cursor "c" as finished by setting its done, end_of_event and sent
 * fields. Used in this file right next to storing an error on the cursor.
 * The macro itself does not touch c->error. */
#define MARK_FAILED(c)          \
   do {                         \
      (c)->done = true;         \
      (c)->end_of_event = true; \
      (c)->sent = true;         \
   } while (0)
209 
210 
211 mongoc_cursor_t *
_mongoc_cursor_new_with_opts(mongoc_client_t * client,const char * db_and_collection,bool is_command,const bson_t * filter,const bson_t * opts,const mongoc_read_prefs_t * read_prefs,const mongoc_read_concern_t * read_concern)212 _mongoc_cursor_new_with_opts (mongoc_client_t *client,
213                               const char *db_and_collection,
214                               bool is_command,
215                               const bson_t *filter,
216                               const bson_t *opts,
217                               const mongoc_read_prefs_t *read_prefs,
218                               const mongoc_read_concern_t *read_concern)
219 {
220    mongoc_cursor_t *cursor;
221    mongoc_topology_description_type_t td_type;
222    uint32_t server_id;
223    bson_error_t validate_err;
224    const char *dollar_field;
225 
226    ENTRY;
227 
228    BSON_ASSERT (client);
229 
230    cursor = (mongoc_cursor_t *) bson_malloc0 (sizeof *cursor);
231    cursor->client = client;
232    cursor->is_command = is_command ? 1 : 0;
233 
234    bson_init (&cursor->filter);
235    bson_init (&cursor->opts);
236    bson_init (&cursor->error_doc);
237 
238    if (filter) {
239       if (!bson_validate_with_error (
240              filter, BSON_VALIDATE_EMPTY_KEYS, &validate_err)) {
241          MARK_FAILED (cursor);
242          bson_set_error (&cursor->error,
243                          MONGOC_ERROR_CURSOR,
244                          MONGOC_ERROR_CURSOR_INVALID_CURSOR,
245                          "Invalid filter: %s",
246                          validate_err.message);
247          GOTO (finish);
248       }
249 
250       bson_destroy (&cursor->filter);
251       bson_copy_to (filter, &cursor->filter);
252    }
253 
254    if (opts) {
255       if (!bson_validate_with_error (
256              opts, BSON_VALIDATE_EMPTY_KEYS, &validate_err)) {
257          MARK_FAILED (cursor);
258          bson_set_error (&cursor->error,
259                          MONGOC_ERROR_CURSOR,
260                          MONGOC_ERROR_CURSOR_INVALID_CURSOR,
261                          "Invalid opts: %s",
262                          validate_err.message);
263          GOTO (finish);
264       }
265 
266       dollar_field = _first_dollar_field (opts);
267       if (dollar_field) {
268          MARK_FAILED (cursor);
269          bson_set_error (&cursor->error,
270                          MONGOC_ERROR_CURSOR,
271                          MONGOC_ERROR_CURSOR_INVALID_CURSOR,
272                          "Cannot use $-modifiers in opts: \"%s\"",
273                          dollar_field);
274          GOTO (finish);
275       }
276 
277       bson_copy_to_excluding_noinit (opts, &cursor->opts, "serverId", NULL);
278 
279       /* true if there's a valid serverId or no serverId, false on err */
280       if (!_mongoc_get_server_id_from_opts (opts,
281                                             MONGOC_ERROR_CURSOR,
282                                             MONGOC_ERROR_CURSOR_INVALID_CURSOR,
283                                             &server_id,
284                                             &cursor->error)) {
285          MARK_FAILED (cursor);
286          GOTO (finish);
287       }
288 
289       if (server_id) {
290          mongoc_cursor_set_hint (cursor, server_id);
291       }
292    }
293 
294    cursor->read_prefs = read_prefs
295                            ? mongoc_read_prefs_copy (read_prefs)
296                            : mongoc_read_prefs_new (MONGOC_READ_PRIMARY);
297 
298    cursor->read_concern = read_concern ? mongoc_read_concern_copy (read_concern)
299                                        : mongoc_read_concern_new ();
300 
301    if (db_and_collection) {
302       _mongoc_set_cursor_ns (
303          cursor, db_and_collection, (uint32_t) strlen (db_and_collection));
304    }
305 
306    if (_mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_EXHAUST)) {
307       if (_mongoc_cursor_get_opt_int64 (cursor, MONGOC_CURSOR_LIMIT, 0)) {
308          bson_set_error (&cursor->error,
309                          MONGOC_ERROR_CURSOR,
310                          MONGOC_ERROR_CURSOR_INVALID_CURSOR,
311                          "Cannot specify both 'exhaust' and 'limit'.");
312          MARK_FAILED (cursor);
313          GOTO (finish);
314       }
315 
316       td_type = _mongoc_topology_get_type (client->topology);
317 
318       if (td_type == MONGOC_TOPOLOGY_SHARDED) {
319          bson_set_error (&cursor->error,
320                          MONGOC_ERROR_CURSOR,
321                          MONGOC_ERROR_CURSOR_INVALID_CURSOR,
322                          "Cannot use exhaust cursor with sharded cluster.");
323          MARK_FAILED (cursor);
324          GOTO (finish);
325       }
326    }
327 
328    _mongoc_buffer_init (&cursor->buffer, NULL, 0, NULL, NULL);
329    _mongoc_read_prefs_validate (read_prefs, &cursor->error);
330 
331 finish:
332    mongoc_counter_cursors_active_inc ();
333 
334    RETURN (cursor);
335 }
336 
337 
338 mongoc_cursor_t *
_mongoc_cursor_new(mongoc_client_t * client,const char * db_and_collection,mongoc_query_flags_t qflags,uint32_t skip,int32_t limit,uint32_t batch_size,bool is_command,const bson_t * query,const bson_t * fields,const mongoc_read_prefs_t * read_prefs,const mongoc_read_concern_t * read_concern)339 _mongoc_cursor_new (mongoc_client_t *client,
340                     const char *db_and_collection,
341                     mongoc_query_flags_t qflags,
342                     uint32_t skip,
343                     int32_t limit,
344                     uint32_t batch_size,
345                     bool is_command,
346                     const bson_t *query,
347                     const bson_t *fields,
348                     const mongoc_read_prefs_t *read_prefs,
349                     const mongoc_read_concern_t *read_concern)
350 {
351    bson_t filter;
352    bool has_filter = false;
353    bson_t opts = BSON_INITIALIZER;
354    bool slave_ok = false;
355    const char *key;
356    bson_iter_t iter;
357    const char *opt_key;
358    int len;
359    uint32_t data_len;
360    const uint8_t *data;
361    mongoc_cursor_t *cursor;
362    bson_error_t error = {0};
363 
364    ENTRY;
365 
366    BSON_ASSERT (client);
367 
368    if (query) {
369       if (bson_has_field (query, "$query")) {
370          /* like "{$query: {a: 1}, $orderby: {b: 1}, $otherModifier: true}" */
371          bson_iter_init (&iter, query);
372          while (bson_iter_next (&iter)) {
373             key = bson_iter_key (&iter);
374 
375             if (key[0] != '$') {
376                bson_set_error (&error,
377                                MONGOC_ERROR_CURSOR,
378                                MONGOC_ERROR_CURSOR_INVALID_CURSOR,
379                                "Cannot mix $query with non-dollar field '%s'",
380                                key);
381                GOTO (done);
382             }
383 
384             if (!strcmp (key, "$query")) {
385                /* set "filter" to the incoming document's "$query" */
386                bson_iter_document (&iter, &data_len, &data);
387                bson_init_static (&filter, data, (size_t) data_len);
388                has_filter = true;
389             } else if (_translate_query_opt (key, &opt_key, &len)) {
390                /* "$orderby" becomes "sort", etc., "$unknown" -> "unknown" */
391                bson_append_iter (&opts, opt_key, len, &iter);
392             } else {
393                /* strip leading "$" */
394                bson_append_iter (&opts, key + 1, -1, &iter);
395             }
396          }
397       }
398    }
399 
400    if (!bson_empty0 (fields)) {
401       bson_append_document (
402          &opts, MONGOC_CURSOR_PROJECTION, MONGOC_CURSOR_PROJECTION_LEN, fields);
403    }
404 
405    if (skip) {
406       bson_append_int64 (
407          &opts, MONGOC_CURSOR_SKIP, MONGOC_CURSOR_SKIP_LEN, skip);
408    }
409 
410    if (limit) {
411       bson_append_int64 (
412          &opts, MONGOC_CURSOR_LIMIT, MONGOC_CURSOR_LIMIT_LEN, llabs (limit));
413 
414       if (limit < 0) {
415          bson_append_bool (&opts,
416                            MONGOC_CURSOR_SINGLE_BATCH,
417                            MONGOC_CURSOR_SINGLE_BATCH_LEN,
418                            true);
419       }
420    }
421 
422    if (batch_size) {
423       bson_append_int64 (&opts,
424                          MONGOC_CURSOR_BATCH_SIZE,
425                          MONGOC_CURSOR_BATCH_SIZE_LEN,
426                          batch_size);
427    }
428 
429    if (qflags & MONGOC_QUERY_SLAVE_OK) {
430       slave_ok = true;
431    }
432 
433    if (qflags & MONGOC_QUERY_TAILABLE_CURSOR) {
434       bson_append_bool (
435          &opts, MONGOC_CURSOR_TAILABLE, MONGOC_CURSOR_TAILABLE_LEN, true);
436    }
437 
438    if (qflags & MONGOC_QUERY_OPLOG_REPLAY) {
439       bson_append_bool (&opts,
440                         MONGOC_CURSOR_OPLOG_REPLAY,
441                         MONGOC_CURSOR_OPLOG_REPLAY_LEN,
442                         true);
443    }
444 
445    if (qflags & MONGOC_QUERY_NO_CURSOR_TIMEOUT) {
446       bson_append_bool (&opts,
447                         MONGOC_CURSOR_NO_CURSOR_TIMEOUT,
448                         MONGOC_CURSOR_NO_CURSOR_TIMEOUT_LEN,
449                         true);
450    }
451 
452    if (qflags & MONGOC_QUERY_AWAIT_DATA) {
453       bson_append_bool (
454          &opts, MONGOC_CURSOR_AWAIT_DATA, MONGOC_CURSOR_AWAIT_DATA_LEN, true);
455    }
456 
457    if (qflags & MONGOC_QUERY_EXHAUST) {
458       bson_append_bool (
459          &opts, MONGOC_CURSOR_EXHAUST, MONGOC_CURSOR_EXHAUST_LEN, true);
460    }
461 
462    if (qflags & MONGOC_QUERY_PARTIAL) {
463       bson_append_bool (&opts,
464                         MONGOC_CURSOR_ALLOW_PARTIAL_RESULTS,
465                         MONGOC_CURSOR_ALLOW_PARTIAL_RESULTS_LEN,
466                         true);
467    }
468 
469 done:
470 
471    if (error.domain != 0) {
472       cursor = _mongoc_cursor_new_with_opts (
473          client, db_and_collection, is_command, NULL, NULL, NULL, NULL);
474 
475       MARK_FAILED (cursor);
476       memcpy (&cursor->error, &error, sizeof (bson_error_t));
477    } else {
478       cursor = _mongoc_cursor_new_with_opts (client,
479                                              db_and_collection,
480                                              is_command,
481                                              has_filter ? &filter : query,
482                                              &opts,
483                                              read_prefs,
484                                              read_concern);
485 
486       if (slave_ok) {
487          cursor->slave_ok = true;
488       }
489    }
490 
491    if (has_filter) {
492       bson_destroy (&filter);
493    }
494 
495    bson_destroy (&opts);
496 
497    RETURN (cursor);
498 }
499 
500 
501 void
mongoc_cursor_destroy(mongoc_cursor_t * cursor)502 mongoc_cursor_destroy (mongoc_cursor_t *cursor)
503 {
504    ENTRY;
505 
506    BSON_ASSERT (cursor);
507 
508    if (cursor->iface.destroy) {
509       cursor->iface.destroy (cursor);
510    } else {
511       _mongoc_cursor_destroy (cursor);
512    }
513 
514    EXIT;
515 }
516 
517 void
_mongoc_cursor_destroy(mongoc_cursor_t * cursor)518 _mongoc_cursor_destroy (mongoc_cursor_t *cursor)
519 {
520    char db[MONGOC_NAMESPACE_MAX];
521    ENTRY;
522 
523    BSON_ASSERT (cursor);
524 
525    if (cursor->in_exhaust) {
526       cursor->client->in_exhaust = false;
527       if (!cursor->done) {
528          /* The only way to stop an exhaust cursor is to kill the connection */
529          mongoc_cluster_disconnect_node (&cursor->client->cluster,
530                                          cursor->server_id, false, NULL);
531       }
532    } else if (cursor->rpc.reply.cursor_id) {
533       bson_strncpy (db, cursor->ns, cursor->dblen + 1);
534 
535       _mongoc_client_kill_cursor (cursor->client,
536                                   cursor->server_id,
537                                   cursor->rpc.reply.cursor_id,
538                                   cursor->operation_id,
539                                   db,
540                                   cursor->ns + cursor->dblen + 1);
541    }
542 
543    if (cursor->reader) {
544       bson_reader_destroy (cursor->reader);
545       cursor->reader = NULL;
546    }
547 
548    _mongoc_buffer_destroy (&cursor->buffer);
549    mongoc_read_prefs_destroy (cursor->read_prefs);
550    mongoc_read_concern_destroy (cursor->read_concern);
551    mongoc_write_concern_destroy (cursor->write_concern);
552 
553    bson_destroy (&cursor->filter);
554    bson_destroy (&cursor->opts);
555    bson_destroy (&cursor->error_doc);
556    bson_free (cursor);
557 
558    mongoc_counter_cursors_active_dec ();
559    mongoc_counter_cursors_disposed_inc ();
560 
561    EXIT;
562 }
563 
564 
565 mongoc_server_stream_t *
_mongoc_cursor_fetch_stream(mongoc_cursor_t * cursor)566 _mongoc_cursor_fetch_stream (mongoc_cursor_t *cursor)
567 {
568    mongoc_server_stream_t *server_stream;
569 
570    ENTRY;
571 
572    if (cursor->server_id) {
573       server_stream =
574          mongoc_cluster_stream_for_server (&cursor->client->cluster,
575                                            cursor->server_id,
576                                            true /* reconnect_ok */,
577                                            &cursor->error);
578    } else {
579       server_stream = mongoc_cluster_stream_for_reads (
580          &cursor->client->cluster, cursor->read_prefs, &cursor->error);
581 
582       if (server_stream) {
583          cursor->server_id = server_stream->sd->id;
584       }
585    }
586 
587    RETURN (server_stream);
588 }
589 
590 
591 bool
_use_find_command(const mongoc_cursor_t * cursor,const mongoc_server_stream_t * server_stream)592 _use_find_command (const mongoc_cursor_t *cursor,
593                    const mongoc_server_stream_t *server_stream)
594 {
595    /* Find, getMore And killCursors Commands Spec: "the find command cannot be
596     * used to execute other commands" and "the find command does not support the
597     * exhaust flag."
598     */
599    return server_stream->sd->max_wire_version >= WIRE_VERSION_FIND_CMD &&
600           !cursor->is_command &&
601           !_mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_EXHAUST);
602 }
603 
604 
605 bool
_use_getmore_command(const mongoc_cursor_t * cursor,const mongoc_server_stream_t * server_stream)606 _use_getmore_command (const mongoc_cursor_t *cursor,
607                       const mongoc_server_stream_t *server_stream)
608 {
609    return server_stream->sd->max_wire_version >= WIRE_VERSION_FIND_CMD &&
610           !_mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_EXHAUST);
611 }
612 
613 
614 static const bson_t *
_mongoc_cursor_initial_query(mongoc_cursor_t * cursor)615 _mongoc_cursor_initial_query (mongoc_cursor_t *cursor)
616 {
617    mongoc_server_stream_t *server_stream;
618    const bson_t *b = NULL;
619 
620    ENTRY;
621 
622    BSON_ASSERT (cursor);
623 
624    server_stream = _mongoc_cursor_fetch_stream (cursor);
625 
626    if (!server_stream) {
627       GOTO (done);
628    }
629 
630    if (_use_find_command (cursor, server_stream)) {
631       b = _mongoc_cursor_find_command (cursor, server_stream);
632    } else {
633       /* When the user explicitly provides a readConcern -- but the server
634        * doesn't support readConcern, we must error:
635        * https://github.com/mongodb/specifications/blob/master/source/read-write-concern/read-write-concern.rst#errors-1
636        */
637       if (cursor->read_concern->level != NULL &&
638           server_stream->sd->max_wire_version < WIRE_VERSION_READ_CONCERN) {
639          bson_set_error (&cursor->error,
640                          MONGOC_ERROR_COMMAND,
641                          MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
642                          "The selected server does not support readConcern");
643       } else {
644          b = _mongoc_cursor_op_query (cursor, server_stream);
645       }
646    }
647 
648 done:
649    /* no-op if server_stream is NULL */
650    mongoc_server_stream_cleanup (server_stream);
651 
652    if (!b) {
653       cursor->done = true;
654    }
655 
656    RETURN (b);
657 }
658 
659 
660 static bool
_mongoc_cursor_monitor_legacy_query(mongoc_cursor_t * cursor,mongoc_server_stream_t * server_stream,const char * cmd_name)661 _mongoc_cursor_monitor_legacy_query (mongoc_cursor_t *cursor,
662                                      mongoc_server_stream_t *server_stream,
663                                      const char *cmd_name)
664 {
665    bson_t doc;
666    mongoc_client_t *client;
667    mongoc_apm_command_started_t event;
668    char db[MONGOC_NAMESPACE_MAX];
669 
670    ENTRY;
671 
672    client = cursor->client;
673    if (!client->apm_callbacks.started) {
674       /* successful */
675       RETURN (true);
676    }
677 
678    bson_init (&doc);
679    bson_strncpy (db, cursor->ns, cursor->dblen + 1);
680 
681    if (!cursor->is_command) {
682       /* simulate a MongoDB 3.2+ "find" command */
683       if (!_mongoc_cursor_prepare_find_command (cursor, &doc, server_stream)) {
684          /* cursor->error is set */
685          bson_destroy (&doc);
686          RETURN (false);
687       }
688    }
689 
690    mongoc_apm_command_started_init (&event,
691                                     cursor->is_command ? &cursor->filter : &doc,
692                                     db,
693                                     cmd_name,
694                                     client->cluster.request_id,
695                                     cursor->operation_id,
696                                     &server_stream->sd->host,
697                                     server_stream->sd->id,
698                                     client->apm_context);
699 
700    client->apm_callbacks.started (&event);
701    mongoc_apm_command_started_cleanup (&event);
702    bson_destroy (&doc);
703 
704    RETURN (true);
705 }
706 
707 
708 /* append array of docs from current cursor batch */
709 static void
_mongoc_cursor_append_docs_array(mongoc_cursor_t * cursor,bson_t * docs)710 _mongoc_cursor_append_docs_array (mongoc_cursor_t *cursor, bson_t *docs)
711 {
712    bool eof = false;
713    char str[16];
714    const char *key;
715    uint32_t i = 0;
716    size_t keylen;
717    const bson_t *doc;
718 
719    while ((doc = bson_reader_read (cursor->reader, &eof))) {
720       keylen = bson_uint32_to_string (i, &key, str, sizeof str);
721       bson_append_document (docs, key, (int) keylen, doc);
722    }
723 
724    bson_reader_reset (cursor->reader);
725 }
726 
727 
728 static void
_mongoc_cursor_monitor_succeeded(mongoc_cursor_t * cursor,int64_t duration,bool first_batch,mongoc_server_stream_t * stream,const char * cmd_name)729 _mongoc_cursor_monitor_succeeded (mongoc_cursor_t *cursor,
730                                   int64_t duration,
731                                   bool first_batch,
732                                   mongoc_server_stream_t *stream,
733                                   const char *cmd_name)
734 {
735    mongoc_apm_command_succeeded_t event;
736    mongoc_client_t *client;
737    bson_t reply;
738    bson_t reply_cursor;
739 
740    ENTRY;
741 
742    client = cursor->client;
743 
744    if (!client->apm_callbacks.succeeded) {
745       EXIT;
746    }
747 
748    if (cursor->is_command) {
749       /* cursor is from mongoc_client_command. we're in mongoc_cursor_next. */
750       if (!_mongoc_rpc_reply_get_first (&cursor->rpc.reply, &reply)) {
751          MONGOC_ERROR ("_mongoc_cursor_monitor_succeeded can't parse reply");
752          EXIT;
753       }
754    } else {
755       bson_t docs_array;
756 
757       /* fake reply to find/getMore command:
758        * {ok: 1, cursor: {id: 17, ns: "...", first/nextBatch: [ ... docs ... ]}}
759        */
760       bson_init (&docs_array);
761       _mongoc_cursor_append_docs_array (cursor, &docs_array);
762 
763       bson_init (&reply);
764       bson_append_int32 (&reply, "ok", 2, 1);
765       bson_append_document_begin (&reply, "cursor", 6, &reply_cursor);
766       bson_append_int64 (&reply_cursor, "id", 2, mongoc_cursor_get_id (cursor));
767       bson_append_utf8 (&reply_cursor, "ns", 2, cursor->ns, cursor->nslen);
768       bson_append_array (&reply_cursor,
769                          first_batch ? "firstBatch" : "nextBatch",
770                          first_batch ? 10 : 9,
771                          &docs_array);
772       bson_append_document_end (&reply, &reply_cursor);
773       bson_destroy (&docs_array);
774    }
775 
776    mongoc_apm_command_succeeded_init (&event,
777                                       duration,
778                                       &reply,
779                                       cmd_name,
780                                       client->cluster.request_id,
781                                       cursor->operation_id,
782                                       &stream->sd->host,
783                                       stream->sd->id,
784                                       client->apm_context);
785 
786    client->apm_callbacks.succeeded (&event);
787 
788    mongoc_apm_command_succeeded_cleanup (&event);
789    bson_destroy (&reply);
790 
791    EXIT;
792 }
793 
794 
795 static void
_mongoc_cursor_monitor_failed(mongoc_cursor_t * cursor,int64_t duration,mongoc_server_stream_t * stream,const char * cmd_name)796 _mongoc_cursor_monitor_failed (mongoc_cursor_t *cursor,
797                                int64_t duration,
798                                mongoc_server_stream_t *stream,
799                                const char *cmd_name)
800 {
801    mongoc_apm_command_failed_t event;
802    mongoc_client_t *client;
803 
804    ENTRY;
805 
806    client = cursor->client;
807 
808    if (!client->apm_callbacks.failed) {
809       EXIT;
810    }
811 
812    mongoc_apm_command_failed_init (&event,
813                                    duration,
814                                    cmd_name,
815                                    &cursor->error,
816                                    client->cluster.request_id,
817                                    cursor->operation_id,
818                                    &stream->sd->host,
819                                    stream->sd->id,
820                                    client->apm_context);
821 
822    client->apm_callbacks.failed (&event);
823 
824    mongoc_apm_command_failed_cleanup (&event);
825 
826    EXIT;
827 }
828 
829 
830 #define OPT_CHECK(_type)                                         \
831    do {                                                          \
832       if (!BSON_ITER_HOLDS_##_type (&iter)) {                    \
833          bson_set_error (&cursor->error,                         \
834                          MONGOC_ERROR_COMMAND,                   \
835                          MONGOC_ERROR_COMMAND_INVALID_ARG,       \
836                          "invalid option %s, should be type %s", \
837                          key,                                    \
838                          #_type);                                \
839          return NULL;                                            \
840       }                                                          \
841    } while (false)
842 
843 
844 #define OPT_CHECK_INT()                                          \
845    do {                                                          \
846       if (!BSON_ITER_HOLDS_INT (&iter)) {                        \
847          bson_set_error (&cursor->error,                         \
848                          MONGOC_ERROR_COMMAND,                   \
849                          MONGOC_ERROR_COMMAND_INVALID_ARG,       \
850                          "invalid option %s, should be integer", \
851                          key);                                   \
852          return NULL;                                            \
853       }                                                          \
854    } while (false)
855 
856 
857 #define OPT_ERR(_msg)                                   \
858    do {                                                 \
859       bson_set_error (&cursor->error,                   \
860                       MONGOC_ERROR_COMMAND,             \
861                       MONGOC_ERROR_COMMAND_INVALID_ARG, \
862                       _msg);                            \
863       return NULL;                                      \
864    } while (false)
865 
866 
867 #define OPT_BSON_ERR(_msg)                                                    \
868    do {                                                                       \
869       bson_set_error (                                                        \
870          &cursor->error, MONGOC_ERROR_BSON, MONGOC_ERROR_BSON_INVALID, _msg); \
871       return NULL;                                                            \
872    } while (false)
873 
874 
875 #define OPT_FLAG(_flag)                \
876    do {                                \
877       OPT_CHECK (BOOL);                \
878       if (bson_iter_as_bool (&iter)) { \
879          *flags |= _flag;              \
880       }                                \
881    } while (false)
882 
883 
/*
 * PUSH_DOLLAR_QUERY:
 * The first time it is expanded for a given "query", append the cursor's
 * filter to it under the key "$query" and remember that this was done in
 * "pushed_dollar_query"; later expansions do nothing.
 *
 * Expects "query", "cursor" and "pushed_dollar_query" in scope.
 */
#define PUSH_DOLLAR_QUERY()                                        \
   do {                                                            \
      if (!pushed_dollar_query) {                                  \
         BSON_APPEND_DOCUMENT (query, "$query", &cursor->filter);  \
         pushed_dollar_query = true;                               \
      }                                                            \
   } while (false)
891 
892 
/*
 * OPT_SUBDOCUMENT:
 * Copy the document held by the current option into "query" under the key
 * "$<_legacy_name>". _opt_name is only used to build the error message.
 *
 * The option is first passed through OPT_CHECK (DOCUMENT); if the embedded
 * bytes cannot be wrapped by bson_init_static, OPT_BSON_ERR records the error
 * and returns NULL from the enclosing function.
 *
 * Expects "iter", "len", "data", "subdocument", "query" and "cursor" in
 * scope.
 */
#define OPT_SUBDOCUMENT(_opt_name, _legacy_name)                           \
   do {                                                                    \
      OPT_CHECK (DOCUMENT);                                                \
      bson_iter_document (&iter, &len, &data);                             \
      if (!bson_init_static (&subdocument, data, (size_t) len)) {          \
         OPT_BSON_ERR ("Invalid '" #_opt_name "' subdocument in 'opts'."); \
      }                                                                    \
      BSON_APPEND_DOCUMENT (query, "$" #_legacy_name, &subdocument);       \
   } while (false)
902 
/*
 * ADD_FLAG:
 * If the current option is not a bool, record a MONGOC_ERROR_COMMAND /
 * MONGOC_ERROR_COMMAND_INVALID_ARG error naming "key" and return false from
 * the enclosing function. Otherwise OR _value into *_flags when the option's
 * value is true.
 *
 * Expects "iter", "key" and "cursor" in scope. The expansion deliberately
 * carries no trailing semicolon: the caller supplies it, so the macro behaves
 * as a single statement, including as the body of an unbraced if/else.
 */
#define ADD_FLAG(_flags, _value)                                   \
   do {                                                            \
      if (!BSON_ITER_HOLDS_BOOL (&iter)) {                         \
         bson_set_error (&cursor->error,                           \
                         MONGOC_ERROR_COMMAND,                     \
                         MONGOC_ERROR_COMMAND_INVALID_ARG,         \
                         "invalid option %s, should be type bool", \
                         key);                                     \
         return false;                                             \
      }                                                            \
      if (bson_iter_as_bool (&iter)) {                             \
         *(_flags) |= (_value);                                    \
      }                                                            \
   } while (false)
917 
918 static bool
_mongoc_cursor_flags(mongoc_cursor_t * cursor,mongoc_server_stream_t * stream,mongoc_query_flags_t * flags)919 _mongoc_cursor_flags (mongoc_cursor_t *cursor,
920                       mongoc_server_stream_t *stream,
921                       mongoc_query_flags_t *flags /* OUT */)
922 {
923    bson_iter_t iter;
924    const char *key;
925 
926    *flags = MONGOC_QUERY_NONE;
927 
928    if (!bson_iter_init (&iter, &cursor->opts)) {
929       bson_set_error (&cursor->error,
930                       MONGOC_ERROR_BSON,
931                       MONGOC_ERROR_BSON_INVALID,
932                       "Invalid 'opts' parameter.");
933       return false;
934    }
935 
936    while (bson_iter_next (&iter)) {
937       key = bson_iter_key (&iter);
938 
939       if (!strcmp (key, MONGOC_CURSOR_ALLOW_PARTIAL_RESULTS)) {
940          ADD_FLAG (flags, MONGOC_QUERY_PARTIAL);
941       } else if (!strcmp (key, MONGOC_CURSOR_AWAIT_DATA)) {
942          ADD_FLAG (flags, MONGOC_QUERY_AWAIT_DATA);
943       } else if (!strcmp (key, MONGOC_CURSOR_EXHAUST)) {
944          ADD_FLAG (flags, MONGOC_QUERY_EXHAUST);
945       } else if (!strcmp (key, MONGOC_CURSOR_NO_CURSOR_TIMEOUT)) {
946          ADD_FLAG (flags, MONGOC_QUERY_NO_CURSOR_TIMEOUT);
947       } else if (!strcmp (key, MONGOC_CURSOR_OPLOG_REPLAY)) {
948          ADD_FLAG (flags, MONGOC_QUERY_OPLOG_REPLAY);
949       } else if (!strcmp (key, MONGOC_CURSOR_TAILABLE)) {
950          ADD_FLAG (flags, MONGOC_QUERY_TAILABLE_CURSOR);
951       }
952    }
953 
954    if (cursor->slave_ok) {
955       *flags |= MONGOC_QUERY_SLAVE_OK;
956    } else if (cursor->server_id_set &&
957               (stream->topology_type == MONGOC_TOPOLOGY_RS_WITH_PRIMARY ||
958                stream->topology_type == MONGOC_TOPOLOGY_RS_NO_PRIMARY) &&
959               stream->sd->type != MONGOC_SERVER_RS_PRIMARY) {
960       *flags |= MONGOC_QUERY_SLAVE_OK;
961    }
962 
963    return true;
964 }
965 
966 
967 static bson_t *
_mongoc_cursor_parse_opts_for_op_query(mongoc_cursor_t * cursor,mongoc_server_stream_t * stream,bson_t * query,bson_t * fields,mongoc_query_flags_t * flags,int32_t * skip)968 _mongoc_cursor_parse_opts_for_op_query (mongoc_cursor_t *cursor,
969                                         mongoc_server_stream_t *stream,
970                                         bson_t *query /* OUT */,
971                                         bson_t *fields /* OUT */,
972                                         mongoc_query_flags_t *flags /* OUT */,
973                                         int32_t *skip /* OUT */)
974 {
975    bool pushed_dollar_query;
976    bson_iter_t iter;
977    uint32_t len;
978    const uint8_t *data;
979    bson_t subdocument;
980    const char *key;
981    char *dollar_modifier;
982 
983    *flags = MONGOC_QUERY_NONE;
984    *skip = 0;
985 
986    /* assume we'll send filter straight to server, like "{a: 1}". if we find an
987     * opt we must add, like "sort", we push the query like "$query: {a: 1}",
988     * then add a query modifier for the option, in this example "$orderby".
989     */
990    pushed_dollar_query = false;
991 
992    if (!bson_iter_init (&iter, &cursor->opts)) {
993       OPT_BSON_ERR ("Invalid 'opts' parameter.");
994    }
995 
996    while (bson_iter_next (&iter)) {
997       key = bson_iter_key (&iter);
998 
999       /* most common options first */
1000       if (!strcmp (key, MONGOC_CURSOR_PROJECTION)) {
1001          OPT_CHECK (DOCUMENT);
1002          bson_iter_document (&iter, &len, &data);
1003          if (!bson_init_static (&subdocument, data, (size_t) len)) {
1004             OPT_BSON_ERR ("Invalid 'projection' subdocument in 'opts'.");
1005          }
1006 
1007          bson_copy_to (&subdocument, fields);
1008       } else if (!strcmp (key, MONGOC_CURSOR_SORT)) {
1009          PUSH_DOLLAR_QUERY ();
1010          OPT_SUBDOCUMENT (sort, orderby);
1011       } else if (!strcmp (key, MONGOC_CURSOR_SKIP)) {
1012          OPT_CHECK_INT ();
1013          *skip = (int32_t) bson_iter_as_int64 (&iter);
1014       }
1015       /* the rest of the options, alphabetically */
1016       else if (!strcmp (key, MONGOC_CURSOR_ALLOW_PARTIAL_RESULTS)) {
1017          OPT_FLAG (MONGOC_QUERY_PARTIAL);
1018       } else if (!strcmp (key, MONGOC_CURSOR_AWAIT_DATA)) {
1019          OPT_FLAG (MONGOC_QUERY_AWAIT_DATA);
1020       } else if (!strcmp (key, MONGOC_CURSOR_COMMENT)) {
1021          OPT_CHECK (UTF8);
1022          PUSH_DOLLAR_QUERY ();
1023          BSON_APPEND_UTF8 (query, "$comment", bson_iter_utf8 (&iter, NULL));
1024       } else if (!strcmp (key, MONGOC_CURSOR_HINT)) {
1025          if (BSON_ITER_HOLDS_UTF8 (&iter)) {
1026             PUSH_DOLLAR_QUERY ();
1027             BSON_APPEND_UTF8 (query, "$hint", bson_iter_utf8 (&iter, NULL));
1028          } else if (BSON_ITER_HOLDS_DOCUMENT (&iter)) {
1029             PUSH_DOLLAR_QUERY ();
1030             OPT_SUBDOCUMENT (hint, hint);
1031          } else {
1032             OPT_ERR ("Wrong type for 'hint' field in 'opts'.");
1033          }
1034       } else if (!strcmp (key, MONGOC_CURSOR_MAX)) {
1035          PUSH_DOLLAR_QUERY ();
1036          OPT_SUBDOCUMENT (max, max);
1037       } else if (!strcmp (key, MONGOC_CURSOR_MAX_SCAN)) {
1038          OPT_CHECK_INT ();
1039          PUSH_DOLLAR_QUERY ();
1040          BSON_APPEND_INT64 (query, "$maxScan", bson_iter_as_int64 (&iter));
1041       } else if (!strcmp (key, MONGOC_CURSOR_MAX_TIME_MS)) {
1042          OPT_CHECK_INT ();
1043          PUSH_DOLLAR_QUERY ();
1044          BSON_APPEND_INT64 (query, "$maxTimeMS", bson_iter_as_int64 (&iter));
1045       } else if (!strcmp (key, MONGOC_CURSOR_MIN)) {
1046          PUSH_DOLLAR_QUERY ();
1047          OPT_SUBDOCUMENT (min, min);
1048       } else if (!strcmp (key, MONGOC_CURSOR_READ_CONCERN)) {
1049          OPT_ERR ("Set readConcern on client, database, or collection,"
1050                   " not in a query.");
1051       } else if (!strcmp (key, MONGOC_CURSOR_RETURN_KEY)) {
1052          OPT_CHECK (BOOL);
1053          PUSH_DOLLAR_QUERY ();
1054          BSON_APPEND_BOOL (query, "$returnKey", bson_iter_as_bool (&iter));
1055       } else if (!strcmp (key, MONGOC_CURSOR_SHOW_RECORD_ID)) {
1056          OPT_CHECK (BOOL);
1057          PUSH_DOLLAR_QUERY ();
1058          BSON_APPEND_BOOL (query, "$showDiskLoc", bson_iter_as_bool (&iter));
1059       } else if (!strcmp (key, MONGOC_CURSOR_SNAPSHOT)) {
1060          OPT_CHECK (BOOL);
1061          PUSH_DOLLAR_QUERY ();
1062          BSON_APPEND_BOOL (query, "$snapshot", bson_iter_as_bool (&iter));
1063       } else if (!strcmp (key, MONGOC_CURSOR_COLLATION)) {
1064          bson_set_error (&cursor->error,
1065                          MONGOC_ERROR_CURSOR,
1066                          MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
1067                          "Collation is not supported by this server");
1068          return NULL;
1069       }
1070       /* singleBatch limit and batchSize are handled in _mongoc_n_return,
1071        * exhaust noCursorTimeout oplogReplay tailable in _mongoc_cursor_flags
1072        * maxAwaitTimeMS is handled in _mongoc_cursor_prepare_getmore_command
1073        */
1074       else if (strcmp (key, MONGOC_CURSOR_SINGLE_BATCH) &&
1075                strcmp (key, MONGOC_CURSOR_LIMIT) &&
1076                strcmp (key, MONGOC_CURSOR_BATCH_SIZE) &&
1077                strcmp (key, MONGOC_CURSOR_EXHAUST) &&
1078                strcmp (key, MONGOC_CURSOR_NO_CURSOR_TIMEOUT) &&
1079                strcmp (key, MONGOC_CURSOR_OPLOG_REPLAY) &&
1080                strcmp (key, MONGOC_CURSOR_TAILABLE) &&
1081                strcmp (key, MONGOC_CURSOR_MAX_AWAIT_TIME_MS)) {
1082          /* pass unrecognized options to server, prefixed with $ */
1083          PUSH_DOLLAR_QUERY ();
1084          dollar_modifier = bson_strdup_printf ("$%s", key);
1085          bson_append_iter (query, dollar_modifier, -1, &iter);
1086          bson_free (dollar_modifier);
1087       }
1088    }
1089 
1090    if (!_mongoc_cursor_flags (cursor, stream, flags)) {
1091       /* cursor->error is set */
1092       return NULL;
1093    }
1094 
1095    return pushed_dollar_query ? query : &cursor->filter;
1096 }
1097 
1098 
/* Retire the option-parsing helper macros used by
 * _mongoc_cursor_parse_opts_for_op_query. OPT_CHECK_INT, PUSH_DOLLAR_QUERY
 * and ADD_FLAG are not undefined here. */
#undef OPT_CHECK
#undef OPT_ERR
#undef OPT_BSON_ERR
#undef OPT_FLAG
#undef OPT_SUBDOCUMENT
1104 
1105 
/*
 * Send the cursor's initial request as a legacy OP_QUERY on server_stream,
 * receive and validate the OP_REPLY, and hand back the first document read
 * from it.
 *
 * Returns the first document, or NULL if there is none or a step failed; in
 * both NULL cases cursor->done is set to true. Failures before "succeeded"
 * is set also report through _mongoc_cursor_monitor_failed.
 */
static const bson_t *
_mongoc_cursor_op_query (mongoc_cursor_t *cursor,
                         mongoc_server_stream_t *server_stream)
{
   int64_t started;
   uint32_t request_id;
   mongoc_rpc_t rpc;
   const char *cmd_name; /* for command monitoring */
   const bson_t *query_ptr;
   bson_t query = BSON_INITIALIZER;
   bson_t fields = BSON_INITIALIZER;
   mongoc_query_flags_t flags;
   mongoc_apply_read_prefs_result_t result = READ_PREFS_RESULT_INIT;
   const bson_t *ret = NULL;
   bool succeeded = false;

   ENTRY;

   /* start of the duration reported to the monitoring callbacks */
   started = bson_get_monotonic_time ();

   cursor->sent = true;
   cursor->operation_id = ++cursor->client->cluster.operation_id;

   request_id = ++cursor->client->cluster.request_id;

   /* header and defaults; query, flags, skip, n_return and fields are filled
    * in below once the options have been parsed */
   rpc.header.msg_len = 0;
   rpc.header.request_id = request_id;
   rpc.header.response_to = 0;
   rpc.header.opcode = MONGOC_OPCODE_QUERY;
   rpc.query.flags = MONGOC_QUERY_NONE;
   rpc.query.collection = cursor->ns;
   rpc.query.skip = 0;
   rpc.query.n_return = 0;
   rpc.query.fields = NULL;

   if (cursor->is_command) {
      /* "filter" isn't a query, it's like {commandName: ... }*/
      cmd_name = _mongoc_get_command_name (&cursor->filter);
      BSON_ASSERT (cmd_name);
   } else {
      cmd_name = "find";
   }

   /* fills query/fields/flags and rpc.query.skip from cursor->opts */
   query_ptr = _mongoc_cursor_parse_opts_for_op_query (
      cursor, server_stream, &query, &fields, &flags, &rpc.query.skip);

   if (!query_ptr) {
      /* invalid opts. cursor->error is set */
      GOTO (done);
   }

   apply_read_preferences (
      cursor->read_prefs, server_stream, query_ptr, flags, &result);

   /* send the query and flags as produced by apply_read_preferences */
   rpc.query.query = bson_get_data (result.query_with_read_prefs);
   rpc.query.flags = result.flags;
   rpc.query.n_return = _mongoc_n_return (cursor);
   if (!bson_empty (&fields)) {
      rpc.query.fields = bson_get_data (&fields);
   }

   if (!_mongoc_cursor_monitor_legacy_query (cursor, server_stream, cmd_name)) {
      GOTO (done);
   }

   if (!mongoc_cluster_sendv_to_server (&cursor->client->cluster,
                                        &rpc,
                                        server_stream,
                                        NULL,
                                        &cursor->error)) {
      GOTO (done);
   }

   _mongoc_buffer_clear (&cursor->buffer, false);

   if (!_mongoc_client_recv (cursor->client,
                             &cursor->rpc,
                             &cursor->buffer,
                             server_stream,
                             &cursor->error)) {
      GOTO (done);
   }

   /* the reply must be an OP_REPLY answering the request sent above */
   if (cursor->rpc.header.opcode != MONGOC_OPCODE_REPLY) {
      bson_set_error (&cursor->error,
                      MONGOC_ERROR_PROTOCOL,
                      MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
                      "Invalid opcode. Expected %d, got %d.",
                      MONGOC_OPCODE_REPLY,
                      cursor->rpc.header.opcode);
      GOTO (done);
   }

   if (cursor->rpc.header.response_to != request_id) {
      bson_set_error (&cursor->error,
                      MONGOC_ERROR_PROTOCOL,
                      MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
                      "Invalid response_to for query. Expected %d, got %d.",
                      request_id,
                      cursor->rpc.header.response_to);
      GOTO (done);
   }

   if (!_mongoc_rpc_check_ok (&cursor->rpc,
                              (bool) cursor->is_command,
                              cursor->client->error_api_version,
                              &cursor->error,
                              &cursor->error_doc)) {
      GOTO (done);
   }

   /* replace any previous reader with one over the new reply's documents */
   if (cursor->reader) {
      bson_reader_destroy (cursor->reader);
   }

   cursor->reader = bson_reader_new_from_data (
      cursor->rpc.reply.documents, (size_t) cursor->rpc.reply.documents_len);

   /* an exhaust query marks both the cursor and its client as in exhaust */
   if (_mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_EXHAUST)) {
      cursor->in_exhaust = true;
      cursor->client->in_exhaust = true;
   }

   _mongoc_cursor_monitor_succeeded (cursor,
                                     bson_get_monotonic_time () - started,
                                     true, /* first_batch */
                                     server_stream,
                                     cmd_name);

   cursor->done = false;
   cursor->end_of_event = false;
   succeeded = true;

   _mongoc_read_from_buffer (cursor, &ret);

done:
   if (!succeeded) {
      _mongoc_cursor_monitor_failed (
         cursor, bson_get_monotonic_time () - started, server_stream, cmd_name);
   }

   /* released on every path, including the early GOTOs */
   apply_read_prefs_result_cleanup (&result);
   bson_destroy (&query);
   bson_destroy (&fields);

   /* no document to return: either a failure or an empty first batch */
   if (!ret) {
      cursor->done = true;
   }

   RETURN (ret);
}
1257 
1258 
1259 bool
_mongoc_cursor_run_command(mongoc_cursor_t * cursor,const bson_t * command,bson_t * reply)1260 _mongoc_cursor_run_command (mongoc_cursor_t *cursor,
1261                             const bson_t *command,
1262                             bson_t *reply)
1263 {
1264    mongoc_cluster_t *cluster;
1265    mongoc_server_stream_t *server_stream;
1266    mongoc_cmd_parts_t parts;
1267    char db[MONGOC_NAMESPACE_MAX];
1268    bool ret = false;
1269 
1270    ENTRY;
1271 
1272    cluster = &cursor->client->cluster;
1273    mongoc_cmd_parts_init (&parts, db, MONGOC_QUERY_NONE, command);
1274    parts.read_prefs = cursor->read_prefs;
1275    parts.assembled.operation_id = cursor->operation_id;
1276    server_stream = _mongoc_cursor_fetch_stream (cursor);
1277 
1278    if (!server_stream) {
1279       GOTO (done);
1280    }
1281 
1282    bson_strncpy (db, cursor->ns, cursor->dblen + 1);
1283    parts.assembled.db_name = db;
1284 
1285    if (!_mongoc_cursor_flags (cursor, server_stream, &parts.user_query_flags)) {
1286       GOTO (done);
1287    }
1288 
1289    if (cursor->write_concern &&
1290        !mongoc_write_concern_is_default (cursor->write_concern) &&
1291        server_stream->sd->max_wire_version >= WIRE_VERSION_CMD_WRITE_CONCERN) {
1292       mongoc_write_concern_append (cursor->write_concern, &parts.extra);
1293    }
1294 
1295    ret = mongoc_cluster_run_command_monitored (
1296       cluster, &parts, server_stream, reply, &cursor->error);
1297 
1298    /* Read and Write Concern Spec: "Drivers SHOULD parse server replies for a
1299     * "writeConcernError" field and report the error only in command-specific
1300     * helper methods that take a separate write concern parameter or an options
1301     * parameter that may contain a write concern option.
1302     *
1303     * Only command helpers with names like "_with_write_concern" can create
1304     * cursors with a non-NULL write_concern field.
1305     */
1306    if (ret && cursor->write_concern) {
1307       ret = !_mongoc_parse_wc_err (reply, &cursor->error);
1308    }
1309 
1310 done:
1311    mongoc_server_stream_cleanup (server_stream);
1312    mongoc_cmd_parts_cleanup (&parts);
1313 
1314    return ret;
1315 }
1316 
1317 
1318 static bool
_translate_query_opt(const char * query_field,const char ** cmd_field,int * len)1319 _translate_query_opt (const char *query_field, const char **cmd_field, int *len)
1320 {
1321    if (query_field[0] != '$') {
1322       *cmd_field = query_field;
1323       *len = -1;
1324       return true;
1325    }
1326 
1327    /* strip the leading '$' */
1328    query_field++;
1329 
1330    if (!strcmp (MONGOC_CURSOR_ORDERBY, query_field)) {
1331       *cmd_field = MONGOC_CURSOR_SORT;
1332       *len = MONGOC_CURSOR_SORT_LEN;
1333    } else if (!strcmp (MONGOC_CURSOR_SHOW_DISK_LOC,
1334                        query_field)) { /* <= MongoDb 3.0 */
1335       *cmd_field = MONGOC_CURSOR_SHOW_RECORD_ID;
1336       *len = MONGOC_CURSOR_SHOW_RECORD_ID_LEN;
1337    } else if (!strcmp (MONGOC_CURSOR_HINT, query_field)) {
1338       *cmd_field = MONGOC_CURSOR_HINT;
1339       *len = MONGOC_CURSOR_HINT_LEN;
1340    } else if (!strcmp (MONGOC_CURSOR_COMMENT, query_field)) {
1341       *cmd_field = MONGOC_CURSOR_COMMENT;
1342       *len = MONGOC_CURSOR_COMMENT_LEN;
1343    } else if (!strcmp (MONGOC_CURSOR_MAX_SCAN, query_field)) {
1344       *cmd_field = MONGOC_CURSOR_MAX_SCAN;
1345       *len = MONGOC_CURSOR_MAX_SCAN_LEN;
1346    } else if (!strcmp (MONGOC_CURSOR_MAX_TIME_MS, query_field)) {
1347       *cmd_field = MONGOC_CURSOR_MAX_TIME_MS;
1348       *len = MONGOC_CURSOR_MAX_TIME_MS_LEN;
1349    } else if (!strcmp (MONGOC_CURSOR_MAX, query_field)) {
1350       *cmd_field = MONGOC_CURSOR_MAX;
1351       *len = MONGOC_CURSOR_MAX_LEN;
1352    } else if (!strcmp (MONGOC_CURSOR_MIN, query_field)) {
1353       *cmd_field = MONGOC_CURSOR_MIN;
1354       *len = MONGOC_CURSOR_MIN_LEN;
1355    } else if (!strcmp (MONGOC_CURSOR_RETURN_KEY, query_field)) {
1356       *cmd_field = MONGOC_CURSOR_RETURN_KEY;
1357       *len = MONGOC_CURSOR_RETURN_KEY_LEN;
1358    } else if (!strcmp (MONGOC_CURSOR_SNAPSHOT, query_field)) {
1359       *cmd_field = MONGOC_CURSOR_SNAPSHOT;
1360       *len = MONGOC_CURSOR_SNAPSHOT_LEN;
1361    } else {
1362       /* not a special command field, must be a query operator like $or */
1363       return false;
1364    }
1365 
1366    return true;
1367 }
1368 
1369 
1370 void
_mongoc_cursor_collection(const mongoc_cursor_t * cursor,const char ** collection,int * collection_len)1371 _mongoc_cursor_collection (const mongoc_cursor_t *cursor,
1372                            const char **collection,
1373                            int *collection_len)
1374 {
1375    /* ns is like "db.collection". Collection name is located past the ".". */
1376    *collection = cursor->ns + (cursor->dblen + 1);
1377    /* Collection name's length is ns length, minus length of db name and ".". */
1378    *collection_len = cursor->nslen - cursor->dblen - 1;
1379 
1380    BSON_ASSERT (*collection_len > 0);
1381 }
1382 
1383 
1384 static bool
_mongoc_cursor_prepare_find_command(mongoc_cursor_t * cursor,bson_t * command,mongoc_server_stream_t * server_stream)1385 _mongoc_cursor_prepare_find_command (mongoc_cursor_t *cursor,
1386                                      bson_t *command,
1387                                      mongoc_server_stream_t *server_stream)
1388 {
1389    const char *collection;
1390    int collection_len;
1391    bson_iter_t iter;
1392 
1393    _mongoc_cursor_collection (cursor, &collection, &collection_len);
1394    bson_append_utf8 (command,
1395                      MONGOC_CURSOR_FIND,
1396                      MONGOC_CURSOR_FIND_LEN,
1397                      collection,
1398                      collection_len);
1399    bson_append_document (
1400       command, MONGOC_CURSOR_FILTER, MONGOC_CURSOR_FILTER_LEN, &cursor->filter);
1401    bson_iter_init (&iter, &cursor->opts);
1402 
1403    while (bson_iter_next (&iter)) {
1404       /* don't append "maxAwaitTimeMS" */
1405       if (!strcmp (bson_iter_key (&iter), MONGOC_CURSOR_COLLATION) &&
1406           server_stream->sd->max_wire_version < WIRE_VERSION_COLLATION) {
1407          bson_set_error (&cursor->error,
1408                          MONGOC_ERROR_CURSOR,
1409                          MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
1410                          "Collation is not supported by this server");
1411          MARK_FAILED (cursor);
1412          return false;
1413       } else if (strcmp (bson_iter_key (&iter),
1414                          MONGOC_CURSOR_MAX_AWAIT_TIME_MS)) {
1415          if (!bson_append_iter (command, bson_iter_key (&iter), -1, &iter)) {
1416             bson_set_error (&cursor->error,
1417                             MONGOC_ERROR_BSON,
1418                             MONGOC_ERROR_BSON_INVALID,
1419                             "Cursor opts too large");
1420             MARK_FAILED (cursor);
1421             return false;
1422          }
1423       }
1424    }
1425 
1426    if (cursor->read_concern->level != NULL) {
1427       const bson_t *read_concern_bson;
1428 
1429       read_concern_bson = _mongoc_read_concern_get_bson (cursor->read_concern);
1430       bson_append_document (command,
1431                             MONGOC_CURSOR_READ_CONCERN,
1432                             MONGOC_CURSOR_READ_CONCERN_LEN,
1433                             read_concern_bson);
1434    }
1435 
1436    return true;
1437 }
1438 
1439 
1440 static const bson_t *
_mongoc_cursor_find_command(mongoc_cursor_t * cursor,mongoc_server_stream_t * server_stream)1441 _mongoc_cursor_find_command (mongoc_cursor_t *cursor,
1442                              mongoc_server_stream_t *server_stream)
1443 {
1444    bson_t command = BSON_INITIALIZER;
1445    const bson_t *bson = NULL;
1446 
1447    ENTRY;
1448 
1449    if (!_mongoc_cursor_prepare_find_command (cursor, &command, server_stream)) {
1450       RETURN (NULL);
1451    }
1452 
1453    _mongoc_cursor_cursorid_init (cursor, &command);
1454    bson_destroy (&command);
1455 
1456    BSON_ASSERT (cursor->iface.next);
1457    _mongoc_cursor_cursorid_next (cursor, &bson);
1458 
1459    RETURN (bson);
1460 }
1461 
1462 
1463 static const bson_t *
_mongoc_cursor_get_more(mongoc_cursor_t * cursor)1464 _mongoc_cursor_get_more (mongoc_cursor_t *cursor)
1465 {
1466    mongoc_server_stream_t *server_stream;
1467    const bson_t *b = NULL;
1468 
1469    ENTRY;
1470 
1471    BSON_ASSERT (cursor);
1472 
1473    server_stream = _mongoc_cursor_fetch_stream (cursor);
1474    if (!server_stream) {
1475       GOTO (failure);
1476    }
1477 
1478    if (!cursor->in_exhaust && !cursor->rpc.reply.cursor_id) {
1479       bson_set_error (&cursor->error,
1480                       MONGOC_ERROR_CURSOR,
1481                       MONGOC_ERROR_CURSOR_INVALID_CURSOR,
1482                       "No valid cursor was provided.");
1483       GOTO (failure);
1484    }
1485 
1486    if (!_mongoc_cursor_op_getmore (cursor, server_stream)) {
1487       GOTO (failure);
1488    }
1489 
1490    mongoc_server_stream_cleanup (server_stream);
1491 
1492    if (cursor->reader) {
1493       _mongoc_read_from_buffer (cursor, &b);
1494    }
1495 
1496    RETURN (b);
1497 
1498 failure:
1499    cursor->done = true;
1500 
1501    mongoc_server_stream_cleanup (server_stream);
1502 
1503    RETURN (NULL);
1504 }
1505 
1506 
1507 static bool
_mongoc_cursor_monitor_legacy_get_more(mongoc_cursor_t * cursor,mongoc_server_stream_t * server_stream)1508 _mongoc_cursor_monitor_legacy_get_more (mongoc_cursor_t *cursor,
1509                                         mongoc_server_stream_t *server_stream)
1510 {
1511    bson_t doc;
1512    char db[MONGOC_NAMESPACE_MAX];
1513    mongoc_client_t *client;
1514    mongoc_apm_command_started_t event;
1515 
1516    ENTRY;
1517 
1518    client = cursor->client;
1519    if (!client->apm_callbacks.started) {
1520       /* successful */
1521       RETURN (true);
1522    }
1523 
1524    bson_init (&doc);
1525    if (!_mongoc_cursor_prepare_getmore_command (cursor, &doc)) {
1526       bson_destroy (&doc);
1527       RETURN (false);
1528    }
1529 
1530    bson_strncpy (db, cursor->ns, cursor->dblen + 1);
1531    mongoc_apm_command_started_init (&event,
1532                                     &doc,
1533                                     db,
1534                                     "getMore",
1535                                     client->cluster.request_id,
1536                                     cursor->operation_id,
1537                                     &server_stream->sd->host,
1538                                     server_stream->sd->id,
1539                                     client->apm_context);
1540 
1541    client->apm_callbacks.started (&event);
1542    mongoc_apm_command_started_cleanup (&event);
1543    bson_destroy (&doc);
1544 
1545    RETURN (true);
1546 }
1547 
1548 
/*
 * Receive the next batch for the cursor over the legacy wire protocol.
 *
 * Outside exhaust mode an OP_GET_MORE is built and sent first; in exhaust
 * mode nothing is sent and the next reply is read directly. The reply is
 * validated and cursor->reader is replaced by a reader over its documents.
 *
 * Returns true on success. On failure returns false after reporting through
 * _mongoc_cursor_monitor_failed; the failing step sets cursor->error.
 */
bool
_mongoc_cursor_op_getmore (mongoc_cursor_t *cursor,
                           mongoc_server_stream_t *server_stream)
{
   int64_t started;
   mongoc_rpc_t rpc;
   uint32_t request_id;
   mongoc_cluster_t *cluster;
   mongoc_query_flags_t flags;

   ENTRY;

   /* start of the duration reported to the monitoring callbacks */
   started = bson_get_monotonic_time ();
   cluster = &cursor->client->cluster;

   if (!_mongoc_cursor_flags (cursor, server_stream, &flags)) {
      GOTO (fail);
   }

   if (cursor->in_exhaust) {
      /* nothing is sent: the reply is matched against the request id taken
       * from the previously received header */
      request_id = (uint32_t) cursor->rpc.header.request_id;
   } else {
      request_id = ++cluster->request_id;

      rpc.get_more.cursor_id = cursor->rpc.reply.cursor_id;
      rpc.header.msg_len = 0;
      rpc.header.request_id = request_id;
      rpc.header.response_to = 0;
      rpc.header.opcode = MONGOC_OPCODE_GET_MORE;
      rpc.get_more.zero = 0;
      rpc.get_more.collection = cursor->ns;

      /* tailable cursors always ask with n_return 0 */
      if (flags & MONGOC_QUERY_TAILABLE_CURSOR) {
         rpc.get_more.n_return = 0;
      } else {
         rpc.get_more.n_return = _mongoc_n_return (cursor);
      }

      if (!_mongoc_cursor_monitor_legacy_get_more (cursor, server_stream)) {
         GOTO (fail);
      }

      if (!mongoc_cluster_sendv_to_server (
             cluster, &rpc, server_stream, NULL, &cursor->error)) {
         GOTO (fail);
      }
   }

   _mongoc_buffer_clear (&cursor->buffer, false);

   if (!_mongoc_client_recv (cursor->client,
                             &cursor->rpc,
                             &cursor->buffer,
                             server_stream,
                             &cursor->error)) {
      GOTO (fail);
   }

   /* the reply must be an OP_REPLY answering the expected request id */
   if (cursor->rpc.header.opcode != MONGOC_OPCODE_REPLY) {
      bson_set_error (&cursor->error,
                      MONGOC_ERROR_PROTOCOL,
                      MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
                      "Invalid opcode. Expected %d, got %d.",
                      MONGOC_OPCODE_REPLY,
                      cursor->rpc.header.opcode);
      GOTO (fail);
   }

   if (cursor->rpc.header.response_to != request_id) {
      bson_set_error (&cursor->error,
                      MONGOC_ERROR_PROTOCOL,
                      MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
                      "Invalid response_to for getmore. Expected %d, got %d.",
                      request_id,
                      cursor->rpc.header.response_to);
      GOTO (fail);
   }

   if (!_mongoc_rpc_check_ok (&cursor->rpc,
                              false /* is_command */,
                              cursor->client->error_api_version,
                              &cursor->error,
                              &cursor->error_doc)) {
      GOTO (fail);
   }

   /* replace any previous reader with one over the new reply's documents */
   if (cursor->reader) {
      bson_reader_destroy (cursor->reader);
   }

   cursor->reader = bson_reader_new_from_data (
      cursor->rpc.reply.documents, (size_t) cursor->rpc.reply.documents_len);

   _mongoc_cursor_monitor_succeeded (cursor,
                                     bson_get_monotonic_time () - started,
                                     false, /* not first batch */
                                     server_stream,
                                     "getMore");

   RETURN (true);

fail:
   _mongoc_cursor_monitor_failed (
      cursor, bson_get_monotonic_time () - started, server_stream, "getMore");
   RETURN (false);
}
1655 
1656 
/*
 * Report whether the cursor has an error, copying it into "error".
 *
 * Forwards to mongoc_cursor_error_document with a NULL document pointer and
 * returns its result.
 */
bool
mongoc_cursor_error (mongoc_cursor_t *cursor, bson_error_t *error)
{
   ENTRY;

   RETURN (mongoc_cursor_error_document (cursor, error, NULL));
}
1664 
1665 
1666 bool
mongoc_cursor_error_document(mongoc_cursor_t * cursor,bson_error_t * error,const bson_t ** doc)1667 mongoc_cursor_error_document (mongoc_cursor_t *cursor,
1668                               bson_error_t *error,
1669                               const bson_t **doc)
1670 {
1671    bool ret;
1672 
1673    ENTRY;
1674 
1675    BSON_ASSERT (cursor);
1676 
1677    if (cursor->iface.error_document) {
1678       ret = cursor->iface.error_document (cursor, error, doc);
1679    } else {
1680       ret = _mongoc_cursor_error_document (cursor, error, doc);
1681    }
1682 
1683    RETURN (ret);
1684 }
1685 
1686 
1687 bool
_mongoc_cursor_error_document(mongoc_cursor_t * cursor,bson_error_t * error,const bson_t ** doc)1688 _mongoc_cursor_error_document (mongoc_cursor_t *cursor,
1689                                bson_error_t *error,
1690                                const bson_t **doc)
1691 {
1692    ENTRY;
1693 
1694    BSON_ASSERT (cursor);
1695 
1696    if (BSON_UNLIKELY (CURSOR_FAILED (cursor))) {
1697       bson_set_error (error,
1698                       cursor->error.domain,
1699                       cursor->error.code,
1700                       "%s",
1701                       cursor->error.message);
1702 
1703       if (doc) {
1704          *doc = &cursor->error_doc;
1705       }
1706 
1707       RETURN (true);
1708    }
1709 
1710    if (doc) {
1711       *doc = NULL;
1712    }
1713 
1714    RETURN (false);
1715 }
1716 
1717 
1718 bool
mongoc_cursor_next(mongoc_cursor_t * cursor,const bson_t ** bson)1719 mongoc_cursor_next (mongoc_cursor_t *cursor, const bson_t **bson)
1720 {
1721    bool ret;
1722 
1723    ENTRY;
1724 
1725    BSON_ASSERT (cursor);
1726    BSON_ASSERT (bson);
1727 
1728    TRACE ("cursor_id(%" PRId64 ")", cursor->rpc.reply.cursor_id);
1729 
1730    if (bson) {
1731       *bson = NULL;
1732    }
1733 
1734    if (CURSOR_FAILED (cursor)) {
1735       return false;
1736    }
1737 
1738    if (cursor->done) {
1739       bson_set_error (&cursor->error,
1740                       MONGOC_ERROR_CURSOR,
1741                       MONGOC_ERROR_CURSOR_INVALID_CURSOR,
1742                       "Cannot advance a completed or failed cursor.");
1743       return false;
1744    }
1745 
1746    /*
1747     * We cannot proceed if another cursor is receiving results in exhaust mode.
1748     */
1749    if (cursor->client->in_exhaust && !cursor->in_exhaust) {
1750       bson_set_error (&cursor->error,
1751                       MONGOC_ERROR_CLIENT,
1752                       MONGOC_ERROR_CLIENT_IN_EXHAUST,
1753                       "Another cursor derived from this client is in exhaust.");
1754       RETURN (false);
1755    }
1756 
1757    if (cursor->iface.next) {
1758       ret = cursor->iface.next (cursor, bson);
1759    } else {
1760       ret = _mongoc_cursor_next (cursor, bson);
1761    }
1762 
1763    cursor->current = *bson;
1764 
1765    cursor->count++;
1766 
1767    RETURN (ret);
1768 }
1769 
1770 
1771 bool
_mongoc_read_from_buffer(mongoc_cursor_t * cursor,const bson_t ** bson)1772 _mongoc_read_from_buffer (mongoc_cursor_t *cursor, const bson_t **bson)
1773 {
1774    bool eof = false;
1775 
1776    BSON_ASSERT (cursor->reader);
1777 
1778    *bson = bson_reader_read (cursor->reader, &eof);
1779    cursor->end_of_event = eof ? 1 : 0;
1780 
1781    return *bson ? true : false;
1782 }
1783 
1784 
1785 bool
_mongoc_cursor_next(mongoc_cursor_t * cursor,const bson_t ** bson)1786 _mongoc_cursor_next (mongoc_cursor_t *cursor, const bson_t **bson)
1787 {
1788    int64_t limit;
1789    const bson_t *b = NULL;
1790    bool tailable;
1791 
1792    ENTRY;
1793 
1794    BSON_ASSERT (cursor);
1795 
1796    if (bson) {
1797       *bson = NULL;
1798    }
1799 
1800    /*
1801     * If we reached our limit, make sure we mark this as done and do not try to
1802     * make further progress.  We also set end_of_event so that
1803     * mongoc_cursor_more will be false.
1804     */
1805    limit = mongoc_cursor_get_limit (cursor);
1806 
1807    if (limit && cursor->count >= llabs (limit)) {
1808       cursor->done = true;
1809       cursor->end_of_event = true;
1810       RETURN (false);
1811    }
1812 
1813    /*
1814     * Try to read the next document from the reader if it exists, we might
1815     * get NULL back and EOF, in which case we need to submit a getmore.
1816     */
1817    if (cursor->reader) {
1818       _mongoc_read_from_buffer (cursor, &b);
1819       if (b) {
1820          GOTO (complete);
1821       }
1822    }
1823 
1824    /*
1825     * Check to see if we need to send a GET_MORE for more results.
1826     */
1827    if (!cursor->sent) {
1828       b = _mongoc_cursor_initial_query (cursor);
1829    } else if (BSON_UNLIKELY (cursor->end_of_event) &&
1830               cursor->rpc.reply.cursor_id) {
1831       b = _mongoc_cursor_get_more (cursor);
1832    }
1833 
1834 complete:
1835    tailable = _mongoc_cursor_get_opt_bool (cursor, "tailable");
1836    cursor->done = (cursor->end_of_event &&
1837                    ((cursor->in_exhaust && !cursor->rpc.reply.cursor_id) ||
1838                     (!b && !tailable)));
1839 
1840    if (bson) {
1841       *bson = b;
1842    }
1843 
1844    RETURN (!!b);
1845 }
1846 
1847 
1848 bool
mongoc_cursor_more(mongoc_cursor_t * cursor)1849 mongoc_cursor_more (mongoc_cursor_t *cursor)
1850 {
1851    bool ret;
1852 
1853    ENTRY;
1854 
1855    BSON_ASSERT (cursor);
1856 
1857    if (cursor->iface.more) {
1858       ret = cursor->iface.more (cursor);
1859    } else {
1860       ret = _mongoc_cursor_more (cursor);
1861    }
1862 
1863    RETURN (ret);
1864 }
1865 
1866 
1867 bool
_mongoc_cursor_more(mongoc_cursor_t * cursor)1868 _mongoc_cursor_more (mongoc_cursor_t *cursor)
1869 {
1870    BSON_ASSERT (cursor);
1871 
1872    if (CURSOR_FAILED (cursor)) {
1873       return false;
1874    }
1875 
1876    return !(cursor->sent && cursor->done && cursor->end_of_event);
1877 }
1878 
1879 
1880 void
mongoc_cursor_get_host(mongoc_cursor_t * cursor,mongoc_host_list_t * host)1881 mongoc_cursor_get_host (mongoc_cursor_t *cursor, mongoc_host_list_t *host)
1882 {
1883    BSON_ASSERT (cursor);
1884    BSON_ASSERT (host);
1885 
1886    if (cursor->iface.get_host) {
1887       cursor->iface.get_host (cursor, host);
1888    } else {
1889       _mongoc_cursor_get_host (cursor, host);
1890    }
1891 
1892    EXIT;
1893 }
1894 
1895 void
_mongoc_cursor_get_host(mongoc_cursor_t * cursor,mongoc_host_list_t * host)1896 _mongoc_cursor_get_host (mongoc_cursor_t *cursor, mongoc_host_list_t *host)
1897 {
1898    mongoc_server_description_t *description;
1899 
1900    BSON_ASSERT (cursor);
1901    BSON_ASSERT (host);
1902 
1903    memset (host, 0, sizeof *host);
1904 
1905    if (!cursor->server_id) {
1906       MONGOC_WARNING ("%s(): Must send query before fetching peer.", BSON_FUNC);
1907       return;
1908    }
1909 
1910    description = mongoc_topology_server_by_id (
1911       cursor->client->topology, cursor->server_id, &cursor->error);
1912    if (!description) {
1913       return;
1914    }
1915 
1916    *host = description->host;
1917 
1918    mongoc_server_description_destroy (description);
1919 
1920    return;
1921 }
1922 
1923 mongoc_cursor_t *
mongoc_cursor_clone(const mongoc_cursor_t * cursor)1924 mongoc_cursor_clone (const mongoc_cursor_t *cursor)
1925 {
1926    mongoc_cursor_t *ret;
1927 
1928    BSON_ASSERT (cursor);
1929 
1930    if (cursor->iface.clone) {
1931       ret = cursor->iface.clone (cursor);
1932    } else {
1933       ret = _mongoc_cursor_clone (cursor);
1934    }
1935 
1936    RETURN (ret);
1937 }
1938 
1939 
1940 mongoc_cursor_t *
_mongoc_cursor_clone(const mongoc_cursor_t * cursor)1941 _mongoc_cursor_clone (const mongoc_cursor_t *cursor)
1942 {
1943    mongoc_cursor_t *_clone;
1944 
1945    ENTRY;
1946 
1947    BSON_ASSERT (cursor);
1948 
1949    _clone = (mongoc_cursor_t *) bson_malloc0 (sizeof *_clone);
1950 
1951    _clone->client = cursor->client;
1952    _clone->is_command = cursor->is_command;
1953    _clone->nslen = cursor->nslen;
1954    _clone->dblen = cursor->dblen;
1955    _clone->has_fields = cursor->has_fields;
1956 
1957    if (cursor->read_prefs) {
1958       _clone->read_prefs = mongoc_read_prefs_copy (cursor->read_prefs);
1959    }
1960 
1961    if (cursor->read_concern) {
1962       _clone->read_concern = mongoc_read_concern_copy (cursor->read_concern);
1963    }
1964 
1965 
1966    bson_copy_to (&cursor->filter, &_clone->filter);
1967    bson_copy_to (&cursor->opts, &_clone->opts);
1968    bson_copy_to (&cursor->error_doc, &_clone->error_doc);
1969 
1970    bson_strncpy (_clone->ns, cursor->ns, sizeof _clone->ns);
1971 
1972    _mongoc_buffer_init (&_clone->buffer, NULL, 0, NULL, NULL);
1973 
1974    mongoc_counter_cursors_active_inc ();
1975 
1976    RETURN (_clone);
1977 }
1978 
1979 
1980 /*
1981  *--------------------------------------------------------------------------
1982  *
1983  * mongoc_cursor_is_alive --
1984  *
1985  *       Checks to see if a cursor is alive.
1986  *
1987  *       This is primarily useful with tailable cursors.
1988  *
1989  * Returns:
1990  *       true if the cursor is alive.
1991  *
1992  * Side effects:
1993  *       None.
1994  *
1995  *--------------------------------------------------------------------------
1996  */
1997 
1998 bool
mongoc_cursor_is_alive(const mongoc_cursor_t * cursor)1999 mongoc_cursor_is_alive (const mongoc_cursor_t *cursor) /* IN */
2000 {
2001    BSON_ASSERT (cursor);
2002 
2003    return !cursor->done;
2004 }
2005 
2006 
2007 const bson_t *
mongoc_cursor_current(const mongoc_cursor_t * cursor)2008 mongoc_cursor_current (const mongoc_cursor_t *cursor) /* IN */
2009 {
2010    BSON_ASSERT (cursor);
2011 
2012    return cursor->current;
2013 }
2014 
2015 
2016 void
mongoc_cursor_set_batch_size(mongoc_cursor_t * cursor,uint32_t batch_size)2017 mongoc_cursor_set_batch_size (mongoc_cursor_t *cursor, uint32_t batch_size)
2018 {
2019    BSON_ASSERT (cursor);
2020 
2021    _mongoc_cursor_set_opt_int64 (
2022       cursor, MONGOC_CURSOR_BATCH_SIZE, (int64_t) batch_size);
2023 }
2024 
2025 uint32_t
mongoc_cursor_get_batch_size(const mongoc_cursor_t * cursor)2026 mongoc_cursor_get_batch_size (const mongoc_cursor_t *cursor)
2027 {
2028    BSON_ASSERT (cursor);
2029 
2030    return (uint32_t) _mongoc_cursor_get_opt_int64 (
2031       cursor, MONGOC_CURSOR_BATCH_SIZE, 0);
2032 }
2033 
2034 bool
mongoc_cursor_set_limit(mongoc_cursor_t * cursor,int64_t limit)2035 mongoc_cursor_set_limit (mongoc_cursor_t *cursor, int64_t limit)
2036 {
2037    BSON_ASSERT (cursor);
2038 
2039    if (!cursor->sent) {
2040       if (limit < 0) {
2041          return _mongoc_cursor_set_opt_int64 (
2042                    cursor, MONGOC_CURSOR_LIMIT, -limit) &&
2043                 _mongoc_cursor_set_opt_bool (
2044                    cursor, MONGOC_CURSOR_SINGLE_BATCH, true);
2045       } else {
2046          return _mongoc_cursor_set_opt_int64 (
2047             cursor, MONGOC_CURSOR_LIMIT, limit);
2048       }
2049    } else {
2050       return false;
2051    }
2052 }
2053 
2054 int64_t
mongoc_cursor_get_limit(const mongoc_cursor_t * cursor)2055 mongoc_cursor_get_limit (const mongoc_cursor_t *cursor)
2056 {
2057    int64_t limit;
2058    bool single_batch;
2059 
2060    BSON_ASSERT (cursor);
2061 
2062    limit = _mongoc_cursor_get_opt_int64 (cursor, MONGOC_CURSOR_LIMIT, 0);
2063    single_batch =
2064       _mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_SINGLE_BATCH);
2065 
2066    if (limit > 0 && single_batch) {
2067       limit = -limit;
2068    }
2069 
2070    return limit;
2071 }
2072 
2073 bool
mongoc_cursor_set_hint(mongoc_cursor_t * cursor,uint32_t server_id)2074 mongoc_cursor_set_hint (mongoc_cursor_t *cursor, uint32_t server_id)
2075 {
2076    BSON_ASSERT (cursor);
2077 
2078    if (cursor->server_id) {
2079       MONGOC_ERROR ("mongoc_cursor_set_hint: server_id already set");
2080       return false;
2081    }
2082 
2083    if (!server_id) {
2084       MONGOC_ERROR ("mongoc_cursor_set_hint: cannot set server_id to 0");
2085       return false;
2086    }
2087 
2088    cursor->server_id = server_id;
2089    cursor->server_id_set = true;
2090 
2091    return true;
2092 }
2093 
2094 uint32_t
mongoc_cursor_get_hint(const mongoc_cursor_t * cursor)2095 mongoc_cursor_get_hint (const mongoc_cursor_t *cursor)
2096 {
2097    BSON_ASSERT (cursor);
2098 
2099    return cursor->server_id;
2100 }
2101 
2102 int64_t
mongoc_cursor_get_id(const mongoc_cursor_t * cursor)2103 mongoc_cursor_get_id (const mongoc_cursor_t *cursor)
2104 {
2105    BSON_ASSERT (cursor);
2106 
2107    return cursor->rpc.reply.cursor_id;
2108 }
2109 
2110 void
mongoc_cursor_set_max_await_time_ms(mongoc_cursor_t * cursor,uint32_t max_await_time_ms)2111 mongoc_cursor_set_max_await_time_ms (mongoc_cursor_t *cursor,
2112                                      uint32_t max_await_time_ms)
2113 {
2114    BSON_ASSERT (cursor);
2115 
2116    if (!cursor->sent) {
2117       _mongoc_cursor_set_opt_int64 (
2118          cursor, MONGOC_CURSOR_MAX_AWAIT_TIME_MS, (int64_t) max_await_time_ms);
2119    }
2120 }
2121 
2122 uint32_t
mongoc_cursor_get_max_await_time_ms(const mongoc_cursor_t * cursor)2123 mongoc_cursor_get_max_await_time_ms (const mongoc_cursor_t *cursor)
2124 {
2125    bson_iter_t iter;
2126 
2127    BSON_ASSERT (cursor);
2128 
2129    if (bson_iter_init_find (
2130           &iter, &cursor->opts, MONGOC_CURSOR_MAX_AWAIT_TIME_MS)) {
2131       return (uint32_t) bson_iter_as_int64 (&iter);
2132    }
2133 
2134    return 0;
2135 }
2136 
2137 
2138 /*
2139  *--------------------------------------------------------------------------
2140  *
2141  * mongoc_cursor_new_from_command_reply --
2142  *
2143  *       Low-level function to initialize a mongoc_cursor_t from the
2144  *       reply to a command like "aggregate", "find", or "listCollections".
2145  *
2146  *       Useful in drivers that wrap the C driver; in applications, use
2147  *       high-level functions like mongoc_collection_aggregate instead.
2148  *
2149  * Returns:
2150  *       A cursor.
2151  *
2152  * Side effects:
2153  *       On failure, the cursor's error is set: retrieve it with
2154  *       mongoc_cursor_error. On success or failure, "reply" is
2155  *       destroyed.
2156  *
2157  *--------------------------------------------------------------------------
2158  */
2159 
2160 mongoc_cursor_t *
mongoc_cursor_new_from_command_reply(mongoc_client_t * client,bson_t * reply,uint32_t server_id)2161 mongoc_cursor_new_from_command_reply (mongoc_client_t *client,
2162                                       bson_t *reply,
2163                                       uint32_t server_id)
2164 {
2165    mongoc_cursor_t *cursor;
2166    bson_t cmd = BSON_INITIALIZER;
2167 
2168    BSON_ASSERT (client);
2169    BSON_ASSERT (reply);
2170 
2171    cursor = _mongoc_cursor_new_with_opts (
2172       client, NULL, false /* is_command */, NULL, NULL, NULL, NULL);
2173 
2174    _mongoc_cursor_cursorid_init (cursor, &cmd);
2175    _mongoc_cursor_cursorid_init_with_reply (cursor, reply, server_id);
2176    bson_destroy (&cmd);
2177 
2178    return cursor;
2179 }
2180