1 /*
2 * Copyright 2015 MongoDB, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17
18 #include "mongoc.h"
19
20 #include "mongoc-buffer-private.h"
21 #include "mongoc-socket-private.h"
22 #include "mongoc-thread-private.h"
23 #include "mongoc-util-private.h"
24 #include "mongoc-trace-private.h"
25 #include "sync-queue.h"
26 #include "mock-server.h"
27 #include "../test-conveniences.h"
28 #include "../test-libmongoc.h"
29 #include "../TestSuite.h"
30
31 #ifdef HAVE_STRINGS_H
32 #include <strings.h>
33 #endif
34
35 /* /Async/ismaster_ssl and /TOPOLOGY/scanner_ssl need a reasonable timeout */
36 #define TIMEOUT 5000
37
38
39 struct _mock_server_t {
40 bool running;
41 bool stopped;
42 bool rand_delay;
43 int64_t request_timeout_msec;
44 uint16_t port;
45 mongoc_socket_t *sock;
46 char *uri_str;
47 mongoc_uri_t *uri;
48 mongoc_thread_t main_thread;
49 mongoc_cond_t cond;
50 mongoc_mutex_t mutex;
51 int32_t last_response_id;
52 mongoc_array_t worker_threads;
53 sync_queue_t *q;
54 mongoc_array_t autoresponders;
55 int last_autoresponder_id;
56 int64_t start_time;
57
58 #ifdef MONGOC_ENABLE_SSL
59 bool ssl;
60 mongoc_ssl_opt_t ssl_opts;
61 #endif
62 };
63
64
65 struct _autoresponder_handle_t {
66 autoresponder_t responder;
67 void *data;
68 destructor_t destructor;
69 int id;
70 };
71
72
73 typedef struct {
74 mongoc_reply_flags_t flags;
75 bson_t *docs;
76 int n_docs;
77 int64_t cursor_id;
78 uint16_t client_port;
79 mongoc_opcode_t request_opcode;
80 mongoc_query_flags_t query_flags;
81 int32_t response_to;
82 } reply_t;
83
84
85 static void *
86 main_thread (void *data);
87
88 static void *
89 worker_thread (void *data);
90
91 static void
92 _mock_server_reply_with_stream (mock_server_t *server,
93 reply_t *reply,
94 mongoc_stream_t *client);
95
96 void
97 autoresponder_handle_destroy (autoresponder_handle_t *handle);
98
99 static uint16_t
100 get_port (mongoc_socket_t *sock);
101
102 /*--------------------------------------------------------------------------
103 *
104 * mock_server_new --
105 *
106 * Get a new mock_server_t. Call mock_server_run to start it,
107 * then mock_server_get_uri to connect.
108 *
109 * This server does not autorespond to "ismaster".
110 *
111 * Returns:
112 * A server you must mock_server_destroy.
113 *
114 * Side effects:
115 * None.
116 *
117 *--------------------------------------------------------------------------
118 */
119
120 mock_server_t *
mock_server_new()121 mock_server_new ()
122 {
123 mock_server_t *server =
124 (mock_server_t *) bson_malloc0 (sizeof (mock_server_t));
125
126 server->request_timeout_msec = get_future_timeout_ms ();
127 _mongoc_array_init (&server->autoresponders,
128 sizeof (autoresponder_handle_t));
129 _mongoc_array_init (&server->worker_threads, sizeof (mongoc_thread_t));
130 mongoc_cond_init (&server->cond);
131 mongoc_mutex_init (&server->mutex);
132 server->q = q_new ();
133 server->start_time = bson_get_monotonic_time ();
134
135 return server;
136 }
137
138
139 /*--------------------------------------------------------------------------
140 *
141 * mock_server_with_autoismaster --
142 *
143 * A new mock_server_t that autoresponds to ismaster. Call
144 * mock_server_run to start it, then mock_server_get_uri to
145 * connect.
146 *
147 * Returns:
148 * A server you must mock_server_destroy.
149 *
150 * Side effects:
151 * None.
152 *
153 *--------------------------------------------------------------------------
154 */
155
156 mock_server_t *
mock_server_with_autoismaster(int32_t max_wire_version)157 mock_server_with_autoismaster (int32_t max_wire_version)
158 {
159 mock_server_t *server = mock_server_new ();
160
161 char *ismaster = bson_strdup_printf ("{'ok': 1.0,"
162 " 'ismaster': true,"
163 " 'minWireVersion': 0,"
164 " 'maxWireVersion': %d}",
165 max_wire_version);
166
167 mock_server_auto_ismaster (server, ismaster);
168
169 bson_free (ismaster);
170
171 return server;
172 }
173
174
175 /*--------------------------------------------------------------------------
176 *
177 * mock_mongos_new --
178 *
179 * A new mock_server_t that autoresponds to ismaster as if it were a
180 * mongos. Call mock_server_run to start it, then mock_server_get_uri
181 * to connect.
182 *
183 * Returns:
184 * A server you must mock_server_destroy.
185 *
186 * Side effects:
187 * None.
188 *
189 *--------------------------------------------------------------------------
190 */
191
192 mock_server_t *
mock_mongos_new(int32_t max_wire_version)193 mock_mongos_new (int32_t max_wire_version)
194 {
195 mock_server_t *server = mock_server_new ();
196
197 char *ismaster = bson_strdup_printf ("{'ok': 1.0,"
198 " 'ismaster': true,"
199 " 'msg': 'isdbgrid',"
200 " 'minWireVersion': 0,"
201 " 'maxWireVersion': %d}",
202 max_wire_version);
203
204 mock_server_auto_ismaster (server, ismaster);
205
206 bson_free (ismaster);
207
208 return server;
209 }
210
211
212 static bool
hangup(request_t * request,void * ctx)213 hangup (request_t *request, void *ctx)
214 {
215 mock_server_hangs_up (request);
216 request_destroy (request);
217 return true;
218 }
219
220
221 /*--------------------------------------------------------------------------
222 *
223 * mock_server_down --
224 *
 *       A new mock_server_t that hangs up on every request. Call
 *       mock_server_run to start it, then mock_server_get_uri to connect.
227 *
228 * Returns:
229 * A server you must mock_server_destroy.
230 *
231 * Side effects:
232 * None.
233 *
234 *--------------------------------------------------------------------------
235 */
236
237 mock_server_t *
mock_server_down(void)238 mock_server_down (void)
239 {
240 mock_server_t *server = mock_server_new ();
241
242 mock_server_autoresponds (server, hangup, NULL, NULL);
243
244 return server;
245 }
246
247
248 #ifdef MONGOC_ENABLE_SSL
249
250 /*--------------------------------------------------------------------------
251 *
252 * mock_server_set_ssl_opts --
253 *
254 * Set server-side SSL options before calling mock_server_run.
255 *
256 * Returns:
257 * None.
258 *
259 * Side effects:
260 * None.
261 *
262 *--------------------------------------------------------------------------
263 */
264 void
mock_server_set_ssl_opts(mock_server_t * server,mongoc_ssl_opt_t * opts)265 mock_server_set_ssl_opts (mock_server_t *server, mongoc_ssl_opt_t *opts)
266 {
267 mongoc_mutex_lock (&server->mutex);
268 server->ssl = true;
269 memcpy (&server->ssl_opts, opts, sizeof *opts);
270 mongoc_mutex_unlock (&server->mutex);
271 }
272
273 #endif
274
275 /*--------------------------------------------------------------------------
276 *
277 * mock_server_run --
278 *
279 * Start listening on an unused port. After this, call
280 * mock_server_get_uri to connect.
281 *
282 * Returns:
 *       The bound port, or 0 if the listening socket could not be set up.
284 *
285 * Side effects:
286 * The server's port and URI are set.
287 *
288 *--------------------------------------------------------------------------
289 */
290 uint16_t
mock_server_run(mock_server_t * server)291 mock_server_run (mock_server_t *server)
292 {
293 mongoc_socket_t *ssock;
294 struct sockaddr_in bind_addr;
295 int optval;
296 uint16_t bound_port;
297
298 /* CDRIVER-2115: don't run mock server tests on 32-bit */
299 BSON_ASSERT (sizeof (void *) * 8 >= 64);
300
301 MONGOC_INFO ("Starting mock server on port %d.", server->port);
302
303 ssock = mongoc_socket_new (AF_INET, SOCK_STREAM, 0);
304 if (!ssock) {
305 perror ("Failed to create socket.");
306 return 0;
307 }
308
309 optval = 1;
310 mongoc_socket_setsockopt (
311 ssock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval);
312
313 memset (&bind_addr, 0, sizeof bind_addr);
314
315 bind_addr.sin_family = AF_INET;
316 bind_addr.sin_addr.s_addr = htonl (INADDR_ANY);
317
318 /* bind to unused port */
319 bind_addr.sin_port = htons (0);
320
321 if (-1 == mongoc_socket_bind (
322 ssock, (struct sockaddr *) &bind_addr, sizeof bind_addr)) {
323 perror ("Failed to bind socket");
324 return 0;
325 }
326
327 if (-1 == mongoc_socket_listen (ssock, 10)) {
328 perror ("Failed to put socket into listen mode");
329 return 0;
330 }
331
332 bound_port = get_port (ssock);
333 if (!bound_port) {
334 perror ("Failed to get bound port number");
335 return 0;
336 }
337
338 mongoc_mutex_lock (&server->mutex);
339
340 server->sock = ssock;
341 server->port = bound_port;
342 /* TODO: configurable timeouts, perhaps from env */
343 server->uri_str = bson_strdup_printf (
344 "mongodb://127.0.0.1:%hu/?serverselectiontimeoutms=10000&"
345 "sockettimeoutms=10000",
346 bound_port);
347 server->uri = mongoc_uri_new (server->uri_str);
348
349 mongoc_thread_create (&server->main_thread, main_thread, (void *) server);
350 while (!server->running) {
351 mongoc_cond_wait (&server->cond, &server->mutex);
352 }
353
354 mongoc_mutex_unlock (&server->mutex);
355
356 test_suite_mock_server_log ("listening on port %hu", bound_port);
357
358 return (uint16_t) bound_port;
359 }
360
361
362 /*--------------------------------------------------------------------------
363 *
364 * mock_server_autoresponds --
365 *
366 * Respond to matching requests. "data" is passed to the responder
367 * callback, and passed to "destructor" when the autoresponder is
368 * destroyed.
369 *
370 * Responders are run most-recently-added-first until one returns
371 * true to indicate it has handled the request. If none handles it,
372 * the request is enqueued until a call to mock_server_receives_*.
373 *
374 * Autoresponders must call request_destroy after handling a
375 * request.
376 *
377 * Returns:
378 * An id for mock_server_remove_autoresponder.
379 *
380 * Side effects:
381 * If a matching request is enqueued, pop it and respond.
382 *
383 *--------------------------------------------------------------------------
384 */
385
386 int
mock_server_autoresponds(mock_server_t * server,autoresponder_t responder,void * data,destructor_t destructor)387 mock_server_autoresponds (mock_server_t *server,
388 autoresponder_t responder,
389 void *data,
390 destructor_t destructor)
391 {
392 autoresponder_handle_t handle = {responder, data, destructor};
393 int id;
394
395 mongoc_mutex_lock (&server->mutex);
396 id = handle.id = server->last_autoresponder_id++;
397 /* TODO: peek and see if a matching request is enqueued */
398 _mongoc_array_append_val (&server->autoresponders, handle);
399 mongoc_mutex_unlock (&server->mutex);
400
401 return id;
402 }
403
404
405 /*--------------------------------------------------------------------------
406 *
407 * mock_server_remove_autoresponder --
408 *
409 * Remove a responder callback. Pass in the id returned by
410 * mock_server_autoresponds.
411 *
412 * Returns:
413 * None.
414 *
415 * Side effects:
416 * The responder's destructor is called on its "data" pointer.
417 *
418 *--------------------------------------------------------------------------
419 */
420
421 void
mock_server_remove_autoresponder(mock_server_t * server,int id)422 mock_server_remove_autoresponder (mock_server_t *server, int id)
423 {
424 size_t i;
425 autoresponder_handle_t *handles;
426
427 mongoc_mutex_lock (&server->mutex);
428 handles = (autoresponder_handle_t *) server->autoresponders.data;
429 for (i = 0; i < server->autoresponders.len; i++) {
430 if (handles[i].id == id) {
431 /* left-shift everyone after */
432 server->autoresponders.len--;
433 for (; i < server->autoresponders.len; i++) {
434 handles[i] = handles[i + 1];
435 }
436
437 autoresponder_handle_destroy (handles);
438
439 break;
440 }
441 }
442
443 mongoc_mutex_unlock (&server->mutex);
444 }
445
446
447 static bool
auto_ismaster(request_t * request,void * data)448 auto_ismaster (request_t *request, void *data)
449 {
450 const char *response_json = (const char *) data;
451 char *quotes_replaced;
452 bson_t response;
453 bson_error_t error;
454
455 if (!request->is_command || strcasecmp (request->command_name, "ismaster")) {
456 return false;
457 }
458
459 quotes_replaced = single_quotes_to_double (response_json);
460
461 if (!bson_init_from_json (&response, quotes_replaced, -1, &error)) {
462 fprintf (stderr, "%s\n", error.message);
463 fflush (stderr);
464 abort ();
465 }
466
467 if (mock_server_get_rand_delay (request->server)) {
468 _mongoc_usleep ((int64_t) (rand () % 10) * 1000);
469 }
470
471 mock_server_replies (request, MONGOC_REPLY_NONE, 0, 0, 1, response_json);
472
473 bson_destroy (&response);
474 bson_free (quotes_replaced);
475 request_destroy (request);
476 return true;
477 }
478
479
480 /*--------------------------------------------------------------------------
481 *
482 * mock_server_auto_ismaster --
483 *
484 * Autorespond to "ismaster" with the provided document.
485 *
486 * Returns:
487 * An id for mock_server_remove_autoresponder.
488 *
489 * Side effects:
490 * If a matching request is enqueued, pop it and respond.
491 *
492 *--------------------------------------------------------------------------
493 */
494
495 int
mock_server_auto_ismaster(mock_server_t * server,const char * response_json,...)496 mock_server_auto_ismaster (mock_server_t *server,
497 const char *response_json,
498 ...)
499 {
500 char *formatted_response_json;
501 va_list args;
502
503 va_start (args, response_json);
504 formatted_response_json = bson_strdupv_printf (response_json, args);
505 va_end (args);
506
507 return mock_server_autoresponds (
508 server, auto_ismaster, (void *) formatted_response_json, bson_free);
509 }
510
511
512 /*--------------------------------------------------------------------------
513 *
514 * mock_server_get_uri --
515 *
516 * Call after mock_server_run to get the connection string.
517 *
518 * Returns:
519 * A const URI.
520 *
521 * Side effects:
522 * None.
523 *
524 *--------------------------------------------------------------------------
525 */
526
527 const mongoc_uri_t *
mock_server_get_uri(mock_server_t * server)528 mock_server_get_uri (mock_server_t *server)
529 {
530 mongoc_uri_t *uri;
531
532 mongoc_mutex_lock (&server->mutex);
533 uri = server->uri;
534 mongoc_mutex_unlock (&server->mutex);
535
536 return uri;
537 }
538
539
540 /*--------------------------------------------------------------------------
541 *
542 * mock_server_get_host_and_port --
543 *
544 * Call after mock_server_run to get the server's "host:port".
545 *
546 * Returns:
547 * A const string.
548 *
549 * Side effects:
550 * None.
551 *
552 *--------------------------------------------------------------------------
553 */
554
555 const char *
mock_server_get_host_and_port(mock_server_t * server)556 mock_server_get_host_and_port (mock_server_t *server)
557 {
558 const mongoc_uri_t *uri;
559
560 uri = mock_server_get_uri (server);
561 BSON_ASSERT (uri); /* must call after mock_server_run */
562 return (mongoc_uri_get_hosts (uri))->host_and_port;
563 }
564
565
566 /*--------------------------------------------------------------------------
567 *
568 * mock_server_get_port --
569 *
570 * Call after mock_server_run to get the server's listening port.
571 *
572 * Returns:
573 * A port number.
574 *
575 * Side effects:
576 * None.
577 *
578 *--------------------------------------------------------------------------
579 */
580
581 uint16_t
mock_server_get_port(mock_server_t * server)582 mock_server_get_port (mock_server_t *server)
583 {
584 return server->port;
585 }
586
587
588 /*--------------------------------------------------------------------------
589 *
590 * mock_server_get_request_timeout_msec --
591 *
592 * How long mock_server_receives_* functions wait for a client
593 * request before giving up and returning NULL.
594 *
595 *--------------------------------------------------------------------------
596 */
597
598 int64_t
mock_server_get_request_timeout_msec(mock_server_t * server)599 mock_server_get_request_timeout_msec (mock_server_t *server)
600 {
601 int64_t request_timeout_msec;
602
603 mongoc_mutex_lock (&server->mutex);
604 request_timeout_msec = server->request_timeout_msec;
605 mongoc_mutex_unlock (&server->mutex);
606
607 return request_timeout_msec;
608 }
609
610 /*--------------------------------------------------------------------------
611 *
612 * mock_server_set_request_timeout_msec --
613 *
614 * How long mock_server_receives_* functions wait for a client
615 * request before giving up and returning NULL.
616 *
617 *--------------------------------------------------------------------------
618 */
619
620 void
mock_server_set_request_timeout_msec(mock_server_t * server,int64_t request_timeout_msec)621 mock_server_set_request_timeout_msec (mock_server_t *server,
622 int64_t request_timeout_msec)
623 {
624 mongoc_mutex_lock (&server->mutex);
625 server->request_timeout_msec = request_timeout_msec;
626 mongoc_mutex_unlock (&server->mutex);
627 }
628
629
630 /*--------------------------------------------------------------------------
631 *
632 * mock_server_get_rand_delay --
633 *
634 * Does the server delay a random duration before responding?
635 *
636 *--------------------------------------------------------------------------
637 */
638
639 bool
mock_server_get_rand_delay(mock_server_t * server)640 mock_server_get_rand_delay (mock_server_t *server)
641 {
642 bool rand_delay;
643
644 mongoc_mutex_lock (&server->mutex);
645 rand_delay = server->rand_delay;
646 mongoc_mutex_unlock (&server->mutex);
647
648 return rand_delay;
649 }
650
651 /*--------------------------------------------------------------------------
652 *
653 * mock_server_set_rand_delay --
654 *
655 * Whether to delay a random duration before responding.
656 *
657 *--------------------------------------------------------------------------
658 */
659
660 void
mock_server_set_rand_delay(mock_server_t * server,bool rand_delay)661 mock_server_set_rand_delay (mock_server_t *server, bool rand_delay)
662 {
663 mongoc_mutex_lock (&server->mutex);
664 server->rand_delay = rand_delay;
665 mongoc_mutex_unlock (&server->mutex);
666 }
667
668
669 /*--------------------------------------------------------------------------
670 *
671 * mock_server_get_uptime_sec --
672 *
 *       Seconds elapsed since the server was created by mock_server_new().
674 *
675 *--------------------------------------------------------------------------
676 */
677
678 double
mock_server_get_uptime_sec(mock_server_t * server)679 mock_server_get_uptime_sec (mock_server_t *server)
680 {
681 double uptime;
682
683 mongoc_mutex_lock (&server->mutex);
684 uptime = (bson_get_monotonic_time () - server->start_time) / 1e6;
685 mongoc_mutex_unlock (&server->mutex);
686
687 return uptime;
688 }
689
690
691 sync_queue_t *
mock_server_get_queue(mock_server_t * server)692 mock_server_get_queue (mock_server_t *server)
693 {
694 sync_queue_t *q;
695
696 mongoc_mutex_lock (&server->mutex);
697 q = server->q;
698 mongoc_mutex_unlock (&server->mutex);
699
700 return q;
701 }
702
703
704 request_t *
mock_server_receives_request(mock_server_t * server)705 mock_server_receives_request (mock_server_t *server)
706 {
707 sync_queue_t *q;
708 int64_t request_timeout_msec;
709
710 q = mock_server_get_queue (server);
711 request_timeout_msec = mock_server_get_request_timeout_msec (server);
712 return (request_t *) q_get (q, request_timeout_msec);
713 }
714
715
716 /*--------------------------------------------------------------------------
717 *
718 * mock_server_receives_command --
719 *
720 * Pop a client request if one is enqueued, or wait up to
721 * request_timeout_ms for the client to send a request.
722 *
723 * Returns:
724 * A request you must request_destroy, or NULL if the request does
725 * not match.
726 *
727 * Side effects:
728 * Logs if the current request is not a command matching
729 * database_name, command_name, and command_json.
730 *
731 *--------------------------------------------------------------------------
732 */
733
734 request_t *
mock_server_receives_command(mock_server_t * server,const char * database_name,mongoc_query_flags_t flags,const char * command_json,...)735 mock_server_receives_command (mock_server_t *server,
736 const char *database_name,
737 mongoc_query_flags_t flags,
738 const char *command_json,
739 ...)
740 {
741 va_list args;
742 char *formatted_command_json = NULL;
743 char *ns;
744 request_t *request;
745
746 va_start (args, command_json);
747 if (command_json) {
748 formatted_command_json = bson_strdupv_printf (command_json, args);
749 }
750 va_end (args);
751
752 ns = bson_strdup_printf ("%s.$cmd", database_name);
753
754 request = mock_server_receives_request (server);
755
756 if (request &&
757 !request_matches_query (
758 request, ns, flags, 0, 1, formatted_command_json, NULL, true)) {
759 request_destroy (request);
760 request = NULL;
761 }
762
763 bson_free (formatted_command_json);
764 bson_free (ns);
765
766 return request;
767 }
768
769
770 /*--------------------------------------------------------------------------
771 *
772 * mock_server_receives_ismaster --
773 *
774 * Pop a client ismaster call if one is enqueued, or wait up to
775 * request_timeout_ms for the client to send a request.
776 *
777 * Returns:
778 * A request you must request_destroy, or NULL if the current
779 * request is not an ismaster command.
780 *
781 * Side effects:
782 * Logs if the current request is not an ismaster command.
783 *
784 *--------------------------------------------------------------------------
785 */
786
787 request_t *
mock_server_receives_ismaster(mock_server_t * server)788 mock_server_receives_ismaster (mock_server_t *server)
789 {
790 return mock_server_receives_command (
791 server, "admin", MONGOC_QUERY_SLAVE_OK, "{'isMaster': 1}");
792 }
793
794
795 /*--------------------------------------------------------------------------
796 *
797 * mock_server_receives_gle --
798 *
799 * Pop a client request if one is enqueued, or wait up to
800 * request_timeout_ms for the client to send a request.
801 *
802 * Returns:
803 * A request you must request_destroy, or NULL if the request does
804 * not match.
805 *
806 * Side effects:
807 * Logs if the current request is not getLastError.
808 *
809 *--------------------------------------------------------------------------
810 */
811
812 request_t *
mock_server_receives_gle(mock_server_t * server,const char * database_name)813 mock_server_receives_gle (mock_server_t *server, const char *database_name)
814 {
815 return mock_server_receives_command (
816 server, database_name, MONGOC_QUERY_NONE, "{'getLastError': 1}");
817 }
818
819 /*--------------------------------------------------------------------------
820 *
821 * mock_server_receives_query --
822 *
823 * Pop a client request if one is enqueued, or wait up to
824 * request_timeout_ms for the client to send a request.
825 *
826 * Returns:
827 * A request you must request_destroy, or NULL if the request does
828 * not match.
829 *
830 * Side effects:
831 * Logs if the current request is not a query matching ns, flags,
832 * skip, n_return, query_json, and fields_json.
833 *
834 *--------------------------------------------------------------------------
835 */
836
837 request_t *
mock_server_receives_query(mock_server_t * server,const char * ns,mongoc_query_flags_t flags,uint32_t skip,int32_t n_return,const char * query_json,const char * fields_json)838 mock_server_receives_query (mock_server_t *server,
839 const char *ns,
840 mongoc_query_flags_t flags,
841 uint32_t skip,
842 int32_t n_return,
843 const char *query_json,
844 const char *fields_json)
845 {
846 request_t *request;
847
848 request = mock_server_receives_request (server);
849
850 if (request &&
851 !request_matches_query (
852 request, ns, flags, skip, n_return, query_json, fields_json, false)) {
853 request_destroy (request);
854 return NULL;
855 }
856
857 return request;
858 }
859
860
861 /*--------------------------------------------------------------------------
862 *
863 * mock_server_receives_insert --
864 *
865 * Pop a client request if one is enqueued, or wait up to
866 * request_timeout_ms for the client to send a request.
867 *
868 * Returns:
869 * A request you must request_destroy, or NULL if the request does
870 * not match.
871 *
872 * Side effects:
873 * Logs if the current request is not an insert matching ns, flags,
874 * and doc_json.
875 *
876 *--------------------------------------------------------------------------
877 */
878
879 request_t *
mock_server_receives_insert(mock_server_t * server,const char * ns,mongoc_insert_flags_t flags,const char * doc_json)880 mock_server_receives_insert (mock_server_t *server,
881 const char *ns,
882 mongoc_insert_flags_t flags,
883 const char *doc_json)
884 {
885 request_t *request;
886
887 request = mock_server_receives_request (server);
888
889 if (request && !request_matches_insert (request, ns, flags, doc_json)) {
890 request_destroy (request);
891 return NULL;
892 }
893
894 return request;
895 }
896
897 /*--------------------------------------------------------------------------
898 *
899 * mock_server_receives_bulk_insert --
900 *
901 * Pop a client request if one is enqueued, or wait up to
902 * request_timeout_ms for the client to send a request.
903 *
904 * Returns:
905 * A request you must request_destroy, or NULL if the request does
906 * not match.
907 *
908 * Side effects:
909 * Logs if the current request is not an insert matching ns and flags,
910 * with "n" documents.
911 *
912 *--------------------------------------------------------------------------
913 */
914
915 request_t *
mock_server_receives_bulk_insert(mock_server_t * server,const char * ns,mongoc_insert_flags_t flags,int n)916 mock_server_receives_bulk_insert (mock_server_t *server,
917 const char *ns,
918 mongoc_insert_flags_t flags,
919 int n)
920 {
921 request_t *request;
922
923 request = mock_server_receives_request (server);
924
925 if (request && !request_matches_bulk_insert (request, ns, flags, n)) {
926 request_destroy (request);
927 return NULL;
928 }
929
930 return request;
931 }
932
933 /*--------------------------------------------------------------------------
934 *
935 * mock_server_receives_update --
936 *
937 * Pop a client request if one is enqueued, or wait up to
938 * request_timeout_ms for the client to send a request.
939 *
940 * Returns:
941 * A request you must request_destroy, or NULL if the request does
942 * not match.
943 *
944 * Side effects:
945 * Logs if the current request is not an update matching ns, flags,
946 * selector_json, and update_json.
947 *
948 *--------------------------------------------------------------------------
949 */
950
951 request_t *
mock_server_receives_update(mock_server_t * server,const char * ns,mongoc_update_flags_t flags,const char * selector_json,const char * update_json)952 mock_server_receives_update (mock_server_t *server,
953 const char *ns,
954 mongoc_update_flags_t flags,
955 const char *selector_json,
956 const char *update_json)
957 {
958 request_t *request;
959
960 request = mock_server_receives_request (server);
961
962 if (request &&
963 !request_matches_update (
964 request, ns, flags, selector_json, update_json)) {
965 request_destroy (request);
966 return NULL;
967 }
968
969 return request;
970 }
971
972
973 /*--------------------------------------------------------------------------
974 *
975 * mock_server_receives_delete --
976 *
977 * Pop a client request if one is enqueued, or wait up to
978 * request_timeout_ms for the client to send a request.
979 *
980 * Returns:
981 * A request you must request_destroy, or NULL if the request does
982 * not match.
983 *
984 * Side effects:
985 * Logs if the current request is not a delete matching ns, flags,
986 * and selector_json.
987 *
988 *--------------------------------------------------------------------------
989 */
990
991 request_t *
mock_server_receives_delete(mock_server_t * server,const char * ns,mongoc_remove_flags_t flags,const char * selector_json)992 mock_server_receives_delete (mock_server_t *server,
993 const char *ns,
994 mongoc_remove_flags_t flags,
995 const char *selector_json)
996 {
997 request_t *request;
998
999 request = mock_server_receives_request (server);
1000
1001 if (request && !request_matches_delete (request, ns, flags, selector_json)) {
1002 request_destroy (request);
1003 return NULL;
1004 }
1005
1006 return request;
1007 }
1008
1009
1010 /*--------------------------------------------------------------------------
1011 *
1012 * mock_server_receives_getmore --
1013 *
1014 * Pop a client request if one is enqueued, or wait up to
1015 * request_timeout_ms for the client to send a request.
1016 *
1017 * Returns:
1018 * A request you must request_destroy, or NULL if the request does
1019 * not match.
1020 *
1021 * Side effects:
1022 * Logs if the current request is not a getmore matching n_return
1023 * and cursor_id.
1024 *
1025 *--------------------------------------------------------------------------
1026 */
1027
1028 request_t *
mock_server_receives_getmore(mock_server_t * server,const char * ns,int32_t n_return,int64_t cursor_id)1029 mock_server_receives_getmore (mock_server_t *server,
1030 const char *ns,
1031 int32_t n_return,
1032 int64_t cursor_id)
1033 {
1034 request_t *request;
1035
1036 request = mock_server_receives_request (server);
1037
1038 if (request && !request_matches_getmore (request, ns, n_return, cursor_id)) {
1039 request_destroy (request);
1040 return NULL;
1041 }
1042
1043 return request;
1044 }
1045
1046
1047 /*--------------------------------------------------------------------------
1048 *
1049 * mock_server_receives_kill_cursors --
1050 *
1051 * Pop a client request if one is enqueued, or wait up to
1052 * request_timeout_ms for the client to send a request.
1053 *
1054 * Real-life OP_KILLCURSORS can take multiple ids, but that is
1055 * not yet supported here.
1056 *
1057 * Returns:
1058 * A request you must request_destroy, or NULL if the request
1059 * does not match.
1060 *
1061 * Side effects:
1062 * Logs if the current request is not an OP_KILLCURSORS with the
1063 * expected cursor_id.
1064 *
1065 *--------------------------------------------------------------------------
1066 */
1067
1068 request_t *
mock_server_receives_kill_cursors(mock_server_t * server,int64_t cursor_id)1069 mock_server_receives_kill_cursors (mock_server_t *server, int64_t cursor_id)
1070 {
1071 request_t *request;
1072
1073 request = mock_server_receives_request (server);
1074
1075 if (request && !request_matches_kill_cursors (request, cursor_id)) {
1076 request_destroy (request);
1077 return NULL;
1078 }
1079
1080 return request;
1081 }
1082
1083 /*--------------------------------------------------------------------------
1084 *
1085 * mock_server_hangs_up --
1086 *
1087 * Hang up on a client request.
1088 *
1089 * Returns:
1090 * None.
1091 *
1092 * Side effects:
1093 * Causes a network error on the client side.
1094 *
1095 *--------------------------------------------------------------------------
1096 */
1097
1098 void
mock_server_hangs_up(request_t * request)1099 mock_server_hangs_up (request_t *request)
1100 {
1101 test_suite_mock_server_log ("%5.2f %hu <- %hu \thang up!",
1102 mock_server_get_uptime_sec (request->server),
1103 request->client_port,
1104 request_get_server_port (request));
1105
1106 mongoc_stream_close (request->client);
1107 }
1108
1109
1110 /*--------------------------------------------------------------------------
1111 *
1112 * mock_server_resets --
1113 *
1114 * Forcefully reset a connection from the client.
1115 *
1116 * Returns:
1117 * None.
1118 *
1119 * Side effects:
1120 * Causes ECONNRESET on the client side.
1121 *
1122 *--------------------------------------------------------------------------
1123 */
1124
1125 void
mock_server_resets(request_t * request)1126 mock_server_resets (request_t *request)
1127 {
1128 struct linger no_linger;
1129 no_linger.l_onoff = 1;
1130 no_linger.l_linger = 0;
1131
1132 test_suite_mock_server_log ("%5.2f %hu <- %hu \treset!",
1133 mock_server_get_uptime_sec (request->server),
1134 request->client_port,
1135 request_get_server_port (request));
1136
1137 /* send RST packet to client */
1138 mongoc_stream_setsockopt (
1139 request->client, SOL_SOCKET, SO_LINGER, &no_linger, sizeof no_linger);
1140
1141 mongoc_stream_close (request->client);
1142 }
1143
1144
1145 /*--------------------------------------------------------------------------
1146 *
1147 * mock_server_replies --
1148 *
1149 * Respond to a client request.
1150 *
1151 * Returns:
1152 * None.
1153 *
1154 * Side effects:
1155 * Sends an OP_REPLY to the client.
1156 *
1157 *--------------------------------------------------------------------------
1158 */
1159
1160 void
mock_server_replies(request_t * request,mongoc_reply_flags_t flags,int64_t cursor_id,int32_t starting_from,int32_t number_returned,const char * docs_json)1161 mock_server_replies (request_t *request,
1162 mongoc_reply_flags_t flags,
1163 int64_t cursor_id,
1164 int32_t starting_from,
1165 int32_t number_returned,
1166 const char *docs_json)
1167 {
1168 char *quotes_replaced;
1169 bson_t doc;
1170 bson_error_t error;
1171 bool r;
1172
1173 BSON_ASSERT (request);
1174
1175 if (docs_json) {
1176 quotes_replaced = single_quotes_to_double (docs_json);
1177 r = bson_init_from_json (&doc, quotes_replaced, -1, &error);
1178 bson_free (quotes_replaced);
1179 } else {
1180 r = bson_init_from_json (&doc, "{}", -1, &error);
1181 }
1182
1183 if (!r) {
1184 MONGOC_WARNING ("%s", error.message);
1185 return;
1186 }
1187
1188 mock_server_reply_multi (request, flags, &doc, 1, cursor_id);
1189 bson_destroy (&doc);
1190 }
1191
1192
1193 /*--------------------------------------------------------------------------
1194 *
1195 * mock_server_replies_simple --
1196 *
1197 * Respond to a client request.
1198 *
1199 * Returns:
1200 * None.
1201 *
1202 * Side effects:
1203 * Sends an OP_REPLY to the client.
1204 *
1205 *--------------------------------------------------------------------------
1206 */
1207
1208 void
mock_server_replies_simple(request_t * request,const char * docs_json)1209 mock_server_replies_simple (request_t *request, const char *docs_json)
1210 {
1211 mock_server_replies (request, MONGOC_REPLY_NONE, 0, 0, 1, docs_json);
1212 }
1213
1214
1215 /*--------------------------------------------------------------------------
1216 *
1217 * mock_server_replies_ok_and_destroys --
1218 *
1219 * Respond to a client request.
1220 *
1221 * Returns:
1222 * None.
1223 *
1224 * Side effects:
1225 * Sends an OP_REPLY with "{ok: 1}" to the client.
1226 *
1227 *--------------------------------------------------------------------------
1228 */
1229
1230 void
mock_server_replies_ok_and_destroys(request_t * request)1231 mock_server_replies_ok_and_destroys (request_t *request)
1232 {
1233 mock_server_replies (request, MONGOC_REPLY_NONE, 0, 0, 1, "{'ok': 1}");
1234 request_destroy (request);
1235 }
1236
1237
1238 /*--------------------------------------------------------------------------
1239 *
1240 * mock_server_replies_to_find --
1241 *
1242 * Receive an OP_QUERY or "find" command and reply appropriately.
1243 *
1244 * Returns:
1245 * None.
1246 *
1247 * Side effects:
1248 * Very roughly validates the query or "find" command or aborts.
1249 * The intent is not to test the driver's query or find command
1250 * implementation here, see _test_kill_cursors for example use.
1251 *
1252 *--------------------------------------------------------------------------
1253 */
1254
1255 void
mock_server_replies_to_find(request_t * request,mongoc_query_flags_t flags,int64_t cursor_id,int32_t number_returned,const char * ns,const char * reply_json,bool is_command)1256 mock_server_replies_to_find (request_t *request,
1257 mongoc_query_flags_t flags,
1258 int64_t cursor_id,
1259 int32_t number_returned,
1260 const char *ns,
1261 const char *reply_json,
1262 bool is_command)
1263 {
1264 char *find_reply;
1265 char db[MONGOC_NAMESPACE_MAX];
1266
1267 _mongoc_get_db_name (ns, db);
1268
1269 /* minimal validation, we're not testing query / find cmd here */
1270 if (request->is_command && !is_command) {
1271 test_error ("expected query, got command");
1272 abort ();
1273 }
1274
1275 if (!request->is_command && is_command) {
1276 test_error ("expected command, got query");
1277 abort ();
1278 }
1279
1280 if (!request_matches_flags (request, flags)) {
1281 abort ();
1282 }
1283
1284 if (is_command) {
1285 find_reply =
1286 bson_strdup_printf ("{'ok': 1,"
1287 " 'cursor': {"
1288 " 'id': {'$numberLong': '%" PRId64 "'},"
1289 " 'ns': '%s',"
1290 " 'firstBatch': [%s]}}",
1291 cursor_id,
1292 ns,
1293 reply_json);
1294
1295 mock_server_replies_simple (request, find_reply);
1296 bson_free (find_reply);
1297 } else {
1298 mock_server_replies (
1299 request, MONGOC_REPLY_NONE, cursor_id, 0, number_returned, reply_json);
1300 }
1301 }
1302
1303
1304 /*--------------------------------------------------------------------------
1305 *
1306 * mock_server_destroy --
1307 *
1308 * Free a mock_server_t.
1309 *
1310 * Returns:
1311 * None.
1312 *
1313 * Side effects:
1314 * Closes sockets, joins threads, and calls destructors passed
1315 * to mock_server_autoresponds.
1316 *
1317 *--------------------------------------------------------------------------
1318 */
1319
1320 void
mock_server_destroy(mock_server_t * server)1321 mock_server_destroy (mock_server_t *server)
1322 {
1323 size_t i;
1324 autoresponder_handle_t *handle;
1325 int64_t deadline = bson_get_monotonic_time () + 10 * 1000 * 1000;
1326 request_t *request;
1327
1328 mongoc_mutex_lock (&server->mutex);
1329 if (server->running) {
1330 server->stopped = true;
1331 }
1332 mongoc_mutex_unlock (&server->mutex);
1333
1334 while (bson_get_monotonic_time () <= deadline) {
1335 /* wait 10 seconds */
1336 mongoc_mutex_lock (&server->mutex);
1337 if (!server->running) {
1338 mongoc_mutex_unlock (&server->mutex);
1339 break;
1340 }
1341
1342 mongoc_mutex_unlock (&server->mutex);
1343 _mongoc_usleep (1000);
1344 }
1345
1346 mongoc_mutex_lock (&server->mutex);
1347 if (server->running) {
1348 fprintf (stderr, "server still running after timeout\n");
1349 fflush (stderr);
1350 abort ();
1351 }
1352
1353 mongoc_mutex_unlock (&server->mutex);
1354 mongoc_thread_join (server->main_thread);
1355
1356 _mongoc_array_destroy (&server->worker_threads);
1357
1358 for (i = 0; i < server->autoresponders.len; i++) {
1359 handle = &_mongoc_array_index (
1360 &server->autoresponders, autoresponder_handle_t, i);
1361
1362 autoresponder_handle_destroy (handle);
1363 }
1364
1365 _mongoc_array_destroy (&server->autoresponders);
1366
1367 mongoc_cond_destroy (&server->cond);
1368 mongoc_mutex_destroy (&server->mutex);
1369 mongoc_socket_destroy (server->sock);
1370 bson_free (server->uri_str);
1371 mongoc_uri_destroy (server->uri);
1372
1373 while ((request = (request_t *) q_get_nowait (server->q))) {
1374 request_destroy (request);
1375 }
1376
1377 q_destroy (server->q);
1378 bson_free (server);
1379 }
1380
1381
1382 static uint16_t
get_port(mongoc_socket_t * sock)1383 get_port (mongoc_socket_t *sock)
1384 {
1385 struct sockaddr_in bound_addr = {0};
1386 mongoc_socklen_t addr_len = (mongoc_socklen_t) sizeof bound_addr;
1387
1388 if (mongoc_socket_getsockname (
1389 sock, (struct sockaddr *) &bound_addr, &addr_len) < 0) {
1390 perror ("Failed to get listening port number");
1391 return 0;
1392 }
1393
1394 return ntohs (bound_addr.sin_port);
1395 }
1396
1397
1398 static bool
_mock_server_stopping(mock_server_t * server)1399 _mock_server_stopping (mock_server_t *server)
1400 {
1401 bool stopped;
1402
1403 mongoc_mutex_lock (&server->mutex);
1404 stopped = server->stopped;
1405 mongoc_mutex_unlock (&server->mutex);
1406
1407 return stopped;
1408 }
1409
1410
/* Arguments passed from main_thread to each worker_thread it creates.
 * main_thread allocates one closure per accepted connection with
 * bson_malloc; worker_thread frees it with bson_free before returning. */
