1 /*
2 * Copyright 2015 MongoDB, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17
18 #include <mongoc-rpc-private.h>
19 #include "mongoc.h"
20
21 #include "mock-server.h"
22 #include "../test-conveniences.h"
23 #include "../TestSuite.h"
24
25
26 static bool
27 is_command_ns (const char *ns);
28
29 static void
30 request_from_query (request_t *request, const mongoc_rpc_t *rpc);
31
32 static void
33 request_from_insert (request_t *request, const mongoc_rpc_t *rpc);
34
35 static void
36 request_from_update (request_t *request, const mongoc_rpc_t *rpc);
37
38 static void
39 request_from_delete (request_t *request, const mongoc_rpc_t *rpc);
40
41 static void
42 request_from_killcursors (request_t *request, const mongoc_rpc_t *rpc);
43
44 static void
45 request_from_getmore (request_t *request, const mongoc_rpc_t *rpc);
46
47 static char *
48 query_flags_str (uint32_t flags);
49 static char *
50 insert_flags_str (uint32_t flags);
51 static char *
52 update_flags_str (uint32_t flags);
53 static char *
54 delete_flags_str (uint32_t flags);
55
56 request_t *
request_new(const mongoc_buffer_t * buffer,int32_t msg_len,mock_server_t * server,mongoc_stream_t * client,uint16_t client_port,sync_queue_t * replies)57 request_new (const mongoc_buffer_t *buffer,
58 int32_t msg_len,
59 mock_server_t *server,
60 mongoc_stream_t *client,
61 uint16_t client_port,
62 sync_queue_t *replies)
63 {
64 request_t *request = (request_t *) bson_malloc0 (sizeof *request);
65 uint8_t *data;
66
67 data = (uint8_t *) bson_malloc ((size_t) msg_len);
68 memcpy (data, buffer->data + buffer->off, (size_t) msg_len);
69 request->data = data;
70 request->data_len = (size_t) msg_len;
71 request->replies = replies;
72
73 if (!_mongoc_rpc_scatter (&request->request_rpc, data, (size_t) msg_len)) {
74 MONGOC_WARNING ("%s():%d: %s", BSON_FUNC, __LINE__, "Failed to scatter");
75 bson_free (data);
76 bson_free (request);
77 return NULL;
78 }
79
80 _mongoc_rpc_swab_from_le (&request->request_rpc);
81
82 request->opcode = (mongoc_opcode_t) request->request_rpc.header.opcode;
83 request->server = server;
84 request->client = client;
85 request->client_port = client_port;
86 _mongoc_array_init (&request->docs, sizeof (bson_t *));
87
88 switch (request->opcode) {
89 case MONGOC_OPCODE_COMPRESSED:
90 break;
91 case MONGOC_OPCODE_QUERY:
92 request_from_query (request, &request->request_rpc);
93 break;
94
95 case MONGOC_OPCODE_INSERT:
96 request_from_insert (request, &request->request_rpc);
97 break;
98
99 case MONGOC_OPCODE_UPDATE:
100 request_from_update (request, &request->request_rpc);
101 break;
102
103 case MONGOC_OPCODE_KILL_CURSORS:
104 request_from_killcursors (request, &request->request_rpc);
105 break;
106
107 case MONGOC_OPCODE_GET_MORE:
108 request_from_getmore (request, &request->request_rpc);
109 break;
110
111 case MONGOC_OPCODE_DELETE:
112 request_from_delete (request, &request->request_rpc);
113 break;
114
115 case MONGOC_OPCODE_REPLY:
116 case MONGOC_OPCODE_MSG:
117 default:
118 fprintf (stderr, "Unimplemented opcode %d\n", request->opcode);
119 abort ();
120 }
121
122 return request;
123 }
124
125 const bson_t *
request_get_doc(const request_t * request,int n)126 request_get_doc (const request_t *request, int n)
127 {
128 return _mongoc_array_index (&request->docs, const bson_t *, n);
129 }
130
131 bool
request_matches_flags(const request_t * request,mongoc_query_flags_t flags)132 request_matches_flags (const request_t *request, mongoc_query_flags_t flags)
133 {
134 const mongoc_rpc_t *rpc;
135
136 BSON_ASSERT (request);
137 rpc = &request->request_rpc;
138
139 if (rpc->query.flags != flags) {
140 test_error ("request's query flags are %s, expected %s",
141 query_flags_str (rpc->query.flags),
142 query_flags_str (flags));
143 return false;
144 }
145
146 return true;
147 }
148
149 /* TODO: take file, line, function params from caller, wrap in macro */
150 bool
request_matches_query(const request_t * request,const char * ns,mongoc_query_flags_t flags,uint32_t skip,int32_t n_return,const char * query_json,const char * fields_json,bool is_command)151 request_matches_query (const request_t *request,
152 const char *ns,
153 mongoc_query_flags_t flags,
154 uint32_t skip,
155 int32_t n_return,
156 const char *query_json,
157 const char *fields_json,
158 bool is_command)
159 {
160 const mongoc_rpc_t *rpc;
161 const bson_t *doc;
162 bool n_return_equal;
163
164 BSON_ASSERT (request);
165 rpc = &request->request_rpc;
166
167 BSON_ASSERT (request->docs.len <= 2);
168
169 if (request->is_command && !is_command) {
170 test_error ("expected query, got command");
171 return false;
172 }
173
174 if (!request->is_command && is_command) {
175 test_error ("expected command, got query");
176 return false;
177 }
178
179 if (request->opcode != MONGOC_OPCODE_QUERY) {
180 test_error ("request's opcode does not match QUERY");
181 return false;
182 }
183
184 if (strcmp (rpc->query.collection, ns)) {
185 test_error ("request's namespace is '%s', expected '%s'",
186 request->request_rpc.query.collection,
187 ns);
188 return false;
189 }
190
191 if (!request_matches_flags (request, flags)) {
192 return false;
193 }
194
195 if (rpc->query.skip != skip) {
196 test_error ("requests's skip = %d, expected %d", rpc->query.skip, skip);
197 return false;
198 }
199
200 n_return_equal = (rpc->query.n_return == n_return);
201
202 if (!n_return_equal && abs (rpc->query.n_return) == 1) {
203 /* quirk: commands from mongoc_client_command_simple have n_return 1,
204 * from mongoc_topology_scanner_t have n_return -1
205 */
206 n_return_equal = abs (rpc->query.n_return) == n_return;
207 }
208
209 if (!n_return_equal) {
210 test_error ("requests's n_return = %d, expected %d",
211 rpc->query.n_return,
212 n_return);
213 return false;
214 }
215
216 if (request->docs.len) {
217 doc = request_get_doc (request, 0);
218 } else {
219 doc = NULL;
220 }
221
222 if (!match_json (
223 doc, is_command, __FILE__, __LINE__, BSON_FUNC, query_json)) {
224 /* match_json has logged the err */
225 return false;
226 }
227
228 if (request->docs.len > 1) {
229 doc = request_get_doc (request, 1);
230 } else {
231 doc = NULL;
232 }
233
234 if (!match_json (doc, false, __FILE__, __LINE__, BSON_FUNC, fields_json)) {
235 /* match_json has logged the err */
236 return false;
237 }
238
239 return true;
240 }
241
242
243 /* TODO: take file, line, function params from caller, wrap in macro */
244 bool
request_matches_insert(const request_t * request,const char * ns,mongoc_insert_flags_t flags,const char * doc_json)245 request_matches_insert (const request_t *request,
246 const char *ns,
247 mongoc_insert_flags_t flags,
248 const char *doc_json)
249 {
250 const mongoc_rpc_t *rpc;
251 const bson_t *doc;
252
253 BSON_ASSERT (request);
254 rpc = &request->request_rpc;
255
256 if (request->opcode != MONGOC_OPCODE_INSERT) {
257 test_error ("request's opcode does not match INSERT");
258 return false;
259 }
260
261 if (strcmp (rpc->insert.collection, ns)) {
262 test_error ("insert's namespace is '%s', expected '%s'",
263 request->request_rpc.get_more.collection,
264 ns);
265 return false;
266 }
267
268 if (rpc->insert.flags != flags) {
269 test_error ("request's insert flags are %s, expected %s",
270 insert_flags_str (rpc->insert.flags),
271 insert_flags_str (flags));
272 return false;
273 }
274
275 ASSERT_CMPINT ((int) request->docs.len, ==, 1);
276 doc = request_get_doc (request, 0);
277 if (!match_json (doc, false, __FILE__, __LINE__, BSON_FUNC, doc_json)) {
278 return false;
279 }
280
281 return true;
282 }
283
284
285 /* TODO: take file, line, function params from caller, wrap in macro */
286 bool
request_matches_bulk_insert(const request_t * request,const char * ns,mongoc_insert_flags_t flags,int n)287 request_matches_bulk_insert (const request_t *request,
288 const char *ns,
289 mongoc_insert_flags_t flags,
290 int n)
291 {
292 const mongoc_rpc_t *rpc;
293
294 BSON_ASSERT (request);
295 rpc = &request->request_rpc;
296
297 if (request->opcode != MONGOC_OPCODE_INSERT) {
298 test_error ("request's opcode does not match INSERT");
299 return false;
300 }
301
302 if (strcmp (rpc->insert.collection, ns)) {
303 test_error ("insert's namespace is '%s', expected '%s'",
304 request->request_rpc.get_more.collection,
305 ns);
306 return false;
307 }
308
309 if (rpc->insert.flags != flags) {
310 test_error ("request's insert flags are %s, expected %s",
311 insert_flags_str (rpc->insert.flags),
312 insert_flags_str (flags));
313 return false;
314 }
315
316 if ((int) request->docs.len != n) {
317 test_error (
318 "expected %d docs inserted, got %d", n, (int) request->docs.len);
319 return false;
320 }
321
322 return true;
323 }
324
325
326 /* TODO: take file, line, function params from caller, wrap in macro */
327 bool
request_matches_update(const request_t * request,const char * ns,mongoc_update_flags_t flags,const char * selector_json,const char * update_json)328 request_matches_update (const request_t *request,
329 const char *ns,
330 mongoc_update_flags_t flags,
331 const char *selector_json,
332 const char *update_json)
333 {
334 const mongoc_rpc_t *rpc;
335 const bson_t *doc;
336
337 BSON_ASSERT (request);
338 rpc = &request->request_rpc;
339
340 if (request->opcode != MONGOC_OPCODE_UPDATE) {
341 test_error ("request's opcode does not match UPDATE");
342 return false;
343 }
344
345 if (strcmp (rpc->update.collection, ns)) {
346 test_error ("update's namespace is '%s', expected '%s'",
347 request->request_rpc.update.collection,
348 ns);
349 return false;
350 }
351
352 if (rpc->update.flags != flags) {
353 test_error ("request's update flags are %s, expected %s",
354 update_flags_str (rpc->update.flags),
355 update_flags_str (flags));
356 return false;
357 }
358
359 ASSERT_CMPINT ((int) request->docs.len, ==, 2);
360 doc = request_get_doc (request, 0);
361 if (!match_json (doc, false, __FILE__, __LINE__, BSON_FUNC, selector_json)) {
362 return false;
363 }
364
365 doc = request_get_doc (request, 1);
366 if (!match_json (doc, false, __FILE__, __LINE__, BSON_FUNC, update_json)) {
367 return false;
368 }
369
370 return true;
371 }
372
373
374 /* TODO: take file, line, function params from caller, wrap in macro */
375 bool
request_matches_delete(const request_t * request,const char * ns,mongoc_remove_flags_t flags,const char * selector_json)376 request_matches_delete (const request_t *request,
377 const char *ns,
378 mongoc_remove_flags_t flags,
379 const char *selector_json)
380 {
381 const mongoc_rpc_t *rpc;
382 const bson_t *doc;
383
384 BSON_ASSERT (request);
385 rpc = &request->request_rpc;
386
387 if (request->opcode != MONGOC_OPCODE_DELETE) {
388 test_error ("request's opcode does not match DELETE");
389 return false;
390 }
391
392 if (strcmp (rpc->delete_.collection, ns)) {
393 test_error ("delete's namespace is '%s', expected '%s'",
394 request->request_rpc.delete_.collection,
395 ns);
396 return false;
397 }
398
399 if (rpc->delete_.flags != flags) {
400 test_error ("request's delete flags are %s, expected %s",
401 delete_flags_str (rpc->delete_.flags),
402 delete_flags_str (flags));
403 return false;
404 }
405
406 ASSERT_CMPINT ((int) request->docs.len, ==, 1);
407 doc = request_get_doc (request, 0);
408 if (!match_json (doc, false, __FILE__, __LINE__, BSON_FUNC, selector_json)) {
409 return false;
410 }
411
412 return true;
413 }
414
415
416 /* TODO: take file, line, function params from caller, wrap in macro */
417 bool
request_matches_getmore(const request_t * request,const char * ns,int32_t n_return,int64_t cursor_id)418 request_matches_getmore (const request_t *request,
419 const char *ns,
420 int32_t n_return,
421 int64_t cursor_id)
422 {
423 const mongoc_rpc_t *rpc;
424
425 BSON_ASSERT (request);
426 rpc = &request->request_rpc;
427
428 if (request->opcode != MONGOC_OPCODE_GET_MORE) {
429 test_error ("request's opcode does not match GET_MORE");
430 return false;
431 }
432
433 if (strcmp (rpc->get_more.collection, ns)) {
434 test_error ("request's namespace is '%s', expected '%s'",
435 request->request_rpc.get_more.collection,
436 ns);
437 return false;
438 }
439
440 if (rpc->get_more.n_return != n_return) {
441 test_error ("requests's n_return = %d, expected %d",
442 rpc->get_more.n_return,
443 n_return);
444 return false;
445 }
446
447 if (rpc->get_more.cursor_id != cursor_id) {
448 test_error ("requests's cursor_id = %" PRId64 ", expected %" PRId64,
449 rpc->get_more.cursor_id,
450 cursor_id);
451 return false;
452 }
453
454 return true;
455 }
456
457
458 /* TODO: take file, line, function params from caller, wrap in macro */
459 bool
request_matches_kill_cursors(const request_t * request,int64_t cursor_id)460 request_matches_kill_cursors (const request_t *request, int64_t cursor_id)
461 {
462 const mongoc_rpc_t *rpc;
463
464 BSON_ASSERT (request);
465 rpc = &request->request_rpc;
466
467 if (request->opcode != MONGOC_OPCODE_KILL_CURSORS) {
468 test_error ("request's opcode does not match KILL_CURSORS");
469 return false;
470 }
471
472 if (rpc->kill_cursors.n_cursors != 1) {
473 test_error ("request's n_cursors is %d, expected 1",
474 rpc->kill_cursors.n_cursors);
475 return false;
476 }
477
478 if (rpc->kill_cursors.cursors[0] != cursor_id) {
479 test_error ("request's cursor_id %" PRId64 ", expected %" PRId64,
480 rpc->kill_cursors.cursors[0],
481 cursor_id);
482 return false;
483 }
484
485 return true;
486 }
487
488
489 /*--------------------------------------------------------------------------
490 *
491 * request_get_server_port --
492 *
493 * Get the port of the server this request was sent to.
494 *
495 * Returns:
496 * A port number.
497 *
498 * Side effects:
499 * None.
500 *
501 *--------------------------------------------------------------------------
502 */
503
504 uint16_t
request_get_server_port(request_t * request)505 request_get_server_port (request_t *request)
506 {
507 return mock_server_get_port (request->server);
508 }
509
510
511 /*--------------------------------------------------------------------------
512 *
513 * request_get_client_port --
514 *
515 * Get the client port this request was sent from.
516 *
517 * Returns:
518 * A port number.
519 *
520 * Side effects:
521 * None.
522 *
523 *--------------------------------------------------------------------------
524 */
525
526 uint16_t
request_get_client_port(request_t * request)527 request_get_client_port (request_t *request)
528 {
529 return request->client_port;
530 }
531
532
533 /*--------------------------------------------------------------------------
534 *
535 * request_destroy --
536 *
537 * Free a request_t.
538 *
539 * Returns:
540 * None.
541 *
542 * Side effects:
543 * None.
544 *
545 *--------------------------------------------------------------------------
546 */
547
548 void
request_destroy(request_t * request)549 request_destroy (request_t *request)
550 {
551 size_t i;
552 bson_t *doc;
553
554 for (i = 0; i < request->docs.len; i++) {
555 doc = _mongoc_array_index (&request->docs, bson_t *, i);
556 bson_destroy (doc);
557 }
558
559 _mongoc_array_destroy (&request->docs);
560 bson_free (request->command_name);
561 bson_free (request->as_str);
562 bson_free (request->data);
563 bson_free (request);
564 }
565
566
/*
 * is_command_ns --
 *
 *       Tell whether namespace @ns ends in ".$cmd" with at least one
 *       character in front of that suffix.
 */
