1 /*
2 * Copyright 2013 MongoDB, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17
18 #include "mongoc-cursor.h"
19 #include "mongoc-cursor-private.h"
20 #include "mongoc-cursor-cursorid-private.h"
21 #include "mongoc-log.h"
22 #include "mongoc-trace-private.h"
23 #include "mongoc-error.h"
24 #include "mongoc-util-private.h"
25 #include "mongoc-client-private.h"
26
27
28 #undef MONGOC_LOG_DOMAIN
29 #define MONGOC_LOG_DOMAIN "cursor-cursorid"
30
31
32 static void *
_mongoc_cursor_cursorid_new(void)33 _mongoc_cursor_cursorid_new (void)
34 {
35 mongoc_cursor_cursorid_t *cid;
36
37 ENTRY;
38
39 cid = (mongoc_cursor_cursorid_t *) bson_malloc0 (sizeof *cid);
40 bson_init (&cid->array);
41 cid->in_batch = false;
42 cid->in_reader = false;
43
44 RETURN (cid);
45 }
46
47
48 static void
_mongoc_cursor_cursorid_destroy(mongoc_cursor_t * cursor)49 _mongoc_cursor_cursorid_destroy (mongoc_cursor_t *cursor)
50 {
51 mongoc_cursor_cursorid_t *cid;
52
53 ENTRY;
54
55 cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
56 BSON_ASSERT (cid);
57
58 bson_destroy (&cid->array);
59 bson_free (cid);
60 _mongoc_cursor_destroy (cursor);
61
62 EXIT;
63 }
64
65
66 /*
67 * Start iterating the reply to an "aggregate", "find", "getMore" etc. command:
68 *
69 * {cursor: {id: 1234, ns: "db.collection", firstBatch: [...]}}
70 */
71 bool
_mongoc_cursor_cursorid_start_batch(mongoc_cursor_t * cursor)72 _mongoc_cursor_cursorid_start_batch (mongoc_cursor_t *cursor)
73 {
74 mongoc_cursor_cursorid_t *cid;
75 bson_iter_t iter;
76 bson_iter_t child;
77 const char *ns;
78 uint32_t nslen;
79
80 cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
81
82 BSON_ASSERT (cid);
83
84 if (bson_iter_init_find (&iter, &cid->array, "cursor") &&
85 BSON_ITER_HOLDS_DOCUMENT (&iter) && bson_iter_recurse (&iter, &child)) {
86 while (bson_iter_next (&child)) {
87 if (BSON_ITER_IS_KEY (&child, "id")) {
88 cursor->rpc.reply.cursor_id = bson_iter_as_int64 (&child);
89 } else if (BSON_ITER_IS_KEY (&child, "ns")) {
90 ns = bson_iter_utf8 (&child, &nslen);
91 _mongoc_set_cursor_ns (cursor, ns, nslen);
92 } else if (BSON_ITER_IS_KEY (&child, "firstBatch") ||
93 BSON_ITER_IS_KEY (&child, "nextBatch")) {
94 if (BSON_ITER_HOLDS_ARRAY (&child) &&
95 bson_iter_recurse (&child, &cid->batch_iter)) {
96 cid->in_batch = true;
97 }
98 }
99 }
100 }
101
102 return cid->in_batch;
103 }
104
105
106 static bool
_mongoc_cursor_cursorid_refresh_from_command(mongoc_cursor_t * cursor,const bson_t * command)107 _mongoc_cursor_cursorid_refresh_from_command (mongoc_cursor_t *cursor,
108 const bson_t *command)
109 {
110 mongoc_cursor_cursorid_t *cid;
111
112 ENTRY;
113
114 cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
115 BSON_ASSERT (cid);
116
117 bson_destroy (&cid->array);
118
119 /* server replies to find / aggregate with {cursor: {id: N, firstBatch: []}},
120 * to getMore command with {cursor: {id: N, nextBatch: []}}. */
121 if (_mongoc_cursor_run_command (cursor, command, &cid->array) &&
122 _mongoc_cursor_cursorid_start_batch (cursor)) {
123 RETURN (true);
124 }
125
126 bson_destroy (&cursor->error_doc);
127 bson_copy_to (&cid->array, &cursor->error_doc);
128
129 if (!cursor->error.domain) {
130 bson_set_error (&cursor->error,
131 MONGOC_ERROR_PROTOCOL,
132 MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
133 "Invalid reply to %s command.",
134 _mongoc_get_command_name (command));
135 }
136
137 RETURN (false);
138 }
139
140
141 static void
_mongoc_cursor_cursorid_read_from_batch(mongoc_cursor_t * cursor,const bson_t ** bson)142 _mongoc_cursor_cursorid_read_from_batch (mongoc_cursor_t *cursor,
143 const bson_t **bson)
144 {
145 mongoc_cursor_cursorid_t *cid;
146 const uint8_t *data = NULL;
147 uint32_t data_len = 0;
148
149 ENTRY;
150
151 cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
152 BSON_ASSERT (cid);
153
154 if (bson_iter_next (&cid->batch_iter) &&
155 BSON_ITER_HOLDS_DOCUMENT (&cid->batch_iter)) {
156 bson_iter_document (&cid->batch_iter, &data_len, &data);
157
158 /* bson_iter_next guarantees valid BSON, so this must succeed */
159 bson_init_static (&cid->current_doc, data, data_len);
160 *bson = &cid->current_doc;
161
162 cursor->end_of_event = false;
163 } else {
164 cursor->end_of_event = true;
165 }
166 }
167
168
169 bool
_mongoc_cursor_cursorid_prime(mongoc_cursor_t * cursor)170 _mongoc_cursor_cursorid_prime (mongoc_cursor_t *cursor)
171 {
172 cursor->sent = true;
173 cursor->operation_id = ++cursor->client->cluster.operation_id;
174 return _mongoc_cursor_cursorid_refresh_from_command (cursor,
175 &cursor->filter);
176 }
177
178
179 bool
_mongoc_cursor_prepare_getmore_command(mongoc_cursor_t * cursor,bson_t * command)180 _mongoc_cursor_prepare_getmore_command (mongoc_cursor_t *cursor,
181 bson_t *command)
182 {
183 const char *collection;
184 int collection_len;
185 int64_t batch_size;
186 bool await_data;
187 int32_t max_await_time_ms;
188
189 ENTRY;
190
191 _mongoc_cursor_collection (cursor, &collection, &collection_len);
192
193 bson_init (command);
194 bson_append_int64 (command, "getMore", 7, mongoc_cursor_get_id (cursor));
195 bson_append_utf8 (command, "collection", 10, collection, collection_len);
196
197 batch_size = mongoc_cursor_get_batch_size (cursor);
198
199 /* See find, getMore, and killCursors Spec for batchSize rules */
200 if (batch_size) {
201 bson_append_int64 (command,
202 MONGOC_CURSOR_BATCH_SIZE,
203 MONGOC_CURSOR_BATCH_SIZE_LEN,
204 abs (_mongoc_n_return (cursor)));
205 }
206
207 /* Find, getMore And killCursors Commands Spec: "In the case of a tailable
208 cursor with awaitData == true the driver MUST provide a Cursor level
209 option named maxAwaitTimeMS (See CRUD specification for details). The
210 maxTimeMS option on the getMore command MUST be set to the value of the
211 option maxAwaitTimeMS. If no maxAwaitTimeMS is specified, the driver
212 SHOULD not set maxTimeMS on the getMore command."
213 */
214 await_data = _mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_TAILABLE) &&
215 _mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_AWAIT_DATA);
216
217
218 if (await_data) {
219 max_await_time_ms =
220 (int32_t) mongoc_cursor_get_max_await_time_ms (cursor);
221 if (max_await_time_ms) {
222 bson_append_int32 (command,
223 MONGOC_CURSOR_MAX_TIME_MS,
224 MONGOC_CURSOR_MAX_TIME_MS_LEN,
225 max_await_time_ms);
226 }
227 }
228
229 RETURN (true);
230 }
231
232
233 static bool
_mongoc_cursor_cursorid_get_more(mongoc_cursor_t * cursor)234 _mongoc_cursor_cursorid_get_more (mongoc_cursor_t *cursor)
235 {
236 mongoc_cursor_cursorid_t *cid;
237 mongoc_server_stream_t *server_stream;
238 bson_t command;
239 bool ret;
240
241 ENTRY;
242
243 cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
244 BSON_ASSERT (cid);
245
246 server_stream = _mongoc_cursor_fetch_stream (cursor);
247
248 if (!server_stream) {
249 RETURN (false);
250 }
251
252 if (_use_getmore_command (cursor, server_stream)) {
253 if (!_mongoc_cursor_prepare_getmore_command (cursor, &command)) {
254 mongoc_server_stream_cleanup (server_stream);
255 RETURN (false);
256 }
257
258 ret = _mongoc_cursor_cursorid_refresh_from_command (cursor, &command);
259 bson_destroy (&command);
260 } else {
261 ret = _mongoc_cursor_op_getmore (cursor, server_stream);
262 cid->in_reader = ret;
263 }
264
265 mongoc_server_stream_cleanup (server_stream);
266 RETURN (ret);
267 }
268
269
270 bool
_mongoc_cursor_cursorid_next(mongoc_cursor_t * cursor,const bson_t ** bson)271 _mongoc_cursor_cursorid_next (mongoc_cursor_t *cursor, const bson_t **bson)
272 {
273 mongoc_cursor_cursorid_t *cid;
274 bool refreshed = false;
275
276 ENTRY;
277
278 *bson = NULL;
279
280 cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
281 BSON_ASSERT (cid);
282
283 if (!cursor->sent) {
284 if (!_mongoc_cursor_cursorid_prime (cursor)) {
285 GOTO (done);
286 }
287 }
288
289 again:
290
291 /* Two paths:
292 * - Mongo 3.2+, sent "getMore" cmd, we're reading reply's "nextBatch" array
293 * - Mongo 2.6 to 3, after "aggregate" or similar command we sent OP_GETMORE,
294 * we're reading the raw reply
295 */
296 if (cid->in_batch) {
297 _mongoc_cursor_cursorid_read_from_batch (cursor, bson);
298
299 if (*bson) {
300 GOTO (done);
301 }
302
303 cid->in_batch = false;
304 } else if (cid->in_reader) {
305 _mongoc_read_from_buffer (cursor, bson);
306
307 if (*bson) {
308 GOTO (done);
309 }
310
311 cid->in_reader = false;
312 }
313
314 if (!refreshed && mongoc_cursor_get_id (cursor)) {
315 if (!_mongoc_cursor_cursorid_get_more (cursor)) {
316 GOTO (done);
317 }
318
319 refreshed = true;
320 GOTO (again);
321 }
322
323 done:
324 if (!*bson && mongoc_cursor_get_id (cursor) == 0) {
325 cursor->done = 1;
326 }
327
328 RETURN (*bson != NULL);
329 }
330
331
332 static mongoc_cursor_t *
_mongoc_cursor_cursorid_clone(const mongoc_cursor_t * cursor)333 _mongoc_cursor_cursorid_clone (const mongoc_cursor_t *cursor)
334 {
335 mongoc_cursor_t *clone_;
336
337 ENTRY;
338
339 clone_ = _mongoc_cursor_clone (cursor);
340 _mongoc_cursor_cursorid_init (clone_, &cursor->filter);
341
342 RETURN (clone_);
343 }
344
345
346 static mongoc_cursor_interface_t gMongocCursorCursorid = {
347 _mongoc_cursor_cursorid_clone,
348 _mongoc_cursor_cursorid_destroy,
349 NULL,
350 _mongoc_cursor_cursorid_next,
351 };
352
353
354 void
_mongoc_cursor_cursorid_init(mongoc_cursor_t * cursor,const bson_t * command)355 _mongoc_cursor_cursorid_init (mongoc_cursor_t *cursor, const bson_t *command)
356 {
357 ENTRY;
358
359 bson_destroy (&cursor->filter);
360 bson_copy_to (command, &cursor->filter);
361
362 cursor->iface_data = _mongoc_cursor_cursorid_new ();
363
364 memcpy (&cursor->iface,
365 &gMongocCursorCursorid,
366 sizeof (mongoc_cursor_interface_t));
367
368 EXIT;
369 }
370
371 void
_mongoc_cursor_cursorid_init_with_reply(mongoc_cursor_t * cursor,bson_t * reply,uint32_t server_id)372 _mongoc_cursor_cursorid_init_with_reply (mongoc_cursor_t *cursor,
373 bson_t *reply,
374 uint32_t server_id)
375 {
376 mongoc_cursor_cursorid_t *cid;
377
378 cursor->sent = true;
379 cursor->server_id = server_id;
380
381 cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
382 BSON_ASSERT (cid);
383
384 bson_destroy (&cid->array);
385 if (!bson_steal (&cid->array, reply)) {
386 bson_steal (&cid->array, bson_copy (reply));
387 }
388
389 if (!_mongoc_cursor_cursorid_start_batch (cursor)) {
390 bson_set_error (&cursor->error,
391 MONGOC_ERROR_CURSOR,
392 MONGOC_ERROR_CURSOR_INVALID_CURSOR,
393 "Couldn't parse cursor document");
394 }
395 }
396