1 /*
2 * Copyright 2013 MongoDB, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17
18 #include "mongoc-cursor.h"
19 #include "mongoc-cursor-private.h"
20 #include "mongoc-client-private.h"
21 #include "mongoc-counters-private.h"
22 #include "mongoc-error.h"
23 #include "mongoc-log.h"
24 #include "mongoc-trace-private.h"
25 #include "mongoc-cursor-cursorid-private.h"
26 #include "mongoc-read-concern-private.h"
27 #include "mongoc-util-private.h"
28 #include "mongoc-write-concern-private.h"
29
30
31 #undef MONGOC_LOG_DOMAIN
32 #define MONGOC_LOG_DOMAIN "cursor"
33
34
35 #define CURSOR_FAILED(cursor_) ((cursor_)->error.domain != 0)
36
37 static bool
38 _translate_query_opt (const char *query_field,
39 const char **cmd_field,
40 int *len);
41
42 static const bson_t *
43 _mongoc_cursor_op_query (mongoc_cursor_t *cursor,
44 mongoc_server_stream_t *server_stream);
45
46 static bool
47 _mongoc_cursor_prepare_find_command (mongoc_cursor_t *cursor,
48 bson_t *command,
49 mongoc_server_stream_t *server_stream);
50
51 static const bson_t *
52 _mongoc_cursor_find_command (mongoc_cursor_t *cursor,
53 mongoc_server_stream_t *server_stream);
54
55
56 static bool
_mongoc_cursor_set_opt_int64(mongoc_cursor_t * cursor,const char * option,int64_t value)57 _mongoc_cursor_set_opt_int64 (mongoc_cursor_t *cursor,
58 const char *option,
59 int64_t value)
60 {
61 bson_iter_t iter;
62
63 if (bson_iter_init_find (&iter, &cursor->opts, option)) {
64 if (!BSON_ITER_HOLDS_INT64 (&iter)) {
65 return false;
66 }
67
68 bson_iter_overwrite_int64 (&iter, value);
69 return true;
70 }
71
72 return BSON_APPEND_INT64 (&cursor->opts, option, value);
73 }
74
75
76 static int64_t
_mongoc_cursor_get_opt_int64(const mongoc_cursor_t * cursor,const char * option,int64_t default_value)77 _mongoc_cursor_get_opt_int64 (const mongoc_cursor_t *cursor,
78 const char *option,
79 int64_t default_value)
80 {
81 bson_iter_t iter;
82
83 if (bson_iter_init_find (&iter, &cursor->opts, option)) {
84 return bson_iter_as_int64 (&iter);
85 }
86
87 return default_value;
88 }
89
90
91 static bool
_mongoc_cursor_set_opt_bool(mongoc_cursor_t * cursor,const char * option,bool value)92 _mongoc_cursor_set_opt_bool (mongoc_cursor_t *cursor,
93 const char *option,
94 bool value)
95 {
96 bson_iter_t iter;
97
98 if (bson_iter_init_find (&iter, &cursor->opts, option)) {
99 if (!BSON_ITER_HOLDS_BOOL (&iter)) {
100 return false;
101 }
102
103 bson_iter_overwrite_bool (&iter, value);
104 return true;
105 }
106
107 return BSON_APPEND_BOOL (&cursor->opts, option, value);
108 }
109
110
111 bool
_mongoc_cursor_get_opt_bool(const mongoc_cursor_t * cursor,const char * option)112 _mongoc_cursor_get_opt_bool (const mongoc_cursor_t *cursor, const char *option)
113 {
114 bson_iter_t iter;
115
116 if (bson_iter_init_find (&iter, &cursor->opts, option)) {
117 return bson_iter_as_bool (&iter);
118 }
119
120 return false;
121 }
122
123
124 int32_t
_mongoc_n_return(mongoc_cursor_t * cursor)125 _mongoc_n_return (mongoc_cursor_t *cursor)
126 {
127 int64_t limit;
128 int64_t batch_size;
129 int64_t n_return;
130
131 if (cursor->is_command) {
132 /* commands always have n_return of 1 */
133 return 1;
134 }
135
136 limit = mongoc_cursor_get_limit (cursor);
137 batch_size = mongoc_cursor_get_batch_size (cursor);
138
139 if (limit < 0) {
140 n_return = limit;
141 } else if (limit) {
142 int64_t remaining = limit - cursor->count;
143 BSON_ASSERT (remaining > 0);
144
145 if (batch_size) {
146 n_return = BSON_MIN (batch_size, remaining);
147 } else {
148 /* batch_size 0 means accept the default */
149 n_return = remaining;
150 }
151 } else {
152 n_return = batch_size;
153 }
154
155 if (n_return < INT32_MIN) {
156 return INT32_MIN;
157 } else if (n_return > INT32_MAX) {
158 return INT32_MAX;
159 } else {
160 return (int32_t) n_return;
161 }
162 }
163
164
165 void
_mongoc_set_cursor_ns(mongoc_cursor_t * cursor,const char * ns,uint32_t nslen)166 _mongoc_set_cursor_ns (mongoc_cursor_t *cursor, const char *ns, uint32_t nslen)
167 {
168 const char *dot;
169
170 bson_strncpy (cursor->ns, ns, sizeof cursor->ns);
171 cursor->nslen = BSON_MIN (nslen, sizeof cursor->ns);
172 dot = strstr (cursor->ns, ".");
173
174 if (dot) {
175 cursor->dblen = (uint32_t) (dot - cursor->ns);
176 } else {
177 /* a database name with no collection name */
178 cursor->dblen = cursor->nslen;
179 }
180 }
181
182
183 /* return first key beginning with $, or NULL. precondition: bson is valid. */
184 static const char *
_first_dollar_field(const bson_t * bson)185 _first_dollar_field (const bson_t *bson)
186 {
187 bson_iter_t iter;
188 const char *key;
189
190 BSON_ASSERT (bson_iter_init (&iter, bson));
191 while (bson_iter_next (&iter)) {
192 key = bson_iter_key (&iter);
193
194 if (key[0] == '$') {
195 return key;
196 }
197 }
198
199 return NULL;
200 }
201
202
203 #define MARK_FAILED(c) \
204 do { \
205 (c)->done = true; \
206 (c)->end_of_event = true; \
207 (c)->sent = true; \
208 } while (0)
209
210
211 mongoc_cursor_t *
_mongoc_cursor_new_with_opts(mongoc_client_t * client,const char * db_and_collection,bool is_command,const bson_t * filter,const bson_t * opts,const mongoc_read_prefs_t * read_prefs,const mongoc_read_concern_t * read_concern)212 _mongoc_cursor_new_with_opts (mongoc_client_t *client,
213 const char *db_and_collection,
214 bool is_command,
215 const bson_t *filter,
216 const bson_t *opts,
217 const mongoc_read_prefs_t *read_prefs,
218 const mongoc_read_concern_t *read_concern)
219 {
220 mongoc_cursor_t *cursor;
221 mongoc_topology_description_type_t td_type;
222 uint32_t server_id;
223 bson_error_t validate_err;
224 const char *dollar_field;
225
226 ENTRY;
227
228 BSON_ASSERT (client);
229
230 cursor = (mongoc_cursor_t *) bson_malloc0 (sizeof *cursor);
231 cursor->client = client;
232 cursor->is_command = is_command ? 1 : 0;
233
234 bson_init (&cursor->filter);
235 bson_init (&cursor->opts);
236 bson_init (&cursor->error_doc);
237
238 if (filter) {
239 if (!bson_validate_with_error (
240 filter, BSON_VALIDATE_EMPTY_KEYS, &validate_err)) {
241 MARK_FAILED (cursor);
242 bson_set_error (&cursor->error,
243 MONGOC_ERROR_CURSOR,
244 MONGOC_ERROR_CURSOR_INVALID_CURSOR,
245 "Invalid filter: %s",
246 validate_err.message);
247 GOTO (finish);
248 }
249
250 bson_destroy (&cursor->filter);
251 bson_copy_to (filter, &cursor->filter);
252 }
253
254 if (opts) {
255 if (!bson_validate_with_error (
256 opts, BSON_VALIDATE_EMPTY_KEYS, &validate_err)) {
257 MARK_FAILED (cursor);
258 bson_set_error (&cursor->error,
259 MONGOC_ERROR_CURSOR,
260 MONGOC_ERROR_CURSOR_INVALID_CURSOR,
261 "Invalid opts: %s",
262 validate_err.message);
263 GOTO (finish);
264 }
265
266 dollar_field = _first_dollar_field (opts);
267 if (dollar_field) {
268 MARK_FAILED (cursor);
269 bson_set_error (&cursor->error,
270 MONGOC_ERROR_CURSOR,
271 MONGOC_ERROR_CURSOR_INVALID_CURSOR,
272 "Cannot use $-modifiers in opts: \"%s\"",
273 dollar_field);
274 GOTO (finish);
275 }
276
277 bson_copy_to_excluding_noinit (opts, &cursor->opts, "serverId", NULL);
278
279 /* true if there's a valid serverId or no serverId, false on err */
280 if (!_mongoc_get_server_id_from_opts (opts,
281 MONGOC_ERROR_CURSOR,
282 MONGOC_ERROR_CURSOR_INVALID_CURSOR,
283 &server_id,
284 &cursor->error)) {
285 MARK_FAILED (cursor);
286 GOTO (finish);
287 }
288
289 if (server_id) {
290 mongoc_cursor_set_hint (cursor, server_id);
291 }
292 }
293
294 cursor->read_prefs = read_prefs
295 ? mongoc_read_prefs_copy (read_prefs)
296 : mongoc_read_prefs_new (MONGOC_READ_PRIMARY);
297
298 cursor->read_concern = read_concern ? mongoc_read_concern_copy (read_concern)
299 : mongoc_read_concern_new ();
300
301 if (db_and_collection) {
302 _mongoc_set_cursor_ns (
303 cursor, db_and_collection, (uint32_t) strlen (db_and_collection));
304 }
305
306 if (_mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_EXHAUST)) {
307 if (_mongoc_cursor_get_opt_int64 (cursor, MONGOC_CURSOR_LIMIT, 0)) {
308 bson_set_error (&cursor->error,
309 MONGOC_ERROR_CURSOR,
310 MONGOC_ERROR_CURSOR_INVALID_CURSOR,
311 "Cannot specify both 'exhaust' and 'limit'.");
312 MARK_FAILED (cursor);
313 GOTO (finish);
314 }
315
316 td_type = _mongoc_topology_get_type (client->topology);
317
318 if (td_type == MONGOC_TOPOLOGY_SHARDED) {
319 bson_set_error (&cursor->error,
320 MONGOC_ERROR_CURSOR,
321 MONGOC_ERROR_CURSOR_INVALID_CURSOR,
322 "Cannot use exhaust cursor with sharded cluster.");
323 MARK_FAILED (cursor);
324 GOTO (finish);
325 }
326 }
327
328 _mongoc_buffer_init (&cursor->buffer, NULL, 0, NULL, NULL);
329 _mongoc_read_prefs_validate (read_prefs, &cursor->error);
330
331 finish:
332 mongoc_counter_cursors_active_inc ();
333
334 RETURN (cursor);
335 }
336
337
338 mongoc_cursor_t *
_mongoc_cursor_new(mongoc_client_t * client,const char * db_and_collection,mongoc_query_flags_t qflags,uint32_t skip,int32_t limit,uint32_t batch_size,bool is_command,const bson_t * query,const bson_t * fields,const mongoc_read_prefs_t * read_prefs,const mongoc_read_concern_t * read_concern)339 _mongoc_cursor_new (mongoc_client_t *client,
340 const char *db_and_collection,
341 mongoc_query_flags_t qflags,
342 uint32_t skip,
343 int32_t limit,
344 uint32_t batch_size,
345 bool is_command,
346 const bson_t *query,
347 const bson_t *fields,
348 const mongoc_read_prefs_t *read_prefs,
349 const mongoc_read_concern_t *read_concern)
350 {
351 bson_t filter;
352 bool has_filter = false;
353 bson_t opts = BSON_INITIALIZER;
354 bool slave_ok = false;
355 const char *key;
356 bson_iter_t iter;
357 const char *opt_key;
358 int len;
359 uint32_t data_len;
360 const uint8_t *data;
361 mongoc_cursor_t *cursor;
362 bson_error_t error = {0};
363
364 ENTRY;
365
366 BSON_ASSERT (client);
367
368 if (query) {
369 if (bson_has_field (query, "$query")) {
370 /* like "{$query: {a: 1}, $orderby: {b: 1}, $otherModifier: true}" */
371 bson_iter_init (&iter, query);
372 while (bson_iter_next (&iter)) {
373 key = bson_iter_key (&iter);
374
375 if (key[0] != '$') {
376 bson_set_error (&error,
377 MONGOC_ERROR_CURSOR,
378 MONGOC_ERROR_CURSOR_INVALID_CURSOR,
379 "Cannot mix $query with non-dollar field '%s'",
380 key);
381 GOTO (done);
382 }
383
384 if (!strcmp (key, "$query")) {
385 /* set "filter" to the incoming document's "$query" */
386 bson_iter_document (&iter, &data_len, &data);
387 bson_init_static (&filter, data, (size_t) data_len);
388 has_filter = true;
389 } else if (_translate_query_opt (key, &opt_key, &len)) {
390 /* "$orderby" becomes "sort", etc., "$unknown" -> "unknown" */
391 bson_append_iter (&opts, opt_key, len, &iter);
392 } else {
393 /* strip leading "$" */
394 bson_append_iter (&opts, key + 1, -1, &iter);
395 }
396 }
397 }
398 }
399
400 if (!bson_empty0 (fields)) {
401 bson_append_document (
402 &opts, MONGOC_CURSOR_PROJECTION, MONGOC_CURSOR_PROJECTION_LEN, fields);
403 }
404
405 if (skip) {
406 bson_append_int64 (
407 &opts, MONGOC_CURSOR_SKIP, MONGOC_CURSOR_SKIP_LEN, skip);
408 }
409
410 if (limit) {
411 bson_append_int64 (
412 &opts, MONGOC_CURSOR_LIMIT, MONGOC_CURSOR_LIMIT_LEN, llabs (limit));
413
414 if (limit < 0) {
415 bson_append_bool (&opts,
416 MONGOC_CURSOR_SINGLE_BATCH,
417 MONGOC_CURSOR_SINGLE_BATCH_LEN,
418 true);
419 }
420 }
421
422 if (batch_size) {
423 bson_append_int64 (&opts,
424 MONGOC_CURSOR_BATCH_SIZE,
425 MONGOC_CURSOR_BATCH_SIZE_LEN,
426 batch_size);
427 }
428
429 if (qflags & MONGOC_QUERY_SLAVE_OK) {
430 slave_ok = true;
431 }
432
433 if (qflags & MONGOC_QUERY_TAILABLE_CURSOR) {
434 bson_append_bool (
435 &opts, MONGOC_CURSOR_TAILABLE, MONGOC_CURSOR_TAILABLE_LEN, true);
436 }
437
438 if (qflags & MONGOC_QUERY_OPLOG_REPLAY) {
439 bson_append_bool (&opts,
440 MONGOC_CURSOR_OPLOG_REPLAY,
441 MONGOC_CURSOR_OPLOG_REPLAY_LEN,
442 true);
443 }
444
445 if (qflags & MONGOC_QUERY_NO_CURSOR_TIMEOUT) {
446 bson_append_bool (&opts,
447 MONGOC_CURSOR_NO_CURSOR_TIMEOUT,
448 MONGOC_CURSOR_NO_CURSOR_TIMEOUT_LEN,
449 true);
450 }
451
452 if (qflags & MONGOC_QUERY_AWAIT_DATA) {
453 bson_append_bool (
454 &opts, MONGOC_CURSOR_AWAIT_DATA, MONGOC_CURSOR_AWAIT_DATA_LEN, true);
455 }
456
457 if (qflags & MONGOC_QUERY_EXHAUST) {
458 bson_append_bool (
459 &opts, MONGOC_CURSOR_EXHAUST, MONGOC_CURSOR_EXHAUST_LEN, true);
460 }
461
462 if (qflags & MONGOC_QUERY_PARTIAL) {
463 bson_append_bool (&opts,
464 MONGOC_CURSOR_ALLOW_PARTIAL_RESULTS,
465 MONGOC_CURSOR_ALLOW_PARTIAL_RESULTS_LEN,
466 true);
467 }
468
469 done:
470
471 if (error.domain != 0) {
472 cursor = _mongoc_cursor_new_with_opts (
473 client, db_and_collection, is_command, NULL, NULL, NULL, NULL);
474
475 MARK_FAILED (cursor);
476 memcpy (&cursor->error, &error, sizeof (bson_error_t));
477 } else {
478 cursor = _mongoc_cursor_new_with_opts (client,
479 db_and_collection,
480 is_command,
481 has_filter ? &filter : query,
482 &opts,
483 read_prefs,
484 read_concern);
485
486 if (slave_ok) {
487 cursor->slave_ok = true;
488 }
489 }
490
491 if (has_filter) {
492 bson_destroy (&filter);
493 }
494
495 bson_destroy (&opts);
496
497 RETURN (cursor);
498 }
499
500
501 void
mongoc_cursor_destroy(mongoc_cursor_t * cursor)502 mongoc_cursor_destroy (mongoc_cursor_t *cursor)
503 {
504 ENTRY;
505
506 BSON_ASSERT (cursor);
507
508 if (cursor->iface.destroy) {
509 cursor->iface.destroy (cursor);
510 } else {
511 _mongoc_cursor_destroy (cursor);
512 }
513
514 EXIT;
515 }
516
517 void
_mongoc_cursor_destroy(mongoc_cursor_t * cursor)518 _mongoc_cursor_destroy (mongoc_cursor_t *cursor)
519 {
520 char db[MONGOC_NAMESPACE_MAX];
521 ENTRY;
522
523 BSON_ASSERT (cursor);
524
525 if (cursor->in_exhaust) {
526 cursor->client->in_exhaust = false;
527 if (!cursor->done) {
528 /* The only way to stop an exhaust cursor is to kill the connection */
529 mongoc_cluster_disconnect_node (&cursor->client->cluster,
530 cursor->server_id, false, NULL);
531 }
532 } else if (cursor->rpc.reply.cursor_id) {
533 bson_strncpy (db, cursor->ns, cursor->dblen + 1);
534
535 _mongoc_client_kill_cursor (cursor->client,
536 cursor->server_id,
537 cursor->rpc.reply.cursor_id,
538 cursor->operation_id,
539 db,
540 cursor->ns + cursor->dblen + 1);
541 }
542
543 if (cursor->reader) {
544 bson_reader_destroy (cursor->reader);
545 cursor->reader = NULL;
546 }
547
548 _mongoc_buffer_destroy (&cursor->buffer);
549 mongoc_read_prefs_destroy (cursor->read_prefs);
550 mongoc_read_concern_destroy (cursor->read_concern);
551 mongoc_write_concern_destroy (cursor->write_concern);
552
553 bson_destroy (&cursor->filter);
554 bson_destroy (&cursor->opts);
555 bson_destroy (&cursor->error_doc);
556 bson_free (cursor);
557
558 mongoc_counter_cursors_active_dec ();
559 mongoc_counter_cursors_disposed_inc ();
560
561 EXIT;
562 }
563
564
565 mongoc_server_stream_t *
_mongoc_cursor_fetch_stream(mongoc_cursor_t * cursor)566 _mongoc_cursor_fetch_stream (mongoc_cursor_t *cursor)
567 {
568 mongoc_server_stream_t *server_stream;
569
570 ENTRY;
571
572 if (cursor->server_id) {
573 server_stream =
574 mongoc_cluster_stream_for_server (&cursor->client->cluster,
575 cursor->server_id,
576 true /* reconnect_ok */,
577 &cursor->error);
578 } else {
579 server_stream = mongoc_cluster_stream_for_reads (
580 &cursor->client->cluster, cursor->read_prefs, &cursor->error);
581
582 if (server_stream) {
583 cursor->server_id = server_stream->sd->id;
584 }
585 }
586
587 RETURN (server_stream);
588 }
589
590
591 bool
_use_find_command(const mongoc_cursor_t * cursor,const mongoc_server_stream_t * server_stream)592 _use_find_command (const mongoc_cursor_t *cursor,
593 const mongoc_server_stream_t *server_stream)
594 {
595 /* Find, getMore And killCursors Commands Spec: "the find command cannot be
596 * used to execute other commands" and "the find command does not support the
597 * exhaust flag."
598 */
599 return server_stream->sd->max_wire_version >= WIRE_VERSION_FIND_CMD &&
600 !cursor->is_command &&
601 !_mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_EXHAUST);
602 }
603
604
605 bool
_use_getmore_command(const mongoc_cursor_t * cursor,const mongoc_server_stream_t * server_stream)606 _use_getmore_command (const mongoc_cursor_t *cursor,
607 const mongoc_server_stream_t *server_stream)
608 {
609 return server_stream->sd->max_wire_version >= WIRE_VERSION_FIND_CMD &&
610 !_mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_EXHAUST);
611 }
612
613
614 static const bson_t *
_mongoc_cursor_initial_query(mongoc_cursor_t * cursor)615 _mongoc_cursor_initial_query (mongoc_cursor_t *cursor)
616 {
617 mongoc_server_stream_t *server_stream;
618 const bson_t *b = NULL;
619
620 ENTRY;
621
622 BSON_ASSERT (cursor);
623
624 server_stream = _mongoc_cursor_fetch_stream (cursor);
625
626 if (!server_stream) {
627 GOTO (done);
628 }
629
630 if (_use_find_command (cursor, server_stream)) {
631 b = _mongoc_cursor_find_command (cursor, server_stream);
632 } else {
633 /* When the user explicitly provides a readConcern -- but the server
634 * doesn't support readConcern, we must error:
635 * https://github.com/mongodb/specifications/blob/master/source/read-write-concern/read-write-concern.rst#errors-1
636 */
637 if (cursor->read_concern->level != NULL &&
638 server_stream->sd->max_wire_version < WIRE_VERSION_READ_CONCERN) {
639 bson_set_error (&cursor->error,
640 MONGOC_ERROR_COMMAND,
641 MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
642 "The selected server does not support readConcern");
643 } else {
644 b = _mongoc_cursor_op_query (cursor, server_stream);
645 }
646 }
647
648 done:
649 /* no-op if server_stream is NULL */
650 mongoc_server_stream_cleanup (server_stream);
651
652 if (!b) {
653 cursor->done = true;
654 }
655
656 RETURN (b);
657 }
658
659
660 static bool
_mongoc_cursor_monitor_legacy_query(mongoc_cursor_t * cursor,mongoc_server_stream_t * server_stream,const char * cmd_name)661 _mongoc_cursor_monitor_legacy_query (mongoc_cursor_t *cursor,
662 mongoc_server_stream_t *server_stream,
663 const char *cmd_name)
664 {
665 bson_t doc;
666 mongoc_client_t *client;
667 mongoc_apm_command_started_t event;
668 char db[MONGOC_NAMESPACE_MAX];
669
670 ENTRY;
671
672 client = cursor->client;
673 if (!client->apm_callbacks.started) {
674 /* successful */
675 RETURN (true);
676 }
677
678 bson_init (&doc);
679 bson_strncpy (db, cursor->ns, cursor->dblen + 1);
680
681 if (!cursor->is_command) {
682 /* simulate a MongoDB 3.2+ "find" command */
683 if (!_mongoc_cursor_prepare_find_command (cursor, &doc, server_stream)) {
684 /* cursor->error is set */
685 bson_destroy (&doc);
686 RETURN (false);
687 }
688 }
689
690 mongoc_apm_command_started_init (&event,
691 cursor->is_command ? &cursor->filter : &doc,
692 db,
693 cmd_name,
694 client->cluster.request_id,
695 cursor->operation_id,
696 &server_stream->sd->host,
697 server_stream->sd->id,
698 client->apm_context);
699
700 client->apm_callbacks.started (&event);
701 mongoc_apm_command_started_cleanup (&event);
702 bson_destroy (&doc);
703
704 RETURN (true);
705 }
706
707
708 /* append array of docs from current cursor batch */
709 static void
_mongoc_cursor_append_docs_array(mongoc_cursor_t * cursor,bson_t * docs)710 _mongoc_cursor_append_docs_array (mongoc_cursor_t *cursor, bson_t *docs)
711 {
712 bool eof = false;
713 char str[16];
714 const char *key;
715 uint32_t i = 0;
716 size_t keylen;
717 const bson_t *doc;
718
719 while ((doc = bson_reader_read (cursor->reader, &eof))) {
720 keylen = bson_uint32_to_string (i, &key, str, sizeof str);
721 bson_append_document (docs, key, (int) keylen, doc);
722 }
723
724 bson_reader_reset (cursor->reader);
725 }
726
727
728 static void
_mongoc_cursor_monitor_succeeded(mongoc_cursor_t * cursor,int64_t duration,bool first_batch,mongoc_server_stream_t * stream,const char * cmd_name)729 _mongoc_cursor_monitor_succeeded (mongoc_cursor_t *cursor,
730 int64_t duration,
731 bool first_batch,
732 mongoc_server_stream_t *stream,
733 const char *cmd_name)
734 {
735 mongoc_apm_command_succeeded_t event;
736 mongoc_client_t *client;
737 bson_t reply;
738 bson_t reply_cursor;
739
740 ENTRY;
741
742 client = cursor->client;
743
744 if (!client->apm_callbacks.succeeded) {
745 EXIT;
746 }
747
748 if (cursor->is_command) {
749 /* cursor is from mongoc_client_command. we're in mongoc_cursor_next. */
750 if (!_mongoc_rpc_reply_get_first (&cursor->rpc.reply, &reply)) {
751 MONGOC_ERROR ("_mongoc_cursor_monitor_succeeded can't parse reply");
752 EXIT;
753 }
754 } else {
755 bson_t docs_array;
756
757 /* fake reply to find/getMore command:
758 * {ok: 1, cursor: {id: 17, ns: "...", first/nextBatch: [ ... docs ... ]}}
759 */
760 bson_init (&docs_array);
761 _mongoc_cursor_append_docs_array (cursor, &docs_array);
762
763 bson_init (&reply);
764 bson_append_int32 (&reply, "ok", 2, 1);
765 bson_append_document_begin (&reply, "cursor", 6, &reply_cursor);
766 bson_append_int64 (&reply_cursor, "id", 2, mongoc_cursor_get_id (cursor));
767 bson_append_utf8 (&reply_cursor, "ns", 2, cursor->ns, cursor->nslen);
768 bson_append_array (&reply_cursor,
769 first_batch ? "firstBatch" : "nextBatch",
770 first_batch ? 10 : 9,
771 &docs_array);
772 bson_append_document_end (&reply, &reply_cursor);
773 bson_destroy (&docs_array);
774 }
775
776 mongoc_apm_command_succeeded_init (&event,
777 duration,
778 &reply,
779 cmd_name,
780 client->cluster.request_id,
781 cursor->operation_id,
782 &stream->sd->host,
783 stream->sd->id,
784 client->apm_context);
785
786 client->apm_callbacks.succeeded (&event);
787
788 mongoc_apm_command_succeeded_cleanup (&event);
789 bson_destroy (&reply);
790
791 EXIT;
792 }
793
794
795 static void
_mongoc_cursor_monitor_failed(mongoc_cursor_t * cursor,int64_t duration,mongoc_server_stream_t * stream,const char * cmd_name)796 _mongoc_cursor_monitor_failed (mongoc_cursor_t *cursor,
797 int64_t duration,
798 mongoc_server_stream_t *stream,
799 const char *cmd_name)
800 {
801 mongoc_apm_command_failed_t event;
802 mongoc_client_t *client;
803
804 ENTRY;
805
806 client = cursor->client;
807
808 if (!client->apm_callbacks.failed) {
809 EXIT;
810 }
811
812 mongoc_apm_command_failed_init (&event,
813 duration,
814 cmd_name,
815 &cursor->error,
816 client->cluster.request_id,
817 cursor->operation_id,
818 &stream->sd->host,
819 stream->sd->id,
820 client->apm_context);
821
822 client->apm_callbacks.failed (&event);
823
824 mongoc_apm_command_failed_cleanup (&event);
825
826 EXIT;
827 }
828
829
830 #define OPT_CHECK(_type) \
831 do { \
832 if (!BSON_ITER_HOLDS_##_type (&iter)) { \
833 bson_set_error (&cursor->error, \
834 MONGOC_ERROR_COMMAND, \
835 MONGOC_ERROR_COMMAND_INVALID_ARG, \
836 "invalid option %s, should be type %s", \
837 key, \
838 #_type); \
839 return NULL; \
840 } \
841 } while (false)
842
843
844 #define OPT_CHECK_INT() \
845 do { \
846 if (!BSON_ITER_HOLDS_INT (&iter)) { \
847 bson_set_error (&cursor->error, \
848 MONGOC_ERROR_COMMAND, \
849 MONGOC_ERROR_COMMAND_INVALID_ARG, \
850 "invalid option %s, should be integer", \
851 key); \
852 return NULL; \
853 } \
854 } while (false)
855
856
857 #define OPT_ERR(_msg) \
858 do { \
859 bson_set_error (&cursor->error, \
860 MONGOC_ERROR_COMMAND, \
861 MONGOC_ERROR_COMMAND_INVALID_ARG, \
862 _msg); \
863 return NULL; \
864 } while (false)
865
866
867 #define OPT_BSON_ERR(_msg) \
868 do { \
869 bson_set_error ( \
870 &cursor->error, MONGOC_ERROR_BSON, MONGOC_ERROR_BSON_INVALID, _msg); \
871 return NULL; \
872 } while (false)
873
874
875 #define OPT_FLAG(_flag) \
876 do { \
877 OPT_CHECK (BOOL); \
878 if (bson_iter_as_bool (&iter)) { \
879 *flags |= _flag; \
880 } \
881 } while (false)
882
883
884 #define PUSH_DOLLAR_QUERY() \
885 do { \
886 if (!pushed_dollar_query) { \
887 pushed_dollar_query = true; \
888 bson_append_document (query, "$query", 6, &cursor->filter); \
889 } \
890 } while (false)
891
892
893 #define OPT_SUBDOCUMENT(_opt_name, _legacy_name) \
894 do { \
895 OPT_CHECK (DOCUMENT); \
896 bson_iter_document (&iter, &len, &data); \
897 if (!bson_init_static (&subdocument, data, (size_t) len)) { \
898 OPT_BSON_ERR ("Invalid '" #_opt_name "' subdocument in 'opts'."); \
899 } \
900 BSON_APPEND_DOCUMENT (query, "$" #_legacy_name, &subdocument); \
901 } while (false)
902
903 #define ADD_FLAG(_flags, _value) \
904 do { \
905 if (!BSON_ITER_HOLDS_BOOL (&iter)) { \
906 bson_set_error (&cursor->error, \
907 MONGOC_ERROR_COMMAND, \
908 MONGOC_ERROR_COMMAND_INVALID_ARG, \
909 "invalid option %s, should be type bool", \
910 key); \
911 return false; \
912 } \
913 if (bson_iter_as_bool (&iter)) { \
914 *_flags |= _value; \
915 } \
916 } while (false);
917
918 static bool
_mongoc_cursor_flags(mongoc_cursor_t * cursor,mongoc_server_stream_t * stream,mongoc_query_flags_t * flags)919 _mongoc_cursor_flags (mongoc_cursor_t *cursor,
920 mongoc_server_stream_t *stream,
921 mongoc_query_flags_t *flags /* OUT */)
922 {
923 bson_iter_t iter;
924 const char *key;
925
926 *flags = MONGOC_QUERY_NONE;
927
928 if (!bson_iter_init (&iter, &cursor->opts)) {
929 bson_set_error (&cursor->error,
930 MONGOC_ERROR_BSON,
931 MONGOC_ERROR_BSON_INVALID,
932 "Invalid 'opts' parameter.");
933 return false;
934 }
935
936 while (bson_iter_next (&iter)) {
937 key = bson_iter_key (&iter);
938
939 if (!strcmp (key, MONGOC_CURSOR_ALLOW_PARTIAL_RESULTS)) {
940 ADD_FLAG (flags, MONGOC_QUERY_PARTIAL);
941 } else if (!strcmp (key, MONGOC_CURSOR_AWAIT_DATA)) {
942 ADD_FLAG (flags, MONGOC_QUERY_AWAIT_DATA);
943 } else if (!strcmp (key, MONGOC_CURSOR_EXHAUST)) {
944 ADD_FLAG (flags, MONGOC_QUERY_EXHAUST);
945 } else if (!strcmp (key, MONGOC_CURSOR_NO_CURSOR_TIMEOUT)) {
946 ADD_FLAG (flags, MONGOC_QUERY_NO_CURSOR_TIMEOUT);
947 } else if (!strcmp (key, MONGOC_CURSOR_OPLOG_REPLAY)) {
948 ADD_FLAG (flags, MONGOC_QUERY_OPLOG_REPLAY);
949 } else if (!strcmp (key, MONGOC_CURSOR_TAILABLE)) {
950 ADD_FLAG (flags, MONGOC_QUERY_TAILABLE_CURSOR);
951 }
952 }
953
954 if (cursor->slave_ok) {
955 *flags |= MONGOC_QUERY_SLAVE_OK;
956 } else if (cursor->server_id_set &&
957 (stream->topology_type == MONGOC_TOPOLOGY_RS_WITH_PRIMARY ||
958 stream->topology_type == MONGOC_TOPOLOGY_RS_NO_PRIMARY) &&
959 stream->sd->type != MONGOC_SERVER_RS_PRIMARY) {
960 *flags |= MONGOC_QUERY_SLAVE_OK;
961 }
962
963 return true;
964 }
965
966
967 static bson_t *
_mongoc_cursor_parse_opts_for_op_query(mongoc_cursor_t * cursor,mongoc_server_stream_t * stream,bson_t * query,bson_t * fields,mongoc_query_flags_t * flags,int32_t * skip)968 _mongoc_cursor_parse_opts_for_op_query (mongoc_cursor_t *cursor,
969 mongoc_server_stream_t *stream,
970 bson_t *query /* OUT */,
971 bson_t *fields /* OUT */,
972 mongoc_query_flags_t *flags /* OUT */,
973 int32_t *skip /* OUT */)
974 {
975 bool pushed_dollar_query;
976 bson_iter_t iter;
977 uint32_t len;
978 const uint8_t *data;
979 bson_t subdocument;
980 const char *key;
981 char *dollar_modifier;
982
983 *flags = MONGOC_QUERY_NONE;
984 *skip = 0;
985
986 /* assume we'll send filter straight to server, like "{a: 1}". if we find an
987 * opt we must add, like "sort", we push the query like "$query: {a: 1}",
988 * then add a query modifier for the option, in this example "$orderby".
989 */
990 pushed_dollar_query = false;
991
992 if (!bson_iter_init (&iter, &cursor->opts)) {
993 OPT_BSON_ERR ("Invalid 'opts' parameter.");
994 }
995
996 while (bson_iter_next (&iter)) {
997 key = bson_iter_key (&iter);
998
999 /* most common options first */
1000 if (!strcmp (key, MONGOC_CURSOR_PROJECTION)) {
1001 OPT_CHECK (DOCUMENT);
1002 bson_iter_document (&iter, &len, &data);
1003 if (!bson_init_static (&subdocument, data, (size_t) len)) {
1004 OPT_BSON_ERR ("Invalid 'projection' subdocument in 'opts'.");
1005 }
1006
1007 bson_copy_to (&subdocument, fields);
1008 } else if (!strcmp (key, MONGOC_CURSOR_SORT)) {
1009 PUSH_DOLLAR_QUERY ();
1010 OPT_SUBDOCUMENT (sort, orderby);
1011 } else if (!strcmp (key, MONGOC_CURSOR_SKIP)) {
1012 OPT_CHECK_INT ();
1013 *skip = (int32_t) bson_iter_as_int64 (&iter);
1014 }
1015 /* the rest of the options, alphabetically */
1016 else if (!strcmp (key, MONGOC_CURSOR_ALLOW_PARTIAL_RESULTS)) {
1017 OPT_FLAG (MONGOC_QUERY_PARTIAL);
1018 } else if (!strcmp (key, MONGOC_CURSOR_AWAIT_DATA)) {
1019 OPT_FLAG (MONGOC_QUERY_AWAIT_DATA);
1020 } else if (!strcmp (key, MONGOC_CURSOR_COMMENT)) {
1021 OPT_CHECK (UTF8);
1022 PUSH_DOLLAR_QUERY ();
1023 BSON_APPEND_UTF8 (query, "$comment", bson_iter_utf8 (&iter, NULL));
1024 } else if (!strcmp (key, MONGOC_CURSOR_HINT)) {
1025 if (BSON_ITER_HOLDS_UTF8 (&iter)) {
1026 PUSH_DOLLAR_QUERY ();
1027 BSON_APPEND_UTF8 (query, "$hint", bson_iter_utf8 (&iter, NULL));
1028 } else if (BSON_ITER_HOLDS_DOCUMENT (&iter)) {
1029 PUSH_DOLLAR_QUERY ();
1030 OPT_SUBDOCUMENT (hint, hint);
1031 } else {
1032 OPT_ERR ("Wrong type for 'hint' field in 'opts'.");
1033 }
1034 } else if (!strcmp (key, MONGOC_CURSOR_MAX)) {
1035 PUSH_DOLLAR_QUERY ();
1036 OPT_SUBDOCUMENT (max, max);
1037 } else if (!strcmp (key, MONGOC_CURSOR_MAX_SCAN)) {
1038 OPT_CHECK_INT ();
1039 PUSH_DOLLAR_QUERY ();
1040 BSON_APPEND_INT64 (query, "$maxScan", bson_iter_as_int64 (&iter));
1041 } else if (!strcmp (key, MONGOC_CURSOR_MAX_TIME_MS)) {
1042 OPT_CHECK_INT ();
1043 PUSH_DOLLAR_QUERY ();
1044 BSON_APPEND_INT64 (query, "$maxTimeMS", bson_iter_as_int64 (&iter));
1045 } else if (!strcmp (key, MONGOC_CURSOR_MIN)) {
1046 PUSH_DOLLAR_QUERY ();
1047 OPT_SUBDOCUMENT (min, min);
1048 } else if (!strcmp (key, MONGOC_CURSOR_READ_CONCERN)) {
1049 OPT_ERR ("Set readConcern on client, database, or collection,"
1050 " not in a query.");
1051 } else if (!strcmp (key, MONGOC_CURSOR_RETURN_KEY)) {
1052 OPT_CHECK (BOOL);
1053 PUSH_DOLLAR_QUERY ();
1054 BSON_APPEND_BOOL (query, "$returnKey", bson_iter_as_bool (&iter));
1055 } else if (!strcmp (key, MONGOC_CURSOR_SHOW_RECORD_ID)) {
1056 OPT_CHECK (BOOL);
1057 PUSH_DOLLAR_QUERY ();
1058 BSON_APPEND_BOOL (query, "$showDiskLoc", bson_iter_as_bool (&iter));
1059 } else if (!strcmp (key, MONGOC_CURSOR_SNAPSHOT)) {
1060 OPT_CHECK (BOOL);
1061 PUSH_DOLLAR_QUERY ();
1062 BSON_APPEND_BOOL (query, "$snapshot", bson_iter_as_bool (&iter));
1063 } else if (!strcmp (key, MONGOC_CURSOR_COLLATION)) {
1064 bson_set_error (&cursor->error,
1065 MONGOC_ERROR_CURSOR,
1066 MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
1067 "Collation is not supported by this server");
1068 return NULL;
1069 }
1070 /* singleBatch limit and batchSize are handled in _mongoc_n_return,
1071 * exhaust noCursorTimeout oplogReplay tailable in _mongoc_cursor_flags
1072 * maxAwaitTimeMS is handled in _mongoc_cursor_prepare_getmore_command
1073 */
1074 else if (strcmp (key, MONGOC_CURSOR_SINGLE_BATCH) &&
1075 strcmp (key, MONGOC_CURSOR_LIMIT) &&
1076 strcmp (key, MONGOC_CURSOR_BATCH_SIZE) &&
1077 strcmp (key, MONGOC_CURSOR_EXHAUST) &&
1078 strcmp (key, MONGOC_CURSOR_NO_CURSOR_TIMEOUT) &&
1079 strcmp (key, MONGOC_CURSOR_OPLOG_REPLAY) &&
1080 strcmp (key, MONGOC_CURSOR_TAILABLE) &&
1081 strcmp (key, MONGOC_CURSOR_MAX_AWAIT_TIME_MS)) {
1082 /* pass unrecognized options to server, prefixed with $ */
1083 PUSH_DOLLAR_QUERY ();
1084 dollar_modifier = bson_strdup_printf ("$%s", key);
1085 bson_append_iter (query, dollar_modifier, -1, &iter);
1086 bson_free (dollar_modifier);
1087 }
1088 }
1089
1090 if (!_mongoc_cursor_flags (cursor, stream, flags)) {
1091 /* cursor->error is set */
1092 return NULL;
1093 }
1094
1095 return pushed_dollar_query ? query : &cursor->filter;
1096 }
1097
1098
1099 #undef OPT_CHECK
1100 #undef OPT_ERR
1101 #undef OPT_BSON_ERR
1102 #undef OPT_FLAG
1103 #undef OPT_SUBDOCUMENT
1104
1105
1106 static const bson_t *
_mongoc_cursor_op_query(mongoc_cursor_t * cursor,mongoc_server_stream_t * server_stream)1107 _mongoc_cursor_op_query (mongoc_cursor_t *cursor,
1108 mongoc_server_stream_t *server_stream)
1109 {
1110 int64_t started;
1111 uint32_t request_id;
1112 mongoc_rpc_t rpc;
1113 const char *cmd_name; /* for command monitoring */
1114 const bson_t *query_ptr;
1115 bson_t query = BSON_INITIALIZER;
1116 bson_t fields = BSON_INITIALIZER;
1117 mongoc_query_flags_t flags;
1118 mongoc_apply_read_prefs_result_t result = READ_PREFS_RESULT_INIT;
1119 const bson_t *ret = NULL;
1120 bool succeeded = false;
1121
1122 ENTRY;
1123
1124 started = bson_get_monotonic_time ();
1125
1126 cursor->sent = true;
1127 cursor->operation_id = ++cursor->client->cluster.operation_id;
1128
1129 request_id = ++cursor->client->cluster.request_id;
1130
1131 rpc.header.msg_len = 0;
1132 rpc.header.request_id = request_id;
1133 rpc.header.response_to = 0;
1134 rpc.header.opcode = MONGOC_OPCODE_QUERY;
1135 rpc.query.flags = MONGOC_QUERY_NONE;
1136 rpc.query.collection = cursor->ns;
1137 rpc.query.skip = 0;
1138 rpc.query.n_return = 0;
1139 rpc.query.fields = NULL;
1140
1141 if (cursor->is_command) {
1142 /* "filter" isn't a query, it's like {commandName: ... }*/
1143 cmd_name = _mongoc_get_command_name (&cursor->filter);
1144 BSON_ASSERT (cmd_name);
1145 } else {
1146 cmd_name = "find";
1147 }
1148
1149 query_ptr = _mongoc_cursor_parse_opts_for_op_query (
1150 cursor, server_stream, &query, &fields, &flags, &rpc.query.skip);
1151
1152 if (!query_ptr) {
1153 /* invalid opts. cursor->error is set */
1154 GOTO (done);
1155 }
1156
1157 apply_read_preferences (
1158 cursor->read_prefs, server_stream, query_ptr, flags, &result);
1159
1160 rpc.query.query = bson_get_data (result.query_with_read_prefs);
1161 rpc.query.flags = result.flags;
1162 rpc.query.n_return = _mongoc_n_return (cursor);
1163 if (!bson_empty (&fields)) {
1164 rpc.query.fields = bson_get_data (&fields);
1165 }
1166
1167 if (!_mongoc_cursor_monitor_legacy_query (cursor, server_stream, cmd_name)) {
1168 GOTO (done);
1169 }
1170
1171 if (!mongoc_cluster_sendv_to_server (&cursor->client->cluster,
1172 &rpc,
1173 server_stream,
1174 NULL,
1175 &cursor->error)) {
1176 GOTO (done);
1177 }
1178
1179 _mongoc_buffer_clear (&cursor->buffer, false);
1180
1181 if (!_mongoc_client_recv (cursor->client,
1182 &cursor->rpc,
1183 &cursor->buffer,
1184 server_stream,
1185 &cursor->error)) {
1186 GOTO (done);
1187 }
1188
1189 if (cursor->rpc.header.opcode != MONGOC_OPCODE_REPLY) {
1190 bson_set_error (&cursor->error,
1191 MONGOC_ERROR_PROTOCOL,
1192 MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
1193 "Invalid opcode. Expected %d, got %d.",
1194 MONGOC_OPCODE_REPLY,
1195 cursor->rpc.header.opcode);
1196 GOTO (done);
1197 }
1198
1199 if (cursor->rpc.header.response_to != request_id) {
1200 bson_set_error (&cursor->error,
1201 MONGOC_ERROR_PROTOCOL,
1202 MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
1203 "Invalid response_to for query. Expected %d, got %d.",
1204 request_id,
1205 cursor->rpc.header.response_to);
1206 GOTO (done);
1207 }
1208
1209 if (!_mongoc_rpc_check_ok (&cursor->rpc,
1210 (bool) cursor->is_command,
1211 cursor->client->error_api_version,
1212 &cursor->error,
1213 &cursor->error_doc)) {
1214 GOTO (done);
1215 }
1216
1217 if (cursor->reader) {
1218 bson_reader_destroy (cursor->reader);
1219 }
1220
1221 cursor->reader = bson_reader_new_from_data (
1222 cursor->rpc.reply.documents, (size_t) cursor->rpc.reply.documents_len);
1223
1224 if (_mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_EXHAUST)) {
1225 cursor->in_exhaust = true;
1226 cursor->client->in_exhaust = true;
1227 }
1228
1229 _mongoc_cursor_monitor_succeeded (cursor,
1230 bson_get_monotonic_time () - started,
1231 true, /* first_batch */
1232 server_stream,
1233 cmd_name);
1234
1235 cursor->done = false;
1236 cursor->end_of_event = false;
1237 succeeded = true;
1238
1239 _mongoc_read_from_buffer (cursor, &ret);
1240
1241 done:
1242 if (!succeeded) {
1243 _mongoc_cursor_monitor_failed (
1244 cursor, bson_get_monotonic_time () - started, server_stream, cmd_name);
1245 }
1246
1247 apply_read_prefs_result_cleanup (&result);
1248 bson_destroy (&query);
1249 bson_destroy (&fields);
1250
1251 if (!ret) {
1252 cursor->done = true;
1253 }
1254
1255 RETURN (ret);
1256 }
1257
1258
1259 bool
_mongoc_cursor_run_command(mongoc_cursor_t * cursor,const bson_t * command,bson_t * reply)1260 _mongoc_cursor_run_command (mongoc_cursor_t *cursor,
1261 const bson_t *command,
1262 bson_t *reply)
1263 {
1264 mongoc_cluster_t *cluster;
1265 mongoc_server_stream_t *server_stream;
1266 mongoc_cmd_parts_t parts;
1267 char db[MONGOC_NAMESPACE_MAX];
1268 bool ret = false;
1269
1270 ENTRY;
1271
1272 cluster = &cursor->client->cluster;
1273 mongoc_cmd_parts_init (&parts, db, MONGOC_QUERY_NONE, command);
1274 parts.read_prefs = cursor->read_prefs;
1275 parts.assembled.operation_id = cursor->operation_id;
1276 server_stream = _mongoc_cursor_fetch_stream (cursor);
1277
1278 if (!server_stream) {
1279 GOTO (done);
1280 }
1281
1282 bson_strncpy (db, cursor->ns, cursor->dblen + 1);
1283 parts.assembled.db_name = db;
1284
1285 if (!_mongoc_cursor_flags (cursor, server_stream, &parts.user_query_flags)) {
1286 GOTO (done);
1287 }
1288
1289 if (cursor->write_concern &&
1290 !mongoc_write_concern_is_default (cursor->write_concern) &&
1291 server_stream->sd->max_wire_version >= WIRE_VERSION_CMD_WRITE_CONCERN) {
1292 mongoc_write_concern_append (cursor->write_concern, &parts.extra);
1293 }
1294
1295 ret = mongoc_cluster_run_command_monitored (
1296 cluster, &parts, server_stream, reply, &cursor->error);
1297
1298 /* Read and Write Concern Spec: "Drivers SHOULD parse server replies for a
1299 * "writeConcernError" field and report the error only in command-specific
1300 * helper methods that take a separate write concern parameter or an options
1301 * parameter that may contain a write concern option.
1302 *
1303 * Only command helpers with names like "_with_write_concern" can create
1304 * cursors with a non-NULL write_concern field.
1305 */
1306 if (ret && cursor->write_concern) {
1307 ret = !_mongoc_parse_wc_err (reply, &cursor->error);
1308 }
1309
1310 done:
1311 mongoc_server_stream_cleanup (server_stream);
1312 mongoc_cmd_parts_cleanup (&parts);
1313
1314 return ret;
1315 }
1316
1317
1318 static bool
_translate_query_opt(const char * query_field,const char ** cmd_field,int * len)1319 _translate_query_opt (const char *query_field, const char **cmd_field, int *len)
1320 {
1321 if (query_field[0] != '$') {
1322 *cmd_field = query_field;
1323 *len = -1;
1324 return true;
1325 }
1326
1327 /* strip the leading '$' */
1328 query_field++;
1329
1330 if (!strcmp (MONGOC_CURSOR_ORDERBY, query_field)) {
1331 *cmd_field = MONGOC_CURSOR_SORT;
1332 *len = MONGOC_CURSOR_SORT_LEN;
1333 } else if (!strcmp (MONGOC_CURSOR_SHOW_DISK_LOC,
1334 query_field)) { /* <= MongoDb 3.0 */
1335 *cmd_field = MONGOC_CURSOR_SHOW_RECORD_ID;
1336 *len = MONGOC_CURSOR_SHOW_RECORD_ID_LEN;
1337 } else if (!strcmp (MONGOC_CURSOR_HINT, query_field)) {
1338 *cmd_field = MONGOC_CURSOR_HINT;
1339 *len = MONGOC_CURSOR_HINT_LEN;
1340 } else if (!strcmp (MONGOC_CURSOR_COMMENT, query_field)) {
1341 *cmd_field = MONGOC_CURSOR_COMMENT;
1342 *len = MONGOC_CURSOR_COMMENT_LEN;
1343 } else if (!strcmp (MONGOC_CURSOR_MAX_SCAN, query_field)) {
1344 *cmd_field = MONGOC_CURSOR_MAX_SCAN;
1345 *len = MONGOC_CURSOR_MAX_SCAN_LEN;
1346 } else if (!strcmp (MONGOC_CURSOR_MAX_TIME_MS, query_field)) {
1347 *cmd_field = MONGOC_CURSOR_MAX_TIME_MS;
1348 *len = MONGOC_CURSOR_MAX_TIME_MS_LEN;
1349 } else if (!strcmp (MONGOC_CURSOR_MAX, query_field)) {
1350 *cmd_field = MONGOC_CURSOR_MAX;
1351 *len = MONGOC_CURSOR_MAX_LEN;
1352 } else if (!strcmp (MONGOC_CURSOR_MIN, query_field)) {
1353 *cmd_field = MONGOC_CURSOR_MIN;
1354 *len = MONGOC_CURSOR_MIN_LEN;
1355 } else if (!strcmp (MONGOC_CURSOR_RETURN_KEY, query_field)) {
1356 *cmd_field = MONGOC_CURSOR_RETURN_KEY;
1357 *len = MONGOC_CURSOR_RETURN_KEY_LEN;
1358 } else if (!strcmp (MONGOC_CURSOR_SNAPSHOT, query_field)) {
1359 *cmd_field = MONGOC_CURSOR_SNAPSHOT;
1360 *len = MONGOC_CURSOR_SNAPSHOT_LEN;
1361 } else {
1362 /* not a special command field, must be a query operator like $or */
1363 return false;
1364 }
1365
1366 return true;
1367 }
1368
1369
1370 void
_mongoc_cursor_collection(const mongoc_cursor_t * cursor,const char ** collection,int * collection_len)1371 _mongoc_cursor_collection (const mongoc_cursor_t *cursor,
1372 const char **collection,
1373 int *collection_len)
1374 {
1375 /* ns is like "db.collection". Collection name is located past the ".". */
1376 *collection = cursor->ns + (cursor->dblen + 1);
1377 /* Collection name's length is ns length, minus length of db name and ".". */
1378 *collection_len = cursor->nslen - cursor->dblen - 1;
1379
1380 BSON_ASSERT (*collection_len > 0);
1381 }
1382
1383
1384 static bool
_mongoc_cursor_prepare_find_command(mongoc_cursor_t * cursor,bson_t * command,mongoc_server_stream_t * server_stream)1385 _mongoc_cursor_prepare_find_command (mongoc_cursor_t *cursor,
1386 bson_t *command,
1387 mongoc_server_stream_t *server_stream)
1388 {
1389 const char *collection;
1390 int collection_len;
1391 bson_iter_t iter;
1392
1393 _mongoc_cursor_collection (cursor, &collection, &collection_len);
1394 bson_append_utf8 (command,
1395 MONGOC_CURSOR_FIND,
1396 MONGOC_CURSOR_FIND_LEN,
1397 collection,
1398 collection_len);
1399 bson_append_document (
1400 command, MONGOC_CURSOR_FILTER, MONGOC_CURSOR_FILTER_LEN, &cursor->filter);
1401 bson_iter_init (&iter, &cursor->opts);
1402
1403 while (bson_iter_next (&iter)) {
1404 /* don't append "maxAwaitTimeMS" */
1405 if (!strcmp (bson_iter_key (&iter), MONGOC_CURSOR_COLLATION) &&
1406 server_stream->sd->max_wire_version < WIRE_VERSION_COLLATION) {
1407 bson_set_error (&cursor->error,
1408 MONGOC_ERROR_CURSOR,
1409 MONGOC_ERROR_PROTOCOL_BAD_WIRE_VERSION,
1410 "Collation is not supported by this server");
1411 MARK_FAILED (cursor);
1412 return false;
1413 } else if (strcmp (bson_iter_key (&iter),
1414 MONGOC_CURSOR_MAX_AWAIT_TIME_MS)) {
1415 if (!bson_append_iter (command, bson_iter_key (&iter), -1, &iter)) {
1416 bson_set_error (&cursor->error,
1417 MONGOC_ERROR_BSON,
1418 MONGOC_ERROR_BSON_INVALID,
1419 "Cursor opts too large");
1420 MARK_FAILED (cursor);
1421 return false;
1422 }
1423 }
1424 }
1425
1426 if (cursor->read_concern->level != NULL) {
1427 const bson_t *read_concern_bson;
1428
1429 read_concern_bson = _mongoc_read_concern_get_bson (cursor->read_concern);
1430 bson_append_document (command,
1431 MONGOC_CURSOR_READ_CONCERN,
1432 MONGOC_CURSOR_READ_CONCERN_LEN,
1433 read_concern_bson);
1434 }
1435
1436 return true;
1437 }
1438
1439
1440 static const bson_t *
_mongoc_cursor_find_command(mongoc_cursor_t * cursor,mongoc_server_stream_t * server_stream)1441 _mongoc_cursor_find_command (mongoc_cursor_t *cursor,
1442 mongoc_server_stream_t *server_stream)
1443 {
1444 bson_t command = BSON_INITIALIZER;
1445 const bson_t *bson = NULL;
1446
1447 ENTRY;
1448
1449 if (!_mongoc_cursor_prepare_find_command (cursor, &command, server_stream)) {
1450 RETURN (NULL);
1451 }
1452
1453 _mongoc_cursor_cursorid_init (cursor, &command);
1454 bson_destroy (&command);
1455
1456 BSON_ASSERT (cursor->iface.next);
1457 _mongoc_cursor_cursorid_next (cursor, &bson);
1458
1459 RETURN (bson);
1460 }
1461
1462
1463 static const bson_t *
_mongoc_cursor_get_more(mongoc_cursor_t * cursor)1464 _mongoc_cursor_get_more (mongoc_cursor_t *cursor)
1465 {
1466 mongoc_server_stream_t *server_stream;
1467 const bson_t *b = NULL;
1468
1469 ENTRY;
1470
1471 BSON_ASSERT (cursor);
1472
1473 server_stream = _mongoc_cursor_fetch_stream (cursor);
1474 if (!server_stream) {
1475 GOTO (failure);
1476 }
1477
1478 if (!cursor->in_exhaust && !cursor->rpc.reply.cursor_id) {
1479 bson_set_error (&cursor->error,
1480 MONGOC_ERROR_CURSOR,
1481 MONGOC_ERROR_CURSOR_INVALID_CURSOR,
1482 "No valid cursor was provided.");
1483 GOTO (failure);
1484 }
1485
1486 if (!_mongoc_cursor_op_getmore (cursor, server_stream)) {
1487 GOTO (failure);
1488 }
1489
1490 mongoc_server_stream_cleanup (server_stream);
1491
1492 if (cursor->reader) {
1493 _mongoc_read_from_buffer (cursor, &b);
1494 }
1495
1496 RETURN (b);
1497
1498 failure:
1499 cursor->done = true;
1500
1501 mongoc_server_stream_cleanup (server_stream);
1502
1503 RETURN (NULL);
1504 }
1505
1506
1507 static bool
_mongoc_cursor_monitor_legacy_get_more(mongoc_cursor_t * cursor,mongoc_server_stream_t * server_stream)1508 _mongoc_cursor_monitor_legacy_get_more (mongoc_cursor_t *cursor,
1509 mongoc_server_stream_t *server_stream)
1510 {
1511 bson_t doc;
1512 char db[MONGOC_NAMESPACE_MAX];
1513 mongoc_client_t *client;
1514 mongoc_apm_command_started_t event;
1515
1516 ENTRY;
1517
1518 client = cursor->client;
1519 if (!client->apm_callbacks.started) {
1520 /* successful */
1521 RETURN (true);
1522 }
1523
1524 bson_init (&doc);
1525 if (!_mongoc_cursor_prepare_getmore_command (cursor, &doc)) {
1526 bson_destroy (&doc);
1527 RETURN (false);
1528 }
1529
1530 bson_strncpy (db, cursor->ns, cursor->dblen + 1);
1531 mongoc_apm_command_started_init (&event,
1532 &doc,
1533 db,
1534 "getMore",
1535 client->cluster.request_id,
1536 cursor->operation_id,
1537 &server_stream->sd->host,
1538 server_stream->sd->id,
1539 client->apm_context);
1540
1541 client->apm_callbacks.started (&event);
1542 mongoc_apm_command_started_cleanup (&event);
1543 bson_destroy (&doc);
1544
1545 RETURN (true);
1546 }
1547
1548
1549 bool
_mongoc_cursor_op_getmore(mongoc_cursor_t * cursor,mongoc_server_stream_t * server_stream)1550 _mongoc_cursor_op_getmore (mongoc_cursor_t *cursor,
1551 mongoc_server_stream_t *server_stream)
1552 {
1553 int64_t started;
1554 mongoc_rpc_t rpc;
1555 uint32_t request_id;
1556 mongoc_cluster_t *cluster;
1557 mongoc_query_flags_t flags;
1558
1559 ENTRY;
1560
1561 started = bson_get_monotonic_time ();
1562 cluster = &cursor->client->cluster;
1563
1564 if (!_mongoc_cursor_flags (cursor, server_stream, &flags)) {
1565 GOTO (fail);
1566 }
1567
1568 if (cursor->in_exhaust) {
1569 request_id = (uint32_t) cursor->rpc.header.request_id;
1570 } else {
1571 request_id = ++cluster->request_id;
1572
1573 rpc.get_more.cursor_id = cursor->rpc.reply.cursor_id;
1574 rpc.header.msg_len = 0;
1575 rpc.header.request_id = request_id;
1576 rpc.header.response_to = 0;
1577 rpc.header.opcode = MONGOC_OPCODE_GET_MORE;
1578 rpc.get_more.zero = 0;
1579 rpc.get_more.collection = cursor->ns;
1580
1581 if (flags & MONGOC_QUERY_TAILABLE_CURSOR) {
1582 rpc.get_more.n_return = 0;
1583 } else {
1584 rpc.get_more.n_return = _mongoc_n_return (cursor);
1585 }
1586
1587 if (!_mongoc_cursor_monitor_legacy_get_more (cursor, server_stream)) {
1588 GOTO (fail);
1589 }
1590
1591 if (!mongoc_cluster_sendv_to_server (
1592 cluster, &rpc, server_stream, NULL, &cursor->error)) {
1593 GOTO (fail);
1594 }
1595 }
1596
1597 _mongoc_buffer_clear (&cursor->buffer, false);
1598
1599 if (!_mongoc_client_recv (cursor->client,
1600 &cursor->rpc,
1601 &cursor->buffer,
1602 server_stream,
1603 &cursor->error)) {
1604 GOTO (fail);
1605 }
1606
1607 if (cursor->rpc.header.opcode != MONGOC_OPCODE_REPLY) {
1608 bson_set_error (&cursor->error,
1609 MONGOC_ERROR_PROTOCOL,
1610 MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
1611 "Invalid opcode. Expected %d, got %d.",
1612 MONGOC_OPCODE_REPLY,
1613 cursor->rpc.header.opcode);
1614 GOTO (fail);
1615 }
1616
1617 if (cursor->rpc.header.response_to != request_id) {
1618 bson_set_error (&cursor->error,
1619 MONGOC_ERROR_PROTOCOL,
1620 MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
1621 "Invalid response_to for getmore. Expected %d, got %d.",
1622 request_id,
1623 cursor->rpc.header.response_to);
1624 GOTO (fail);
1625 }
1626
1627 if (!_mongoc_rpc_check_ok (&cursor->rpc,
1628 false /* is_command */,
1629 cursor->client->error_api_version,
1630 &cursor->error,
1631 &cursor->error_doc)) {
1632 GOTO (fail);
1633 }
1634
1635 if (cursor->reader) {
1636 bson_reader_destroy (cursor->reader);
1637 }
1638
1639 cursor->reader = bson_reader_new_from_data (
1640 cursor->rpc.reply.documents, (size_t) cursor->rpc.reply.documents_len);
1641
1642 _mongoc_cursor_monitor_succeeded (cursor,
1643 bson_get_monotonic_time () - started,
1644 false, /* not first batch */
1645 server_stream,
1646 "getMore");
1647
1648 RETURN (true);
1649
1650 fail:
1651 _mongoc_cursor_monitor_failed (
1652 cursor, bson_get_monotonic_time () - started, server_stream, "getMore");
1653 RETURN (false);
1654 }
1655
1656
1657 bool
mongoc_cursor_error(mongoc_cursor_t * cursor,bson_error_t * error)1658 mongoc_cursor_error (mongoc_cursor_t *cursor, bson_error_t *error)
1659 {
1660 ENTRY;
1661
1662 RETURN (mongoc_cursor_error_document (cursor, error, NULL));
1663 }
1664
1665
1666 bool
mongoc_cursor_error_document(mongoc_cursor_t * cursor,bson_error_t * error,const bson_t ** doc)1667 mongoc_cursor_error_document (mongoc_cursor_t *cursor,
1668 bson_error_t *error,
1669 const bson_t **doc)
1670 {
1671 bool ret;
1672
1673 ENTRY;
1674
1675 BSON_ASSERT (cursor);
1676
1677 if (cursor->iface.error_document) {
1678 ret = cursor->iface.error_document (cursor, error, doc);
1679 } else {
1680 ret = _mongoc_cursor_error_document (cursor, error, doc);
1681 }
1682
1683 RETURN (ret);
1684 }
1685
1686
1687 bool
_mongoc_cursor_error_document(mongoc_cursor_t * cursor,bson_error_t * error,const bson_t ** doc)1688 _mongoc_cursor_error_document (mongoc_cursor_t *cursor,
1689 bson_error_t *error,
1690 const bson_t **doc)
1691 {
1692 ENTRY;
1693
1694 BSON_ASSERT (cursor);
1695
1696 if (BSON_UNLIKELY (CURSOR_FAILED (cursor))) {
1697 bson_set_error (error,
1698 cursor->error.domain,
1699 cursor->error.code,
1700 "%s",
1701 cursor->error.message);
1702
1703 if (doc) {
1704 *doc = &cursor->error_doc;
1705 }
1706
1707 RETURN (true);
1708 }
1709
1710 if (doc) {
1711 *doc = NULL;
1712 }
1713
1714 RETURN (false);
1715 }
1716
1717
1718 bool
mongoc_cursor_next(mongoc_cursor_t * cursor,const bson_t ** bson)1719 mongoc_cursor_next (mongoc_cursor_t *cursor, const bson_t **bson)
1720 {
1721 bool ret;
1722
1723 ENTRY;
1724
1725 BSON_ASSERT (cursor);
1726 BSON_ASSERT (bson);
1727
1728 TRACE ("cursor_id(%" PRId64 ")", cursor->rpc.reply.cursor_id);
1729
1730 if (bson) {
1731 *bson = NULL;
1732 }
1733
1734 if (CURSOR_FAILED (cursor)) {
1735 return false;
1736 }
1737
1738 if (cursor->done) {
1739 bson_set_error (&cursor->error,
1740 MONGOC_ERROR_CURSOR,
1741 MONGOC_ERROR_CURSOR_INVALID_CURSOR,
1742 "Cannot advance a completed or failed cursor.");
1743 return false;
1744 }
1745
1746 /*
1747 * We cannot proceed if another cursor is receiving results in exhaust mode.
1748 */
1749 if (cursor->client->in_exhaust && !cursor->in_exhaust) {
1750 bson_set_error (&cursor->error,
1751 MONGOC_ERROR_CLIENT,
1752 MONGOC_ERROR_CLIENT_IN_EXHAUST,
1753 "Another cursor derived from this client is in exhaust.");
1754 RETURN (false);
1755 }
1756
1757 if (cursor->iface.next) {
1758 ret = cursor->iface.next (cursor, bson);
1759 } else {
1760 ret = _mongoc_cursor_next (cursor, bson);
1761 }
1762
1763 cursor->current = *bson;
1764
1765 cursor->count++;
1766
1767 RETURN (ret);
1768 }
1769
1770
1771 bool
_mongoc_read_from_buffer(mongoc_cursor_t * cursor,const bson_t ** bson)1772 _mongoc_read_from_buffer (mongoc_cursor_t *cursor, const bson_t **bson)
1773 {
1774 bool eof = false;
1775
1776 BSON_ASSERT (cursor->reader);
1777
1778 *bson = bson_reader_read (cursor->reader, &eof);
1779 cursor->end_of_event = eof ? 1 : 0;
1780
1781 return *bson ? true : false;
1782 }
1783
1784
1785 bool
_mongoc_cursor_next(mongoc_cursor_t * cursor,const bson_t ** bson)1786 _mongoc_cursor_next (mongoc_cursor_t *cursor, const bson_t **bson)
1787 {
1788 int64_t limit;
1789 const bson_t *b = NULL;
1790 bool tailable;
1791
1792 ENTRY;
1793
1794 BSON_ASSERT (cursor);
1795
1796 if (bson) {
1797 *bson = NULL;
1798 }
1799
1800 /*
1801 * If we reached our limit, make sure we mark this as done and do not try to
1802 * make further progress. We also set end_of_event so that
1803 * mongoc_cursor_more will be false.
1804 */
1805 limit = mongoc_cursor_get_limit (cursor);
1806
1807 if (limit && cursor->count >= llabs (limit)) {
1808 cursor->done = true;
1809 cursor->end_of_event = true;
1810 RETURN (false);
1811 }
1812
1813 /*
1814 * Try to read the next document from the reader if it exists, we might
1815 * get NULL back and EOF, in which case we need to submit a getmore.
1816 */
1817 if (cursor->reader) {
1818 _mongoc_read_from_buffer (cursor, &b);
1819 if (b) {
1820 GOTO (complete);
1821 }
1822 }
1823
1824 /*
1825 * Check to see if we need to send a GET_MORE for more results.
1826 */
1827 if (!cursor->sent) {
1828 b = _mongoc_cursor_initial_query (cursor);
1829 } else if (BSON_UNLIKELY (cursor->end_of_event) &&
1830 cursor->rpc.reply.cursor_id) {
1831 b = _mongoc_cursor_get_more (cursor);
1832 }
1833
1834 complete:
1835 tailable = _mongoc_cursor_get_opt_bool (cursor, "tailable");
1836 cursor->done = (cursor->end_of_event &&
1837 ((cursor->in_exhaust && !cursor->rpc.reply.cursor_id) ||
1838 (!b && !tailable)));
1839
1840 if (bson) {
1841 *bson = b;
1842 }
1843
1844 RETURN (!!b);
1845 }
1846
1847
1848 bool
mongoc_cursor_more(mongoc_cursor_t * cursor)1849 mongoc_cursor_more (mongoc_cursor_t *cursor)
1850 {
1851 bool ret;
1852
1853 ENTRY;
1854
1855 BSON_ASSERT (cursor);
1856
1857 if (cursor->iface.more) {
1858 ret = cursor->iface.more (cursor);
1859 } else {
1860 ret = _mongoc_cursor_more (cursor);
1861 }
1862
1863 RETURN (ret);
1864 }
1865
1866
1867 bool
_mongoc_cursor_more(mongoc_cursor_t * cursor)1868 _mongoc_cursor_more (mongoc_cursor_t *cursor)
1869 {
1870 BSON_ASSERT (cursor);
1871
1872 if (CURSOR_FAILED (cursor)) {
1873 return false;
1874 }
1875
1876 return !(cursor->sent && cursor->done && cursor->end_of_event);
1877 }
1878
1879
1880 void
mongoc_cursor_get_host(mongoc_cursor_t * cursor,mongoc_host_list_t * host)1881 mongoc_cursor_get_host (mongoc_cursor_t *cursor, mongoc_host_list_t *host)
1882 {
1883 BSON_ASSERT (cursor);
1884 BSON_ASSERT (host);
1885
1886 if (cursor->iface.get_host) {
1887 cursor->iface.get_host (cursor, host);
1888 } else {
1889 _mongoc_cursor_get_host (cursor, host);
1890 }
1891
1892 EXIT;
1893 }
1894
1895 void
_mongoc_cursor_get_host(mongoc_cursor_t * cursor,mongoc_host_list_t * host)1896 _mongoc_cursor_get_host (mongoc_cursor_t *cursor, mongoc_host_list_t *host)
1897 {
1898 mongoc_server_description_t *description;
1899
1900 BSON_ASSERT (cursor);
1901 BSON_ASSERT (host);
1902
1903 memset (host, 0, sizeof *host);
1904
1905 if (!cursor->server_id) {
1906 MONGOC_WARNING ("%s(): Must send query before fetching peer.", BSON_FUNC);
1907 return;
1908 }
1909
1910 description = mongoc_topology_server_by_id (
1911 cursor->client->topology, cursor->server_id, &cursor->error);
1912 if (!description) {
1913 return;
1914 }
1915
1916 *host = description->host;
1917
1918 mongoc_server_description_destroy (description);
1919
1920 return;
1921 }
1922
1923 mongoc_cursor_t *
mongoc_cursor_clone(const mongoc_cursor_t * cursor)1924 mongoc_cursor_clone (const mongoc_cursor_t *cursor)
1925 {
1926 mongoc_cursor_t *ret;
1927
1928 BSON_ASSERT (cursor);
1929
1930 if (cursor->iface.clone) {
1931 ret = cursor->iface.clone (cursor);
1932 } else {
1933 ret = _mongoc_cursor_clone (cursor);
1934 }
1935
1936 RETURN (ret);
1937 }
1938
1939
1940 mongoc_cursor_t *
_mongoc_cursor_clone(const mongoc_cursor_t * cursor)1941 _mongoc_cursor_clone (const mongoc_cursor_t *cursor)
1942 {
1943 mongoc_cursor_t *_clone;
1944
1945 ENTRY;
1946
1947 BSON_ASSERT (cursor);
1948
1949 _clone = (mongoc_cursor_t *) bson_malloc0 (sizeof *_clone);
1950
1951 _clone->client = cursor->client;
1952 _clone->is_command = cursor->is_command;
1953 _clone->nslen = cursor->nslen;
1954 _clone->dblen = cursor->dblen;
1955 _clone->has_fields = cursor->has_fields;
1956
1957 if (cursor->read_prefs) {
1958 _clone->read_prefs = mongoc_read_prefs_copy (cursor->read_prefs);
1959 }
1960
1961 if (cursor->read_concern) {
1962 _clone->read_concern = mongoc_read_concern_copy (cursor->read_concern);
1963 }
1964
1965
1966 bson_copy_to (&cursor->filter, &_clone->filter);
1967 bson_copy_to (&cursor->opts, &_clone->opts);
1968 bson_copy_to (&cursor->error_doc, &_clone->error_doc);
1969
1970 bson_strncpy (_clone->ns, cursor->ns, sizeof _clone->ns);
1971
1972 _mongoc_buffer_init (&_clone->buffer, NULL, 0, NULL, NULL);
1973
1974 mongoc_counter_cursors_active_inc ();
1975
1976 RETURN (_clone);
1977 }
1978
1979
1980 /*
1981 *--------------------------------------------------------------------------
1982 *
1983 * mongoc_cursor_is_alive --
1984 *
1985 * Checks to see if a cursor is alive.
1986 *
1987 * This is primarily useful with tailable cursors.
1988 *
1989 * Returns:
1990 * true if the cursor is alive.
1991 *
1992 * Side effects:
1993 * None.
1994 *
1995 *--------------------------------------------------------------------------
1996 */
1997
1998 bool
mongoc_cursor_is_alive(const mongoc_cursor_t * cursor)1999 mongoc_cursor_is_alive (const mongoc_cursor_t *cursor) /* IN */
2000 {
2001 BSON_ASSERT (cursor);
2002
2003 return !cursor->done;
2004 }
2005
2006
2007 const bson_t *
mongoc_cursor_current(const mongoc_cursor_t * cursor)2008 mongoc_cursor_current (const mongoc_cursor_t *cursor) /* IN */
2009 {
2010 BSON_ASSERT (cursor);
2011
2012 return cursor->current;
2013 }
2014
2015
2016 void
mongoc_cursor_set_batch_size(mongoc_cursor_t * cursor,uint32_t batch_size)2017 mongoc_cursor_set_batch_size (mongoc_cursor_t *cursor, uint32_t batch_size)
2018 {
2019 BSON_ASSERT (cursor);
2020
2021 _mongoc_cursor_set_opt_int64 (
2022 cursor, MONGOC_CURSOR_BATCH_SIZE, (int64_t) batch_size);
2023 }
2024
2025 uint32_t
mongoc_cursor_get_batch_size(const mongoc_cursor_t * cursor)2026 mongoc_cursor_get_batch_size (const mongoc_cursor_t *cursor)
2027 {
2028 BSON_ASSERT (cursor);
2029
2030 return (uint32_t) _mongoc_cursor_get_opt_int64 (
2031 cursor, MONGOC_CURSOR_BATCH_SIZE, 0);
2032 }
2033
2034 bool
mongoc_cursor_set_limit(mongoc_cursor_t * cursor,int64_t limit)2035 mongoc_cursor_set_limit (mongoc_cursor_t *cursor, int64_t limit)
2036 {
2037 BSON_ASSERT (cursor);
2038
2039 if (!cursor->sent) {
2040 if (limit < 0) {
2041 return _mongoc_cursor_set_opt_int64 (
2042 cursor, MONGOC_CURSOR_LIMIT, -limit) &&
2043 _mongoc_cursor_set_opt_bool (
2044 cursor, MONGOC_CURSOR_SINGLE_BATCH, true);
2045 } else {
2046 return _mongoc_cursor_set_opt_int64 (
2047 cursor, MONGOC_CURSOR_LIMIT, limit);
2048 }
2049 } else {
2050 return false;
2051 }
2052 }
2053
2054 int64_t
mongoc_cursor_get_limit(const mongoc_cursor_t * cursor)2055 mongoc_cursor_get_limit (const mongoc_cursor_t *cursor)
2056 {
2057 int64_t limit;
2058 bool single_batch;
2059
2060 BSON_ASSERT (cursor);
2061
2062 limit = _mongoc_cursor_get_opt_int64 (cursor, MONGOC_CURSOR_LIMIT, 0);
2063 single_batch =
2064 _mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_SINGLE_BATCH);
2065
2066 if (limit > 0 && single_batch) {
2067 limit = -limit;
2068 }
2069
2070 return limit;
2071 }
2072
2073 bool
mongoc_cursor_set_hint(mongoc_cursor_t * cursor,uint32_t server_id)2074 mongoc_cursor_set_hint (mongoc_cursor_t *cursor, uint32_t server_id)
2075 {
2076 BSON_ASSERT (cursor);
2077
2078 if (cursor->server_id) {
2079 MONGOC_ERROR ("mongoc_cursor_set_hint: server_id already set");
2080 return false;
2081 }
2082
2083 if (!server_id) {
2084 MONGOC_ERROR ("mongoc_cursor_set_hint: cannot set server_id to 0");
2085 return false;
2086 }
2087
2088 cursor->server_id = server_id;
2089 cursor->server_id_set = true;
2090
2091 return true;
2092 }
2093
2094 uint32_t
mongoc_cursor_get_hint(const mongoc_cursor_t * cursor)2095 mongoc_cursor_get_hint (const mongoc_cursor_t *cursor)
2096 {
2097 BSON_ASSERT (cursor);
2098
2099 return cursor->server_id;
2100 }
2101
2102 int64_t
mongoc_cursor_get_id(const mongoc_cursor_t * cursor)2103 mongoc_cursor_get_id (const mongoc_cursor_t *cursor)
2104 {
2105 BSON_ASSERT (cursor);
2106
2107 return cursor->rpc.reply.cursor_id;
2108 }
2109
2110 void
mongoc_cursor_set_max_await_time_ms(mongoc_cursor_t * cursor,uint32_t max_await_time_ms)2111 mongoc_cursor_set_max_await_time_ms (mongoc_cursor_t *cursor,
2112 uint32_t max_await_time_ms)
2113 {
2114 BSON_ASSERT (cursor);
2115
2116 if (!cursor->sent) {
2117 _mongoc_cursor_set_opt_int64 (
2118 cursor, MONGOC_CURSOR_MAX_AWAIT_TIME_MS, (int64_t) max_await_time_ms);
2119 }
2120 }
2121
2122 uint32_t
mongoc_cursor_get_max_await_time_ms(const mongoc_cursor_t * cursor)2123 mongoc_cursor_get_max_await_time_ms (const mongoc_cursor_t *cursor)
2124 {
2125 bson_iter_t iter;
2126
2127 BSON_ASSERT (cursor);
2128
2129 if (bson_iter_init_find (
2130 &iter, &cursor->opts, MONGOC_CURSOR_MAX_AWAIT_TIME_MS)) {
2131 return (uint32_t) bson_iter_as_int64 (&iter);
2132 }
2133
2134 return 0;
2135 }
2136
2137
2138 /*
2139 *--------------------------------------------------------------------------
2140 *
2141 * mongoc_cursor_new_from_command_reply --
2142 *
2143 * Low-level function to initialize a mongoc_cursor_t from the
2144 * reply to a command like "aggregate", "find", or "listCollections".
2145 *
2146 * Useful in drivers that wrap the C driver; in applications, use
2147 * high-level functions like mongoc_collection_aggregate instead.
2148 *
2149 * Returns:
2150 * A cursor.
2151 *
2152 * Side effects:
2153 * On failure, the cursor's error is set: retrieve it with
2154 * mongoc_cursor_error. On success or failure, "reply" is
2155 * destroyed.
2156 *
2157 *--------------------------------------------------------------------------
2158 */
2159
2160 mongoc_cursor_t *
mongoc_cursor_new_from_command_reply(mongoc_client_t * client,bson_t * reply,uint32_t server_id)2161 mongoc_cursor_new_from_command_reply (mongoc_client_t *client,
2162 bson_t *reply,
2163 uint32_t server_id)
2164 {
2165 mongoc_cursor_t *cursor;
2166 bson_t cmd = BSON_INITIALIZER;
2167
2168 BSON_ASSERT (client);
2169 BSON_ASSERT (reply);
2170
2171 cursor = _mongoc_cursor_new_with_opts (
2172 client, NULL, false /* is_command */, NULL, NULL, NULL, NULL);
2173
2174 _mongoc_cursor_cursorid_init (cursor, &cmd);
2175 _mongoc_cursor_cursorid_init_with_reply (cursor, reply, server_id);
2176 bson_destroy (&cmd);
2177
2178 return cursor;
2179 }
2180