static bool
is_command_ns (const char *ns)
{
   static const char suffix[] = ".$cmd";
   const size_t suffix_len = sizeof suffix - 1;
   const size_t ns_len = strlen (ns);

   /* the suffix alone, or anything shorter, is not a command namespace */
   if (ns_len <= suffix_len) {
      return false;
   }

   return strcmp (ns + (ns_len - suffix_len), suffix) == 0;
}
576
577
578 static char *
query_flags_str(uint32_t flags)579 query_flags_str (uint32_t flags)
580 {
581 int flag = 1;
582 bson_string_t *str = bson_string_new ("");
583 bool begun = false;
584
585 if (flags == MONGOC_QUERY_NONE) {
586 bson_string_append (str, "0");
587 } else {
588 while (flag <= MONGOC_QUERY_PARTIAL) {
589 flag <<= 1;
590
591 if (flags & flag) {
592 if (begun) {
593 bson_string_append (str, "|");
594 }
595
596 begun = true;
597
598 switch (flag) {
599 case MONGOC_QUERY_TAILABLE_CURSOR:
600 bson_string_append (str, "TAILABLE");
601 break;
602 case MONGOC_QUERY_SLAVE_OK:
603 bson_string_append (str, "SLAVE_OK");
604 break;
605 case MONGOC_QUERY_OPLOG_REPLAY:
606 bson_string_append (str, "OPLOG_REPLAY");
607 break;
608 case MONGOC_QUERY_NO_CURSOR_TIMEOUT:
609 bson_string_append (str, "NO_TIMEOUT");
610 break;
611 case MONGOC_QUERY_AWAIT_DATA:
612 bson_string_append (str, "AWAIT_DATA");
613 break;
614 case MONGOC_QUERY_EXHAUST:
615 bson_string_append (str, "EXHAUST");
616 break;
617 case MONGOC_QUERY_PARTIAL:
618 bson_string_append (str, "PARTIAL");
619 break;
620 case MONGOC_QUERY_NONE:
621 default:
622 BSON_ASSERT (false);
623 }
624 }
625 }
626 }
627
628 return bson_string_free (str, false); /* detach buffer */
629 }
630
631
632 static void
request_from_query(request_t * request,const mongoc_rpc_t * rpc)633 request_from_query (request_t *request, const mongoc_rpc_t *rpc)
634 {
635 int32_t len;
636 bson_t *query;
637 bson_t *fields;
638 bson_iter_t iter;
639 bson_string_t *query_as_str = bson_string_new ("OP_QUERY ");
640 char *str;
641
642 memcpy (&len, rpc->query.query, 4);
643 len = BSON_UINT32_FROM_LE (len);
644 query = bson_new_from_data (rpc->query.query, (size_t) len);
645 BSON_ASSERT (query);
646 _mongoc_array_append_val (&request->docs, query);
647
648 bson_string_append_printf (query_as_str, "%s ", rpc->query.collection);
649
650 if (is_command_ns (request->request_rpc.query.collection)) {
651 request->is_command = true;
652
653 if (bson_iter_init (&iter, query) && bson_iter_next (&iter)) {
654 request->command_name = bson_strdup (bson_iter_key (&iter));
655 } else {
656 fprintf (stderr,
657 "WARNING: no command name for %s\n",
658 request->request_rpc.query.collection);
659 }
660 }
661
662 str = bson_as_json (query, NULL);
663 bson_string_append (query_as_str, str);
664 bson_free (str);
665
666 if (rpc->query.fields) {
667 memcpy (&len, rpc->query.fields, 4);
668 len = BSON_UINT32_FROM_LE (len);
669 fields = bson_new_from_data (rpc->query.fields, (size_t) len);
670 BSON_ASSERT (fields);
671 _mongoc_array_append_val (&request->docs, fields);
672
673 str = bson_as_json (fields, NULL);
674 bson_string_append (query_as_str, " fields=");
675 bson_string_append (query_as_str, str);
676 bson_free (str);
677 }
678
679 bson_string_append (query_as_str, " flags=");
680
681 str = query_flags_str (rpc->query.flags);
682 bson_string_append (query_as_str, str);
683 bson_free (str);
684
685 if (rpc->query.skip) {
686 bson_string_append_printf (
687 query_as_str, " skip=%d", (int) rpc->query.skip);
688 }
689
690 if (rpc->query.n_return) {
691 bson_string_append_printf (
692 query_as_str, " n_return=%d", (int) rpc->query.n_return);
693 }
694
695 request->as_str = bson_string_free (query_as_str, false);
696 }
697
698
699 static char *
insert_flags_str(uint32_t flags)700 insert_flags_str (uint32_t flags)
701 {
702 if (flags == MONGOC_INSERT_NONE) {
703 return bson_strdup ("0");
704 } else {
705 return bson_strdup ("CONTINUE_ON_ERROR");
706 }
707 }
708
709
/*
 * length_prefix --
 *
 *       Read the 32-bit little-endian value stored in the first four
 *       bytes at @data. memcpy is used so @data need not be aligned.
 *
 * Returns:
 *       The value in host byte order.
 */