typedef struct worker_closure_t {
   /* the mock server that accepted the connection */
   mock_server_t *server;
   /* stream for the accepted connection; worker_thread closes and
    * destroys it */
   mongoc_stream_t *client_stream;
   /* port filled in by mongoc_socket_accept_ex for this connection; shown
    * on the client side of the mock server log lines */
   uint16_t port;
} worker_closure_t;
1416
1417
1418 static void *
main_thread(void * data)1419 main_thread (void *data)
1420 {
1421 mock_server_t *server = (mock_server_t *) data;
1422 mongoc_socket_t *client_sock;
1423 uint16_t port;
1424 mongoc_stream_t *client_stream;
1425 worker_closure_t *closure;
1426 mongoc_thread_t thread;
1427 mongoc_array_t worker_threads;
1428 size_t i;
1429
1430 mongoc_mutex_lock (&server->mutex);
1431 server->running = true;
1432 mongoc_cond_signal (&server->cond);
1433 mongoc_mutex_unlock (&server->mutex);
1434
1435 for (;;) {
1436 client_sock = mongoc_socket_accept_ex (
1437 server->sock, bson_get_monotonic_time () + 100 * 1000, &port);
1438
1439 if (_mock_server_stopping (server)) {
1440 break;
1441 }
1442
1443 if (client_sock) {
1444 test_suite_mock_server_log (
1445 "%5.2f %hu -> server port %hu (connected)",
1446 mock_server_get_uptime_sec (server),
1447 port,
1448 server->port);
1449
1450 client_stream = mongoc_stream_socket_new (client_sock);
1451
1452 #ifdef MONGOC_ENABLE_SSL
1453 mongoc_mutex_lock (&server->mutex);
1454 if (server->ssl) {
1455 server->ssl_opts.weak_cert_validation = 1;
1456 client_stream = mongoc_stream_tls_new_with_hostname (
1457 client_stream, NULL, &server->ssl_opts, 0);
1458 if (!client_stream) {
1459 mongoc_mutex_unlock (&server->mutex);
1460 perror ("Failed to attach tls stream");
1461 break;
1462 }
1463 }
1464 mongoc_mutex_unlock (&server->mutex);
1465 #endif
1466 closure = (worker_closure_t *) bson_malloc (sizeof *closure);
1467 closure->server = server;
1468 closure->client_stream = client_stream;
1469 closure->port = port;
1470
1471 mongoc_mutex_lock (&server->mutex);
1472 mongoc_thread_create (&thread, worker_thread, closure);
1473 _mongoc_array_append_val (&server->worker_threads, thread);
1474 mongoc_mutex_unlock (&server->mutex);
1475 }
1476 }
1477
1478 /* copy list of worker threads and join them all */
1479 _mongoc_array_init (&worker_threads, sizeof (mongoc_thread_t));
1480 mongoc_mutex_lock (&server->mutex);
1481 _mongoc_array_copy (&worker_threads, &server->worker_threads);
1482 mongoc_mutex_unlock (&server->mutex);
1483
1484 for (i = 0; i < worker_threads.len; i++) {
1485 mongoc_thread_join (
1486 _mongoc_array_index (&worker_threads, mongoc_thread_t, i));
1487 }
1488
1489 _mongoc_array_destroy (&worker_threads);
1490
1491 mongoc_mutex_lock (&server->mutex);
1492 server->running = false;
1493 mongoc_mutex_unlock (&server->mutex);
1494
1495 return NULL;
1496 }
1497
1498
1499 static void
_reply_destroy(reply_t * reply)1500 _reply_destroy (reply_t *reply)
1501 {
1502 int i;
1503
1504 for (i = 0; i < reply->n_docs; i++) {
1505 bson_destroy (&reply->docs[i]);
1506 }
1507
1508 bson_free (reply->docs);
1509 bson_free (reply);
1510 }
1511
1512
1513 static void *
worker_thread(void * data)1514 worker_thread (void *data)
1515 {
1516 worker_closure_t *closure = (worker_closure_t *) data;
1517 mock_server_t *server = closure->server;
1518 mongoc_stream_t *client_stream = closure->client_stream;
1519 mongoc_buffer_t buffer;
1520 mongoc_rpc_t *rpc = NULL;
1521 bool handled;
1522 bson_error_t error;
1523 int32_t msg_len;
1524 sync_queue_t *requests;
1525 sync_queue_t *replies;
1526 request_t *request;
1527 mongoc_array_t autoresponders;
1528 ssize_t i;
1529 autoresponder_handle_t handle;
1530 reply_t *reply;
1531
1532 #ifdef MONGOC_ENABLE_SSL
1533 bool ssl;
1534 #endif
1535
1536 ENTRY;
1537
1538 /* queue of client replies sent over this worker's connection */
1539 replies = q_new ();
1540
1541 #ifdef MONGOC_ENABLE_SSL
1542 mongoc_mutex_lock (&server->mutex);
1543 ssl = server->ssl;
1544 mongoc_mutex_unlock (&server->mutex);
1545
1546 if (ssl) {
1547 if (!mongoc_stream_tls_handshake_block (
1548 client_stream, "localhost", TIMEOUT, &error)) {
1549 mongoc_stream_close (client_stream);
1550 mongoc_stream_destroy (client_stream);
1551 bson_free (closure);
1552 q_destroy (replies);
1553 RETURN (NULL);
1554 }
1555 }
1556 #endif
1557
1558 _mongoc_buffer_init (&buffer, NULL, 0, NULL, NULL);
1559 _mongoc_array_init (&autoresponders, sizeof (autoresponder_handle_t));
1560
1561 again:
1562 /* loop, checking for requests to receive or replies to send */
1563 bson_free (rpc);
1564 rpc = NULL;
1565
1566 if (_mongoc_buffer_fill (&buffer, client_stream, 4, 10, &error) > 0) {
1567 BSON_ASSERT (buffer.len >= 4);
1568
1569 memcpy (&msg_len, buffer.data + buffer.off, 4);
1570 msg_len = BSON_UINT32_FROM_LE (msg_len);
1571
1572 if (msg_len < 16) {
1573 MONGOC_WARNING ("No data");
1574 GOTO (failure);
1575 }
1576
1577 if (_mongoc_buffer_fill (
1578 &buffer, client_stream, (size_t) msg_len, -1, &error) == -1) {
1579 MONGOC_WARNING ("%s():%d: %s", BSON_FUNC, __LINE__, error.message);
1580 GOTO (failure);
1581 }
1582
1583 BSON_ASSERT (buffer.len >= (unsigned) msg_len);
1584
1585 /* copies message from buffer */
1586 request = request_new (
1587 &buffer, msg_len, server, client_stream, closure->port, replies);
1588
1589 memmove (
1590 buffer.data, buffer.data + buffer.off + msg_len, buffer.len - msg_len);
1591 buffer.off = 0;
1592 buffer.len -= msg_len;
1593
1594 mongoc_mutex_lock (&server->mutex);
1595 _mongoc_array_copy (&autoresponders, &server->autoresponders);
1596 mongoc_mutex_unlock (&server->mutex);
1597
1598 test_suite_mock_server_log ("%5.2f %hu -> %hu %s",
1599 mock_server_get_uptime_sec (server),
1600 closure->port,
1601 server->port,
1602 request->as_str);
1603
1604 /* run responders most-recently-added-first */
1605 handled = false;
1606
1607 for (i = server->autoresponders.len - 1; i >= 0; i--) {
1608 handle = _mongoc_array_index (
1609 &server->autoresponders, autoresponder_handle_t, i);
1610
1611 if (handle.responder (request, handle.data)) {
1612 /* responder destroyed request and enqueued a reply in "replies" */
1613 handled = true;
1614 request = NULL;
1615 break;
1616 }
1617 }
1618
1619 if (!handled) {
1620 /* pass to the main thread via the queue */
1621 requests = mock_server_get_queue (server);
1622 q_put (requests, (void *) request);
1623 }
1624 }
1625
1626 if (_mock_server_stopping (server)) {
1627 GOTO (failure);
1628 }
1629
1630 reply = q_get (replies, 10);
1631 if (reply) {
1632 _mock_server_reply_with_stream (server, reply, client_stream);
1633 _reply_destroy (reply);
1634 }
1635
1636 if (_mock_server_stopping (server)) {
1637 GOTO (failure);
1638 }
1639
1640 GOTO (again);
1641
1642 failure:
1643 _mongoc_array_destroy (&autoresponders);
1644 _mongoc_buffer_destroy (&buffer);
1645
1646 mongoc_stream_close (client_stream);
1647 mongoc_stream_destroy (client_stream);
1648 bson_free (rpc);
1649 bson_free (closure);
1650 _mongoc_buffer_destroy (&buffer);
1651
1652 while ((reply = q_get_nowait (replies))) {
1653 _reply_destroy (reply);
1654 }
1655
1656 q_destroy (replies);
1657
1658 RETURN (NULL);
1659 }
1660
1661
1662 /* enqueue server reply for this connection's worker thread to send to client */
1663 void
mock_server_reply_multi(request_t * request,mongoc_reply_flags_t flags,const bson_t * docs,int n_docs,int64_t cursor_id)1664 mock_server_reply_multi (request_t *request,
1665 mongoc_reply_flags_t flags,
1666 const bson_t *docs,
1667 int n_docs,
1668 int64_t cursor_id)
1669 {
1670 reply_t *reply;
1671 int i;
1672
1673 BSON_ASSERT (request);
1674
1675 reply = bson_malloc0 (sizeof (reply_t));
1676
1677 reply->flags = flags;
1678 reply->n_docs = n_docs;
1679 reply->docs = bson_malloc0 (n_docs * sizeof (bson_t));
1680
1681 for (i = 0; i < n_docs; i++) {
1682 bson_copy_to (&docs[i], &reply->docs[i]);
1683 }
1684
1685 reply->cursor_id = cursor_id;
1686 reply->client_port = request_get_client_port (request);
1687 reply->request_opcode = (mongoc_opcode_t) request->request_rpc.header.opcode;
1688 reply->query_flags = (mongoc_query_flags_t) request->request_rpc.query.flags;
1689 reply->response_to = request->request_rpc.header.request_id;
1690
1691 q_put (request->replies, reply);
1692 }
1693
1694
1695 static void
_mock_server_reply_with_stream(mock_server_t * server,reply_t * reply,mongoc_stream_t * client)1696 _mock_server_reply_with_stream (mock_server_t *server,
1697 reply_t *reply,
1698 mongoc_stream_t *client)
1699 {
1700 char *doc_json;
1701 bson_string_t *docs_json;
1702 mongoc_iovec_t *iov;
1703 mongoc_array_t ar;
1704 mongoc_rpc_t r = {{0}};
1705 size_t expected = 0;
1706 ssize_t n_written;
1707 int iovcnt;
1708 int i;
1709 uint8_t *buf;
1710 uint8_t *ptr;
1711 size_t len;
1712
1713 mongoc_reply_flags_t flags = reply->flags;
1714 const bson_t *docs = reply->docs;
1715 int n_docs = reply->n_docs;
1716 int64_t cursor_id = reply->cursor_id;
1717
1718 docs_json = bson_string_new ("");
1719 for (i = 0; i < n_docs; i++) {
1720 doc_json = bson_as_json (&docs[i], NULL);
1721 bson_string_append (docs_json, doc_json);
1722 bson_free (doc_json);
1723 if (i < n_docs - 1) {
1724 bson_string_append (docs_json, ", ");
1725 }
1726 }
1727
1728 test_suite_mock_server_log ("%5.2f %hu <- %hu \t%s",
1729 mock_server_get_uptime_sec (server),
1730 reply->client_port,
1731 mock_server_get_port (server),
1732 docs_json->str);
1733
1734 len = 0;
1735
1736 for (i = 0; i < n_docs; i++) {
1737 len += docs[i].len;
1738 }
1739
1740 ptr = buf = bson_malloc (len);
1741
1742 for (i = 0; i < n_docs; i++) {
1743 memcpy (ptr, bson_get_data (&docs[i]), docs[i].len);
1744 ptr += docs[i].len;
1745 }
1746
1747 _mongoc_array_init (&ar, sizeof (mongoc_iovec_t));
1748
1749 mongoc_mutex_lock (&server->mutex);
1750
1751 if (!(reply->request_opcode == MONGOC_OPCODE_QUERY &&
1752 reply->query_flags & MONGOC_QUERY_EXHAUST)) {
1753 server->last_response_id++;
1754 }
1755
1756 r.header.request_id = server->last_response_id;
1757 mongoc_mutex_unlock (&server->mutex);
1758 r.header.msg_len = 0;
1759 r.header.response_to = reply->response_to;
1760 r.header.opcode = MONGOC_OPCODE_REPLY;
1761 r.reply.flags = flags;
1762 r.reply.cursor_id = cursor_id;
1763 r.reply.start_from = 0;
1764 r.reply.n_returned = 1;
1765 r.reply.documents = buf;
1766 r.reply.documents_len = (uint32_t) len;
1767
1768 _mongoc_rpc_gather (&r, &ar);
1769 _mongoc_rpc_swab_to_le (&r);
1770
1771 iov = (mongoc_iovec_t *) ar.data;
1772 iovcnt = (int) ar.len;
1773
1774 for (i = 0; i < iovcnt; i++) {
1775 expected += iov[i].iov_len;
1776 }
1777
1778 n_written = mongoc_stream_writev (client, iov, (size_t) iovcnt, -1);
1779
1780 BSON_ASSERT (n_written == expected);
1781
1782 bson_string_free (docs_json, true);
1783 _mongoc_array_destroy (&ar);
1784 bson_free (buf);
1785 }
1786
1787
1788 void
autoresponder_handle_destroy(autoresponder_handle_t * handle)1789 autoresponder_handle_destroy (autoresponder_handle_t *handle)
1790 {
1791 if (handle->destructor) {
1792 handle->destructor (handle->data);
1793 }
1794 }
1795