1 /*
2  * Copyright 2013 MongoDB, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 
18 #include "mongoc-cursor.h"
19 #include "mongoc-cursor-private.h"
20 #include "mongoc-cursor-cursorid-private.h"
21 #include "mongoc-log.h"
22 #include "mongoc-trace-private.h"
23 #include "mongoc-error.h"
24 #include "mongoc-util-private.h"
25 #include "mongoc-client-private.h"
26 
27 
28 #undef MONGOC_LOG_DOMAIN
29 #define MONGOC_LOG_DOMAIN "cursor-cursorid"
30 
31 
32 static void *
_mongoc_cursor_cursorid_new(void)33 _mongoc_cursor_cursorid_new (void)
34 {
35    mongoc_cursor_cursorid_t *cid;
36 
37    ENTRY;
38 
39    cid = (mongoc_cursor_cursorid_t *) bson_malloc0 (sizeof *cid);
40    bson_init (&cid->array);
41    cid->in_batch = false;
42    cid->in_reader = false;
43 
44    RETURN (cid);
45 }
46 
47 
48 static void
_mongoc_cursor_cursorid_destroy(mongoc_cursor_t * cursor)49 _mongoc_cursor_cursorid_destroy (mongoc_cursor_t *cursor)
50 {
51    mongoc_cursor_cursorid_t *cid;
52 
53    ENTRY;
54 
55    cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
56    BSON_ASSERT (cid);
57 
58    bson_destroy (&cid->array);
59    bson_free (cid);
60    _mongoc_cursor_destroy (cursor);
61 
62    EXIT;
63 }
64 
65 
66 /*
67  * Start iterating the reply to an "aggregate", "find", "getMore" etc. command:
68  *
69  *    {cursor: {id: 1234, ns: "db.collection", firstBatch: [...]}}
70  */
71 bool
_mongoc_cursor_cursorid_start_batch(mongoc_cursor_t * cursor)72 _mongoc_cursor_cursorid_start_batch (mongoc_cursor_t *cursor)
73 {
74    mongoc_cursor_cursorid_t *cid;
75    bson_iter_t iter;
76    bson_iter_t child;
77    const char *ns;
78    uint32_t nslen;
79 
80    cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
81 
82    BSON_ASSERT (cid);
83 
84    if (bson_iter_init_find (&iter, &cid->array, "cursor") &&
85        BSON_ITER_HOLDS_DOCUMENT (&iter) && bson_iter_recurse (&iter, &child)) {
86       while (bson_iter_next (&child)) {
87          if (BSON_ITER_IS_KEY (&child, "id")) {
88             cursor->rpc.reply.cursor_id = bson_iter_as_int64 (&child);
89          } else if (BSON_ITER_IS_KEY (&child, "ns")) {
90             ns = bson_iter_utf8 (&child, &nslen);
91             _mongoc_set_cursor_ns (cursor, ns, nslen);
92          } else if (BSON_ITER_IS_KEY (&child, "firstBatch") ||
93                     BSON_ITER_IS_KEY (&child, "nextBatch")) {
94             if (BSON_ITER_HOLDS_ARRAY (&child) &&
95                 bson_iter_recurse (&child, &cid->batch_iter)) {
96                cid->in_batch = true;
97             }
98          }
99       }
100    }
101 
102    return cid->in_batch;
103 }
104 
105 
106 static bool
_mongoc_cursor_cursorid_refresh_from_command(mongoc_cursor_t * cursor,const bson_t * command)107 _mongoc_cursor_cursorid_refresh_from_command (mongoc_cursor_t *cursor,
108                                               const bson_t *command)
109 {
110    mongoc_cursor_cursorid_t *cid;
111 
112    ENTRY;
113 
114    cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
115    BSON_ASSERT (cid);
116 
117    bson_destroy (&cid->array);
118 
119    /* server replies to find / aggregate with {cursor: {id: N, firstBatch: []}},
120     * to getMore command with {cursor: {id: N, nextBatch: []}}. */
121    if (_mongoc_cursor_run_command (cursor, command, &cid->array) &&
122        _mongoc_cursor_cursorid_start_batch (cursor)) {
123       RETURN (true);
124    }
125 
126    bson_destroy (&cursor->error_doc);
127    bson_copy_to (&cid->array, &cursor->error_doc);
128 
129    if (!cursor->error.domain) {
130       bson_set_error (&cursor->error,
131                       MONGOC_ERROR_PROTOCOL,
132                       MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
133                       "Invalid reply to %s command.",
134                       _mongoc_get_command_name (command));
135    }
136 
137    RETURN (false);
138 }
139 
140 
141 static void
_mongoc_cursor_cursorid_read_from_batch(mongoc_cursor_t * cursor,const bson_t ** bson)142 _mongoc_cursor_cursorid_read_from_batch (mongoc_cursor_t *cursor,
143                                          const bson_t **bson)
144 {
145    mongoc_cursor_cursorid_t *cid;
146    const uint8_t *data = NULL;
147    uint32_t data_len = 0;
148 
149    ENTRY;
150 
151    cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
152    BSON_ASSERT (cid);
153 
154    if (bson_iter_next (&cid->batch_iter) &&
155        BSON_ITER_HOLDS_DOCUMENT (&cid->batch_iter)) {
156       bson_iter_document (&cid->batch_iter, &data_len, &data);
157 
158       /* bson_iter_next guarantees valid BSON, so this must succeed */
159       bson_init_static (&cid->current_doc, data, data_len);
160       *bson = &cid->current_doc;
161 
162       cursor->end_of_event = false;
163    } else {
164       cursor->end_of_event = true;
165    }
166 }
167 
168 
169 bool
_mongoc_cursor_cursorid_prime(mongoc_cursor_t * cursor)170 _mongoc_cursor_cursorid_prime (mongoc_cursor_t *cursor)
171 {
172    cursor->sent = true;
173    cursor->operation_id = ++cursor->client->cluster.operation_id;
174    return _mongoc_cursor_cursorid_refresh_from_command (cursor,
175                                                         &cursor->filter);
176 }
177 
178 
179 bool
_mongoc_cursor_prepare_getmore_command(mongoc_cursor_t * cursor,bson_t * command)180 _mongoc_cursor_prepare_getmore_command (mongoc_cursor_t *cursor,
181                                         bson_t *command)
182 {
183    const char *collection;
184    int collection_len;
185    int64_t batch_size;
186    bool await_data;
187    int32_t max_await_time_ms;
188 
189    ENTRY;
190 
191    _mongoc_cursor_collection (cursor, &collection, &collection_len);
192 
193    bson_init (command);
194    bson_append_int64 (command, "getMore", 7, mongoc_cursor_get_id (cursor));
195    bson_append_utf8 (command, "collection", 10, collection, collection_len);
196 
197    batch_size = mongoc_cursor_get_batch_size (cursor);
198 
199    /* See find, getMore, and killCursors Spec for batchSize rules */
200    if (batch_size) {
201       bson_append_int64 (command,
202                          MONGOC_CURSOR_BATCH_SIZE,
203                          MONGOC_CURSOR_BATCH_SIZE_LEN,
204                          abs (_mongoc_n_return (cursor)));
205    }
206 
207    /* Find, getMore And killCursors Commands Spec: "In the case of a tailable
208       cursor with awaitData == true the driver MUST provide a Cursor level
209       option named maxAwaitTimeMS (See CRUD specification for details). The
210       maxTimeMS option on the getMore command MUST be set to the value of the
211       option maxAwaitTimeMS. If no maxAwaitTimeMS is specified, the driver
212       SHOULD not set maxTimeMS on the getMore command."
213     */
214    await_data = _mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_TAILABLE) &&
215                 _mongoc_cursor_get_opt_bool (cursor, MONGOC_CURSOR_AWAIT_DATA);
216 
217 
218    if (await_data) {
219       max_await_time_ms =
220          (int32_t) mongoc_cursor_get_max_await_time_ms (cursor);
221       if (max_await_time_ms) {
222          bson_append_int32 (command,
223                             MONGOC_CURSOR_MAX_TIME_MS,
224                             MONGOC_CURSOR_MAX_TIME_MS_LEN,
225                             max_await_time_ms);
226       }
227    }
228 
229    RETURN (true);
230 }
231 
232 
233 static bool
_mongoc_cursor_cursorid_get_more(mongoc_cursor_t * cursor)234 _mongoc_cursor_cursorid_get_more (mongoc_cursor_t *cursor)
235 {
236    mongoc_cursor_cursorid_t *cid;
237    mongoc_server_stream_t *server_stream;
238    bson_t command;
239    bool ret;
240 
241    ENTRY;
242 
243    cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
244    BSON_ASSERT (cid);
245 
246    server_stream = _mongoc_cursor_fetch_stream (cursor);
247 
248    if (!server_stream) {
249       RETURN (false);
250    }
251 
252    if (_use_getmore_command (cursor, server_stream)) {
253       if (!_mongoc_cursor_prepare_getmore_command (cursor, &command)) {
254          mongoc_server_stream_cleanup (server_stream);
255          RETURN (false);
256       }
257 
258       ret = _mongoc_cursor_cursorid_refresh_from_command (cursor, &command);
259       bson_destroy (&command);
260    } else {
261       ret = _mongoc_cursor_op_getmore (cursor, server_stream);
262       cid->in_reader = ret;
263    }
264 
265    mongoc_server_stream_cleanup (server_stream);
266    RETURN (ret);
267 }
268 
269 
270 bool
_mongoc_cursor_cursorid_next(mongoc_cursor_t * cursor,const bson_t ** bson)271 _mongoc_cursor_cursorid_next (mongoc_cursor_t *cursor, const bson_t **bson)
272 {
273    mongoc_cursor_cursorid_t *cid;
274    bool refreshed = false;
275 
276    ENTRY;
277 
278    *bson = NULL;
279 
280    cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
281    BSON_ASSERT (cid);
282 
283    if (!cursor->sent) {
284       if (!_mongoc_cursor_cursorid_prime (cursor)) {
285          GOTO (done);
286       }
287    }
288 
289 again:
290 
291    /* Two paths:
292     * - Mongo 3.2+, sent "getMore" cmd, we're reading reply's "nextBatch" array
293     * - Mongo 2.6 to 3, after "aggregate" or similar command we sent OP_GETMORE,
294     *   we're reading the raw reply
295     */
296    if (cid->in_batch) {
297       _mongoc_cursor_cursorid_read_from_batch (cursor, bson);
298 
299       if (*bson) {
300          GOTO (done);
301       }
302 
303       cid->in_batch = false;
304    } else if (cid->in_reader) {
305       _mongoc_read_from_buffer (cursor, bson);
306 
307       if (*bson) {
308          GOTO (done);
309       }
310 
311       cid->in_reader = false;
312    }
313 
314    if (!refreshed && mongoc_cursor_get_id (cursor)) {
315       if (!_mongoc_cursor_cursorid_get_more (cursor)) {
316          GOTO (done);
317       }
318 
319       refreshed = true;
320       GOTO (again);
321    }
322 
323 done:
324    if (!*bson && mongoc_cursor_get_id (cursor) == 0) {
325       cursor->done = 1;
326    }
327 
328    RETURN (*bson != NULL);
329 }
330 
331 
332 static mongoc_cursor_t *
_mongoc_cursor_cursorid_clone(const mongoc_cursor_t * cursor)333 _mongoc_cursor_cursorid_clone (const mongoc_cursor_t *cursor)
334 {
335    mongoc_cursor_t *clone_;
336 
337    ENTRY;
338 
339    clone_ = _mongoc_cursor_clone (cursor);
340    _mongoc_cursor_cursorid_init (clone_, &cursor->filter);
341 
342    RETURN (clone_);
343 }
344 
345 
346 static mongoc_cursor_interface_t gMongocCursorCursorid = {
347    _mongoc_cursor_cursorid_clone,
348    _mongoc_cursor_cursorid_destroy,
349    NULL,
350    _mongoc_cursor_cursorid_next,
351 };
352 
353 
354 void
_mongoc_cursor_cursorid_init(mongoc_cursor_t * cursor,const bson_t * command)355 _mongoc_cursor_cursorid_init (mongoc_cursor_t *cursor, const bson_t *command)
356 {
357    ENTRY;
358 
359    bson_destroy (&cursor->filter);
360    bson_copy_to (command, &cursor->filter);
361 
362    cursor->iface_data = _mongoc_cursor_cursorid_new ();
363 
364    memcpy (&cursor->iface,
365            &gMongocCursorCursorid,
366            sizeof (mongoc_cursor_interface_t));
367 
368    EXIT;
369 }
370 
371 void
_mongoc_cursor_cursorid_init_with_reply(mongoc_cursor_t * cursor,bson_t * reply,uint32_t server_id)372 _mongoc_cursor_cursorid_init_with_reply (mongoc_cursor_t *cursor,
373                                          bson_t *reply,
374                                          uint32_t server_id)
375 {
376    mongoc_cursor_cursorid_t *cid;
377 
378    cursor->sent = true;
379    cursor->server_id = server_id;
380 
381    cid = (mongoc_cursor_cursorid_t *) cursor->iface_data;
382    BSON_ASSERT (cid);
383 
384    bson_destroy (&cid->array);
385    if (!bson_steal (&cid->array, reply)) {
386       bson_steal (&cid->array, bson_copy (reply));
387    }
388 
389    if (!_mongoc_cursor_cursorid_start_batch (cursor)) {
390       bson_set_error (&cursor->error,
391                       MONGOC_ERROR_CURSOR,
392                       MONGOC_ERROR_CURSOR_INVALID_CURSOR,
393                       "Couldn't parse cursor document");
394    }
395 }
396