static uint32_t
length_prefix (void *data)
{
   uint32_t raw;

   memcpy (&raw, data, sizeof raw);
   raw = BSON_UINT32_FROM_LE (raw);

   return raw;
}
719
720
721 static void
request_from_insert(request_t * request,const mongoc_rpc_t * rpc)722 request_from_insert (request_t *request, const mongoc_rpc_t *rpc)
723 {
724 uint8_t *pos = (uint8_t *) request->request_rpc.insert.documents->iov_base;
725 uint8_t *end = request->data + request->data_len;
726 bson_string_t *insert_as_str = bson_string_new ("OP_INSERT");
727 bson_t *doc;
728 size_t n_documents;
729 size_t i;
730 char *str;
731
732 while (pos < end) {
733 uint32_t len = length_prefix (pos);
734 doc = bson_new_from_data (pos, len);
735 BSON_ASSERT (doc);
736 _mongoc_array_append_val (&request->docs, doc);
737 pos += len;
738 }
739
740 n_documents = request->docs.len;
741
742 bson_string_append_printf (insert_as_str, " %d ", (int) n_documents);
743
744 for (i = 0; i < n_documents; i++) {
745 str = bson_as_json (request_get_doc (request, (int) i), NULL);
746 BSON_ASSERT (str);
747 bson_string_append (insert_as_str, str);
748 bson_free (str);
749
750 if (i < n_documents - 1) {
751 bson_string_append (insert_as_str, ", ");
752 }
753 }
754
755 bson_string_append (insert_as_str, " flags=");
756
757 str = insert_flags_str (rpc->insert.flags);
758 bson_string_append (insert_as_str, str);
759 bson_free (str);
760
761 request->as_str = bson_string_free (insert_as_str, false);
762 }
763
764
765 static char *
update_flags_str(uint32_t flags)766 update_flags_str (uint32_t flags)
767 {
768 int flag = 1;
769 bson_string_t *str = bson_string_new ("");
770 bool begun = false;
771
772 if (flags == MONGOC_UPDATE_NONE) {
773 bson_string_append (str, "0");
774 } else {
775 while (flag <= MONGOC_UPDATE_MULTI_UPDATE) {
776 flag <<= 1;
777
778 if (flags & flag) {
779 if (begun) {
780 bson_string_append (str, "|");
781 }
782
783 begun = true;
784
785 switch (flag) {
786 case MONGOC_UPDATE_UPSERT:
787 bson_string_append (str, "UPSERT");
788 break;
789 case MONGOC_UPDATE_MULTI_UPDATE:
790 bson_string_append (str, "MULTI");
791 break;
792 case MONGOC_UPDATE_NONE:
793 default:
794 BSON_ASSERT (false);
795 }
796 }
797 }
798 }
799
800 return bson_string_free (str, false); /* detach buffer */
801 }
802
803
804 static void
request_from_update(request_t * request,const mongoc_rpc_t * rpc)805 request_from_update (request_t *request, const mongoc_rpc_t *rpc)
806 {
807 int32_t len;
808 bson_t *doc;
809 bson_string_t *update_as_str = bson_string_new ("OP_UPDATE ");
810 char *str;
811
812 memcpy (&len, rpc->update.selector, 4);
813 len = BSON_UINT32_FROM_LE (len);
814 doc = bson_new_from_data (rpc->update.selector, (size_t) len);
815 BSON_ASSERT (doc);
816 _mongoc_array_append_val (&request->docs, doc);
817
818 str = bson_as_json (doc, NULL);
819 bson_string_append (update_as_str, str);
820 bson_free (str);
821
822 bson_string_append (update_as_str, ", ");
823
824 memcpy (&len, rpc->update.update, 4);
825 len = BSON_UINT32_FROM_LE (len);
826 doc = bson_new_from_data (rpc->update.update, (size_t) len);
827 BSON_ASSERT (doc);
828 _mongoc_array_append_val (&request->docs, doc);
829
830 str = bson_as_json (doc, NULL);
831 bson_string_append (update_as_str, str);
832 bson_free (str);
833
834 bson_string_append (update_as_str, " flags=");
835
836 str = update_flags_str (rpc->update.flags);
837 bson_string_append (update_as_str, str);
838 bson_free (str);
839
840 request->as_str = bson_string_free (update_as_str, false);
841 }
842
843
844 static char *
delete_flags_str(uint32_t flags)845 delete_flags_str (uint32_t flags)
846 {
847 if (flags == MONGOC_DELETE_NONE) {
848 return bson_strdup ("0");
849 } else {
850 return bson_strdup ("SINGLE_REMOVE");
851 }
852 }
853
854
855 static void
request_from_delete(request_t * request,const mongoc_rpc_t * rpc)856 request_from_delete (request_t *request, const mongoc_rpc_t *rpc)
857 {
858 int32_t len;
859 bson_t *doc;
860 bson_string_t *delete_as_str = bson_string_new ("OP_DELETE ");
861 char *str;
862
863 memcpy (&len, rpc->delete_.selector, 4);
864 len = BSON_UINT32_FROM_LE (len);
865 doc = bson_new_from_data (rpc->delete_.selector, (size_t) len);
866 BSON_ASSERT (doc);
867 _mongoc_array_append_val (&request->docs, doc);
868
869 str = bson_as_json (doc, NULL);
870 bson_string_append (delete_as_str, str);
871 bson_free (str);
872
873 bson_string_append (delete_as_str, " flags=");
874
875 str = delete_flags_str (rpc->delete_.flags);
876 bson_string_append (delete_as_str, str);
877 bson_free (str);
878
879 request->as_str = bson_string_free (delete_as_str, false);
880 }
881
882
883 static void
request_from_killcursors(request_t * request,const mongoc_rpc_t * rpc)884 request_from_killcursors (request_t *request, const mongoc_rpc_t *rpc)
885 {
886 /* protocol allows multiple cursor ids but we only implement one */
887 BSON_ASSERT (rpc->kill_cursors.n_cursors == 1);
888 request->as_str = bson_strdup_printf ("OP_KILLCURSORS %" PRId64,
889 rpc->kill_cursors.cursors[0]);
890 }
891
892
893 static void
request_from_getmore(request_t * request,const mongoc_rpc_t * rpc)894 request_from_getmore (request_t *request, const mongoc_rpc_t *rpc)
895 {
896 request->as_str =
897 bson_strdup_printf ("OP_GETMORE %s %" PRId64 " n_return=%d",
898 rpc->get_more.collection,
899 rpc->get_more.cursor_id,
900 rpc->get_more.n_return);
901 }
902