1 /*
2  * Copyright 2015 MongoDB, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 
18 #include "mongoc.h"
19 
20 #include "mongoc-buffer-private.h"
21 #include "mongoc-socket-private.h"
22 #include "mongoc-thread-private.h"
23 #include "mongoc-util-private.h"
24 #include "mongoc-trace-private.h"
25 #include "sync-queue.h"
26 #include "mock-server.h"
27 #include "../test-conveniences.h"
28 #include "../test-libmongoc.h"
29 #include "../TestSuite.h"
30 
31 #ifdef HAVE_STRINGS_H
32 #include <strings.h>
33 #endif
34 
35 /* /Async/ismaster_ssl and /TOPOLOGY/scanner_ssl need a reasonable timeout */
36 #define TIMEOUT 5000
37 
38 
39 struct _mock_server_t {
40    bool running;
41    bool stopped;
42    bool rand_delay;
43    int64_t request_timeout_msec;
44    uint16_t port;
45    mongoc_socket_t *sock;
46    char *uri_str;
47    mongoc_uri_t *uri;
48    mongoc_thread_t main_thread;
49    mongoc_cond_t cond;
50    mongoc_mutex_t mutex;
51    int32_t last_response_id;
52    mongoc_array_t worker_threads;
53    sync_queue_t *q;
54    mongoc_array_t autoresponders;
55    int last_autoresponder_id;
56    int64_t start_time;
57 
58 #ifdef MONGOC_ENABLE_SSL
59    bool ssl;
60    mongoc_ssl_opt_t ssl_opts;
61 #endif
62 };
63 
64 
65 struct _autoresponder_handle_t {
66    autoresponder_t responder;
67    void *data;
68    destructor_t destructor;
69    int id;
70 };
71 
72 
73 typedef struct {
74    mongoc_reply_flags_t flags;
75    bson_t *docs;
76    int n_docs;
77    int64_t cursor_id;
78    uint16_t client_port;
79    mongoc_opcode_t request_opcode;
80    mongoc_query_flags_t query_flags;
81    int32_t response_to;
82 } reply_t;
83 
84 
85 static void *
86 main_thread (void *data);
87 
88 static void *
89 worker_thread (void *data);
90 
91 static void
92 _mock_server_reply_with_stream (mock_server_t *server,
93                                 reply_t *reply,
94                                 mongoc_stream_t *client);
95 
96 void
97 autoresponder_handle_destroy (autoresponder_handle_t *handle);
98 
99 static uint16_t
100 get_port (mongoc_socket_t *sock);
101 
102 /*--------------------------------------------------------------------------
103  *
104  * mock_server_new --
105  *
106  *       Get a new mock_server_t. Call mock_server_run to start it,
107  *       then mock_server_get_uri to connect.
108  *
109  *       This server does not autorespond to "ismaster".
110  *
111  * Returns:
112  *       A server you must mock_server_destroy.
113  *
114  * Side effects:
115  *       None.
116  *
117  *--------------------------------------------------------------------------
118  */
119 
120 mock_server_t *
mock_server_new()121 mock_server_new ()
122 {
123    mock_server_t *server =
124       (mock_server_t *) bson_malloc0 (sizeof (mock_server_t));
125 
126    server->request_timeout_msec = get_future_timeout_ms ();
127    _mongoc_array_init (&server->autoresponders,
128                        sizeof (autoresponder_handle_t));
129    _mongoc_array_init (&server->worker_threads, sizeof (mongoc_thread_t));
130    mongoc_cond_init (&server->cond);
131    mongoc_mutex_init (&server->mutex);
132    server->q = q_new ();
133    server->start_time = bson_get_monotonic_time ();
134 
135    return server;
136 }
137 
138 
139 /*--------------------------------------------------------------------------
140  *
141  * mock_server_with_autoismaster --
142  *
143  *       A new mock_server_t that autoresponds to ismaster. Call
144  *       mock_server_run to start it, then mock_server_get_uri to
145  *       connect.
146  *
147  * Returns:
148  *       A server you must mock_server_destroy.
149  *
150  * Side effects:
151  *       None.
152  *
153  *--------------------------------------------------------------------------
154  */
155 
156 mock_server_t *
mock_server_with_autoismaster(int32_t max_wire_version)157 mock_server_with_autoismaster (int32_t max_wire_version)
158 {
159    mock_server_t *server = mock_server_new ();
160 
161    char *ismaster = bson_strdup_printf ("{'ok': 1.0,"
162                                         " 'ismaster': true,"
163                                         " 'minWireVersion': 0,"
164                                         " 'maxWireVersion': %d}",
165                                         max_wire_version);
166 
167    mock_server_auto_ismaster (server, ismaster);
168 
169    bson_free (ismaster);
170 
171    return server;
172 }
173 
174 
175 /*--------------------------------------------------------------------------
176  *
177  * mock_mongos_new --
178  *
179  *       A new mock_server_t that autoresponds to ismaster as if it were a
180  *       mongos. Call mock_server_run to start it, then mock_server_get_uri
181  *       to connect.
182  *
183  * Returns:
184  *       A server you must mock_server_destroy.
185  *
186  * Side effects:
187  *       None.
188  *
189  *--------------------------------------------------------------------------
190  */
191 
192 mock_server_t *
mock_mongos_new(int32_t max_wire_version)193 mock_mongos_new (int32_t max_wire_version)
194 {
195    mock_server_t *server = mock_server_new ();
196 
197    char *ismaster = bson_strdup_printf ("{'ok': 1.0,"
198                                         " 'ismaster': true,"
199                                         " 'msg': 'isdbgrid',"
200                                         " 'minWireVersion': 0,"
201                                         " 'maxWireVersion': %d}",
202                                         max_wire_version);
203 
204    mock_server_auto_ismaster (server, ismaster);
205 
206    bson_free (ismaster);
207 
208    return server;
209 }
210 
211 
212 static bool
hangup(request_t * request,void * ctx)213 hangup (request_t *request, void *ctx)
214 {
215    mock_server_hangs_up (request);
216    request_destroy (request);
217    return true;
218 }
219 
220 
221 /*--------------------------------------------------------------------------
222  *
223  * mock_server_down --
224  *
 *       A new mock_server_t that hangs up on every request. Call
 *       mock_server_run to start it, then mock_server_get_uri to connect.
227  *
228  * Returns:
229  *       A server you must mock_server_destroy.
230  *
231  * Side effects:
232  *       None.
233  *
234  *--------------------------------------------------------------------------
235  */
236 
237 mock_server_t *
mock_server_down(void)238 mock_server_down (void)
239 {
240    mock_server_t *server = mock_server_new ();
241 
242    mock_server_autoresponds (server, hangup, NULL, NULL);
243 
244    return server;
245 }
246 
247 
248 #ifdef MONGOC_ENABLE_SSL
249 
250 /*--------------------------------------------------------------------------
251  *
252  * mock_server_set_ssl_opts --
253  *
254  *       Set server-side SSL options before calling mock_server_run.
255  *
256  * Returns:
257  *       None.
258  *
259  * Side effects:
260  *       None.
261  *
262  *--------------------------------------------------------------------------
263  */
264 void
mock_server_set_ssl_opts(mock_server_t * server,mongoc_ssl_opt_t * opts)265 mock_server_set_ssl_opts (mock_server_t *server, mongoc_ssl_opt_t *opts)
266 {
267    mongoc_mutex_lock (&server->mutex);
268    server->ssl = true;
269    memcpy (&server->ssl_opts, opts, sizeof *opts);
270    mongoc_mutex_unlock (&server->mutex);
271 }
272 
273 #endif
274 
275 /*--------------------------------------------------------------------------
276  *
277  * mock_server_run --
278  *
279  *       Start listening on an unused port. After this, call
280  *       mock_server_get_uri to connect.
281  *
282  * Returns:
 *       The bound port, or 0 if the listening socket could not be set up.
284  *
285  * Side effects:
286  *       The server's port and URI are set.
287  *
288  *--------------------------------------------------------------------------
289  */
290 uint16_t
mock_server_run(mock_server_t * server)291 mock_server_run (mock_server_t *server)
292 {
293    mongoc_socket_t *ssock;
294    struct sockaddr_in bind_addr;
295    int optval;
296    uint16_t bound_port;
297 
298    /* CDRIVER-2115: don't run mock server tests on 32-bit */
299    BSON_ASSERT (sizeof (void *) * 8 >= 64);
300 
301    MONGOC_INFO ("Starting mock server on port %d.", server->port);
302 
303    ssock = mongoc_socket_new (AF_INET, SOCK_STREAM, 0);
304    if (!ssock) {
305       perror ("Failed to create socket.");
306       return 0;
307    }
308 
309    optval = 1;
310    mongoc_socket_setsockopt (
311       ssock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval);
312 
313    memset (&bind_addr, 0, sizeof bind_addr);
314 
315    bind_addr.sin_family = AF_INET;
316    bind_addr.sin_addr.s_addr = htonl (INADDR_ANY);
317 
318    /* bind to unused port */
319    bind_addr.sin_port = htons (0);
320 
321    if (-1 == mongoc_socket_bind (
322                 ssock, (struct sockaddr *) &bind_addr, sizeof bind_addr)) {
323       perror ("Failed to bind socket");
324       return 0;
325    }
326 
327    if (-1 == mongoc_socket_listen (ssock, 10)) {
328       perror ("Failed to put socket into listen mode");
329       return 0;
330    }
331 
332    bound_port = get_port (ssock);
333    if (!bound_port) {
334       perror ("Failed to get bound port number");
335       return 0;
336    }
337 
338    mongoc_mutex_lock (&server->mutex);
339 
340    server->sock = ssock;
341    server->port = bound_port;
342    /* TODO: configurable timeouts, perhaps from env */
343    server->uri_str = bson_strdup_printf (
344       "mongodb://127.0.0.1:%hu/?serverselectiontimeoutms=10000&"
345       "sockettimeoutms=10000",
346       bound_port);
347    server->uri = mongoc_uri_new (server->uri_str);
348 
349    mongoc_thread_create (&server->main_thread, main_thread, (void *) server);
350    while (!server->running) {
351       mongoc_cond_wait (&server->cond, &server->mutex);
352    }
353 
354    mongoc_mutex_unlock (&server->mutex);
355 
356    test_suite_mock_server_log ("listening on port %hu", bound_port);
357 
358    return (uint16_t) bound_port;
359 }
360 
361 
362 /*--------------------------------------------------------------------------
363  *
364  * mock_server_autoresponds --
365  *
366  *       Respond to matching requests. "data" is passed to the responder
367  *       callback, and passed to "destructor" when the autoresponder is
368  *       destroyed.
369  *
370  *       Responders are run most-recently-added-first until one returns
371  *       true to indicate it has handled the request. If none handles it,
372  *       the request is enqueued until a call to mock_server_receives_*.
373  *
374  *       Autoresponders must call request_destroy after handling a
375  *       request.
376  *
377  * Returns:
378  *       An id for mock_server_remove_autoresponder.
379  *
380  * Side effects:
 *       The responder is appended to the server's list. Requests that are
 *       already enqueued are not re-examined yet (see the TODO in the
 *       function body).
382  *
383  *--------------------------------------------------------------------------
384  */
385 
386 int
mock_server_autoresponds(mock_server_t * server,autoresponder_t responder,void * data,destructor_t destructor)387 mock_server_autoresponds (mock_server_t *server,
388                           autoresponder_t responder,
389                           void *data,
390                           destructor_t destructor)
391 {
392    autoresponder_handle_t handle = {responder, data, destructor};
393    int id;
394 
395    mongoc_mutex_lock (&server->mutex);
396    id = handle.id = server->last_autoresponder_id++;
397    /* TODO: peek and see if a matching request is enqueued */
398    _mongoc_array_append_val (&server->autoresponders, handle);
399    mongoc_mutex_unlock (&server->mutex);
400 
401    return id;
402 }
403 
404 
405 /*--------------------------------------------------------------------------
406  *
407  * mock_server_remove_autoresponder --
408  *
409  *       Remove a responder callback. Pass in the id returned by
410  *       mock_server_autoresponds.
411  *
412  * Returns:
413  *       None.
414  *
415  * Side effects:
416  *       The responder's destructor is called on its "data" pointer.
417  *
418  *--------------------------------------------------------------------------
419  */
420 
421 void
mock_server_remove_autoresponder(mock_server_t * server,int id)422 mock_server_remove_autoresponder (mock_server_t *server, int id)
423 {
424    size_t i;
425    autoresponder_handle_t *handles;
426 
427    mongoc_mutex_lock (&server->mutex);
428    handles = (autoresponder_handle_t *) server->autoresponders.data;
429    for (i = 0; i < server->autoresponders.len; i++) {
430       if (handles[i].id == id) {
431          /* left-shift everyone after */
432          server->autoresponders.len--;
433          for (; i < server->autoresponders.len; i++) {
434             handles[i] = handles[i + 1];
435          }
436 
437          autoresponder_handle_destroy (handles);
438 
439          break;
440       }
441    }
442 
443    mongoc_mutex_unlock (&server->mutex);
444 }
445 
446 
447 static bool
auto_ismaster(request_t * request,void * data)448 auto_ismaster (request_t *request, void *data)
449 {
450    const char *response_json = (const char *) data;
451    char *quotes_replaced;
452    bson_t response;
453    bson_error_t error;
454 
455    if (!request->is_command || strcasecmp (request->command_name, "ismaster")) {
456       return false;
457    }
458 
459    quotes_replaced = single_quotes_to_double (response_json);
460 
461    if (!bson_init_from_json (&response, quotes_replaced, -1, &error)) {
462       fprintf (stderr, "%s\n", error.message);
463       fflush (stderr);
464       abort ();
465    }
466 
467    if (mock_server_get_rand_delay (request->server)) {
468       _mongoc_usleep ((int64_t) (rand () % 10) * 1000);
469    }
470 
471    mock_server_replies (request, MONGOC_REPLY_NONE, 0, 0, 1, response_json);
472 
473    bson_destroy (&response);
474    bson_free (quotes_replaced);
475    request_destroy (request);
476    return true;
477 }
478 
479 
480 /*--------------------------------------------------------------------------
481  *
482  * mock_server_auto_ismaster --
483  *
484  *       Autorespond to "ismaster" with the provided document.
485  *
486  * Returns:
487  *       An id for mock_server_remove_autoresponder.
488  *
489  * Side effects:
 *       The formatted document is registered with the autoresponder, with
 *       bson_free as its destructor. Requests that are already enqueued
 *       are not re-examined.
491  *
492  *--------------------------------------------------------------------------
493  */
494 
495 int
mock_server_auto_ismaster(mock_server_t * server,const char * response_json,...)496 mock_server_auto_ismaster (mock_server_t *server,
497                            const char *response_json,
498                            ...)
499 {
500    char *formatted_response_json;
501    va_list args;
502 
503    va_start (args, response_json);
504    formatted_response_json = bson_strdupv_printf (response_json, args);
505    va_end (args);
506 
507    return mock_server_autoresponds (
508       server, auto_ismaster, (void *) formatted_response_json, bson_free);
509 }
510 
511 
512 /*--------------------------------------------------------------------------
513  *
514  * mock_server_get_uri --
515  *
516  *       Call after mock_server_run to get the connection string.
517  *
518  * Returns:
519  *       A const URI.
520  *
521  * Side effects:
522  *       None.
523  *
524  *--------------------------------------------------------------------------
525  */
526 
527 const mongoc_uri_t *
mock_server_get_uri(mock_server_t * server)528 mock_server_get_uri (mock_server_t *server)
529 {
530    mongoc_uri_t *uri;
531 
532    mongoc_mutex_lock (&server->mutex);
533    uri = server->uri;
534    mongoc_mutex_unlock (&server->mutex);
535 
536    return uri;
537 }
538 
539 
540 /*--------------------------------------------------------------------------
541  *
542  * mock_server_get_host_and_port --
543  *
544  *       Call after mock_server_run to get the server's "host:port".
545  *
546  * Returns:
547  *       A const string.
548  *
549  * Side effects:
550  *       None.
551  *
552  *--------------------------------------------------------------------------
553  */
554 
555 const char *
mock_server_get_host_and_port(mock_server_t * server)556 mock_server_get_host_and_port (mock_server_t *server)
557 {
558    const mongoc_uri_t *uri;
559 
560    uri = mock_server_get_uri (server);
561    BSON_ASSERT (uri); /* must call after mock_server_run */
562    return (mongoc_uri_get_hosts (uri))->host_and_port;
563 }
564 
565 
566 /*--------------------------------------------------------------------------
567  *
568  * mock_server_get_port --
569  *
570  *       Call after mock_server_run to get the server's listening port.
571  *
572  * Returns:
573  *       A port number.
574  *
575  * Side effects:
576  *       None.
577  *
578  *--------------------------------------------------------------------------
579  */
580 
581 uint16_t
mock_server_get_port(mock_server_t * server)582 mock_server_get_port (mock_server_t *server)
583 {
584    return server->port;
585 }
586 
587 
588 /*--------------------------------------------------------------------------
589  *
590  * mock_server_get_request_timeout_msec --
591  *
592  *       How long mock_server_receives_* functions wait for a client
593  *       request before giving up and returning NULL.
594  *
595  *--------------------------------------------------------------------------
596  */
597 
598 int64_t
mock_server_get_request_timeout_msec(mock_server_t * server)599 mock_server_get_request_timeout_msec (mock_server_t *server)
600 {
601    int64_t request_timeout_msec;
602 
603    mongoc_mutex_lock (&server->mutex);
604    request_timeout_msec = server->request_timeout_msec;
605    mongoc_mutex_unlock (&server->mutex);
606 
607    return request_timeout_msec;
608 }
609 
610 /*--------------------------------------------------------------------------
611  *
612  * mock_server_set_request_timeout_msec --
613  *
614  *       How long mock_server_receives_* functions wait for a client
615  *       request before giving up and returning NULL.
616  *
617  *--------------------------------------------------------------------------
618  */
619 
620 void
mock_server_set_request_timeout_msec(mock_server_t * server,int64_t request_timeout_msec)621 mock_server_set_request_timeout_msec (mock_server_t *server,
622                                       int64_t request_timeout_msec)
623 {
624    mongoc_mutex_lock (&server->mutex);
625    server->request_timeout_msec = request_timeout_msec;
626    mongoc_mutex_unlock (&server->mutex);
627 }
628 
629 
630 /*--------------------------------------------------------------------------
631  *
632  * mock_server_get_rand_delay --
633  *
634  *       Does the server delay a random duration before responding?
635  *
636  *--------------------------------------------------------------------------
637  */
638 
639 bool
mock_server_get_rand_delay(mock_server_t * server)640 mock_server_get_rand_delay (mock_server_t *server)
641 {
642    bool rand_delay;
643 
644    mongoc_mutex_lock (&server->mutex);
645    rand_delay = server->rand_delay;
646    mongoc_mutex_unlock (&server->mutex);
647 
648    return rand_delay;
649 }
650 
651 /*--------------------------------------------------------------------------
652  *
653  * mock_server_set_rand_delay --
654  *
655  *       Whether to delay a random duration before responding.
656  *
657  *--------------------------------------------------------------------------
658  */
659 
660 void
mock_server_set_rand_delay(mock_server_t * server,bool rand_delay)661 mock_server_set_rand_delay (mock_server_t *server, bool rand_delay)
662 {
663    mongoc_mutex_lock (&server->mutex);
664    server->rand_delay = rand_delay;
665    mongoc_mutex_unlock (&server->mutex);
666 }
667 
668 
669 /*--------------------------------------------------------------------------
670  *
671  * mock_server_get_uptime_sec --
672  *
 *       Seconds elapsed since the server was created by mock_server_new
 *       (the start time is recorded there, not in mock_server_run).
674  *
675  *--------------------------------------------------------------------------
676  */
677 
678 double
mock_server_get_uptime_sec(mock_server_t * server)679 mock_server_get_uptime_sec (mock_server_t *server)
680 {
681    double uptime;
682 
683    mongoc_mutex_lock (&server->mutex);
684    uptime = (bson_get_monotonic_time () - server->start_time) / 1e6;
685    mongoc_mutex_unlock (&server->mutex);
686 
687    return uptime;
688 }
689 
690 
691 sync_queue_t *
mock_server_get_queue(mock_server_t * server)692 mock_server_get_queue (mock_server_t *server)
693 {
694    sync_queue_t *q;
695 
696    mongoc_mutex_lock (&server->mutex);
697    q = server->q;
698    mongoc_mutex_unlock (&server->mutex);
699 
700    return q;
701 }
702 
703 
704 request_t *
mock_server_receives_request(mock_server_t * server)705 mock_server_receives_request (mock_server_t *server)
706 {
707    sync_queue_t *q;
708    int64_t request_timeout_msec;
709 
710    q = mock_server_get_queue (server);
711    request_timeout_msec = mock_server_get_request_timeout_msec (server);
712    return (request_t *) q_get (q, request_timeout_msec);
713 }
714 
715 
716 /*--------------------------------------------------------------------------
717  *
718  * mock_server_receives_command --
719  *
720  *       Pop a client request if one is enqueued, or wait up to
721  *       request_timeout_ms for the client to send a request.
722  *
723  * Returns:
724  *       A request you must request_destroy, or NULL if the request does
725  *       not match.
726  *
727  * Side effects:
728  *       Logs if the current request is not a command matching
729  *       database_name, command_name, and command_json.
730  *
731  *--------------------------------------------------------------------------
732  */
733 
734 request_t *
mock_server_receives_command(mock_server_t * server,const char * database_name,mongoc_query_flags_t flags,const char * command_json,...)735 mock_server_receives_command (mock_server_t *server,
736                               const char *database_name,
737                               mongoc_query_flags_t flags,
738                               const char *command_json,
739                               ...)
740 {
741    va_list args;
742    char *formatted_command_json = NULL;
743    char *ns;
744    request_t *request;
745 
746    va_start (args, command_json);
747    if (command_json) {
748       formatted_command_json = bson_strdupv_printf (command_json, args);
749    }
750    va_end (args);
751 
752    ns = bson_strdup_printf ("%s.$cmd", database_name);
753 
754    request = mock_server_receives_request (server);
755 
756    if (request &&
757        !request_matches_query (
758           request, ns, flags, 0, 1, formatted_command_json, NULL, true)) {
759       request_destroy (request);
760       request = NULL;
761    }
762 
763    bson_free (formatted_command_json);
764    bson_free (ns);
765 
766    return request;
767 }
768 
769 
770 /*--------------------------------------------------------------------------
771  *
772  * mock_server_receives_ismaster --
773  *
774  *       Pop a client ismaster call if one is enqueued, or wait up to
775  *       request_timeout_ms for the client to send a request.
776  *
777  * Returns:
778  *       A request you must request_destroy, or NULL if the current
779  *       request is not an ismaster command.
780  *
781  * Side effects:
782  *       Logs if the current request is not an ismaster command.
783  *
784  *--------------------------------------------------------------------------
785  */
786 
787 request_t *
mock_server_receives_ismaster(mock_server_t * server)788 mock_server_receives_ismaster (mock_server_t *server)
789 {
790    return mock_server_receives_command (
791       server, "admin", MONGOC_QUERY_SLAVE_OK, "{'isMaster': 1}");
792 }
793 
794 
795 /*--------------------------------------------------------------------------
796  *
797  * mock_server_receives_gle --
798  *
799  *       Pop a client request if one is enqueued, or wait up to
800  *       request_timeout_ms for the client to send a request.
801  *
802  * Returns:
803  *       A request you must request_destroy, or NULL if the request does
804  *       not match.
805  *
806  * Side effects:
807  *       Logs if the current request is not getLastError.
808  *
809  *--------------------------------------------------------------------------
810  */
811 
812 request_t *
mock_server_receives_gle(mock_server_t * server,const char * database_name)813 mock_server_receives_gle (mock_server_t *server, const char *database_name)
814 {
815    return mock_server_receives_command (
816       server, database_name, MONGOC_QUERY_NONE, "{'getLastError': 1}");
817 }
818 
819 /*--------------------------------------------------------------------------
820  *
821  * mock_server_receives_query --
822  *
823  *       Pop a client request if one is enqueued, or wait up to
824  *       request_timeout_ms for the client to send a request.
825  *
826  * Returns:
827  *       A request you must request_destroy, or NULL if the request does
828  *       not match.
829  *
830  * Side effects:
831  *       Logs if the current request is not a query matching ns, flags,
832  *       skip, n_return, query_json, and fields_json.
833  *
834  *--------------------------------------------------------------------------
835  */
836 
837 request_t *
mock_server_receives_query(mock_server_t * server,const char * ns,mongoc_query_flags_t flags,uint32_t skip,int32_t n_return,const char * query_json,const char * fields_json)838 mock_server_receives_query (mock_server_t *server,
839                             const char *ns,
840                             mongoc_query_flags_t flags,
841                             uint32_t skip,
842                             int32_t n_return,
843                             const char *query_json,
844                             const char *fields_json)
845 {
846    request_t *request;
847 
848    request = mock_server_receives_request (server);
849 
850    if (request &&
851        !request_matches_query (
852           request, ns, flags, skip, n_return, query_json, fields_json, false)) {
853       request_destroy (request);
854       return NULL;
855    }
856 
857    return request;
858 }
859 
860 
861 /*--------------------------------------------------------------------------
862  *
863  * mock_server_receives_insert --
864  *
865  *       Pop a client request if one is enqueued, or wait up to
866  *       request_timeout_ms for the client to send a request.
867  *
868  * Returns:
869  *       A request you must request_destroy, or NULL if the request does
870  *       not match.
871  *
872  * Side effects:
873  *       Logs if the current request is not an insert matching ns, flags,
874  *       and doc_json.
875  *
876  *--------------------------------------------------------------------------
877  */
878 
879 request_t *
mock_server_receives_insert(mock_server_t * server,const char * ns,mongoc_insert_flags_t flags,const char * doc_json)880 mock_server_receives_insert (mock_server_t *server,
881                              const char *ns,
882                              mongoc_insert_flags_t flags,
883                              const char *doc_json)
884 {
885    request_t *request;
886 
887    request = mock_server_receives_request (server);
888 
889    if (request && !request_matches_insert (request, ns, flags, doc_json)) {
890       request_destroy (request);
891       return NULL;
892    }
893 
894    return request;
895 }
896 
897 /*--------------------------------------------------------------------------
898  *
899  * mock_server_receives_bulk_insert --
900  *
901  *       Pop a client request if one is enqueued, or wait up to
902  *       request_timeout_ms for the client to send a request.
903  *
904  * Returns:
905  *       A request you must request_destroy, or NULL if the request does
906  *       not match.
907  *
908  * Side effects:
909  *       Logs if the current request is not an insert matching ns and flags,
910  *       with "n" documents.
911  *
912  *--------------------------------------------------------------------------
913  */
914 
915 request_t *
mock_server_receives_bulk_insert(mock_server_t * server,const char * ns,mongoc_insert_flags_t flags,int n)916 mock_server_receives_bulk_insert (mock_server_t *server,
917                                   const char *ns,
918                                   mongoc_insert_flags_t flags,
919                                   int n)
920 {
921    request_t *request;
922 
923    request = mock_server_receives_request (server);
924 
925    if (request && !request_matches_bulk_insert (request, ns, flags, n)) {
926       request_destroy (request);
927       return NULL;
928    }
929 
930    return request;
931 }
932 
933 /*--------------------------------------------------------------------------
934  *
935  * mock_server_receives_update --
936  *
937  *       Pop a client request if one is enqueued, or wait up to
938  *       request_timeout_ms for the client to send a request.
939  *
940  * Returns:
941  *       A request you must request_destroy, or NULL if the request does
942  *       not match.
943  *
944  * Side effects:
945  *       Logs if the current request is not an update matching ns, flags,
946  *       selector_json, and update_json.
947  *
948  *--------------------------------------------------------------------------
949  */
950 
951 request_t *
mock_server_receives_update(mock_server_t * server,const char * ns,mongoc_update_flags_t flags,const char * selector_json,const char * update_json)952 mock_server_receives_update (mock_server_t *server,
953                              const char *ns,
954                              mongoc_update_flags_t flags,
955                              const char *selector_json,
956                              const char *update_json)
957 {
958    request_t *request;
959 
960    request = mock_server_receives_request (server);
961 
962    if (request &&
963        !request_matches_update (
964           request, ns, flags, selector_json, update_json)) {
965       request_destroy (request);
966       return NULL;
967    }
968 
969    return request;
970 }
971 
972 
973 /*--------------------------------------------------------------------------
974  *
975  * mock_server_receives_delete --
976  *
977  *       Pop a client request if one is enqueued, or wait up to
978  *       request_timeout_ms for the client to send a request.
979  *
980  * Returns:
981  *       A request you must request_destroy, or NULL if the request does
982  *       not match.
983  *
984  * Side effects:
985  *       Logs if the current request is not a delete matching ns, flags,
986  *       and selector_json.
987  *
988  *--------------------------------------------------------------------------
989  */
990 
991 request_t *
mock_server_receives_delete(mock_server_t * server,const char * ns,mongoc_remove_flags_t flags,const char * selector_json)992 mock_server_receives_delete (mock_server_t *server,
993                              const char *ns,
994                              mongoc_remove_flags_t flags,
995                              const char *selector_json)
996 {
997    request_t *request;
998 
999    request = mock_server_receives_request (server);
1000 
1001    if (request && !request_matches_delete (request, ns, flags, selector_json)) {
1002       request_destroy (request);
1003       return NULL;
1004    }
1005 
1006    return request;
1007 }
1008 
1009 
1010 /*--------------------------------------------------------------------------
1011  *
1012  * mock_server_receives_getmore --
1013  *
1014  *       Pop a client request if one is enqueued, or wait up to
1015  *       request_timeout_ms for the client to send a request.
1016  *
1017  * Returns:
1018  *       A request you must request_destroy, or NULL if the request does
1019  *       not match.
1020  *
1021  * Side effects:
1022  *       Logs if the current request is not a getmore matching n_return
1023  *       and cursor_id.
1024  *
1025  *--------------------------------------------------------------------------
1026  */
1027 
1028 request_t *
mock_server_receives_getmore(mock_server_t * server,const char * ns,int32_t n_return,int64_t cursor_id)1029 mock_server_receives_getmore (mock_server_t *server,
1030                               const char *ns,
1031                               int32_t n_return,
1032                               int64_t cursor_id)
1033 {
1034    request_t *request;
1035 
1036    request = mock_server_receives_request (server);
1037 
1038    if (request && !request_matches_getmore (request, ns, n_return, cursor_id)) {
1039       request_destroy (request);
1040       return NULL;
1041    }
1042 
1043    return request;
1044 }
1045 
1046 
1047 /*--------------------------------------------------------------------------
1048  *
1049  * mock_server_receives_kill_cursors --
1050  *
1051  *       Pop a client request if one is enqueued, or wait up to
1052  *       request_timeout_ms for the client to send a request.
1053  *
1054  *       Real-life OP_KILLCURSORS can take multiple ids, but that is
1055  *       not yet supported here.
1056  *
1057  * Returns:
1058  *       A request you must request_destroy, or NULL if the request
1059  *       does not match.
1060  *
1061  * Side effects:
1062  *       Logs if the current request is not an OP_KILLCURSORS with the
1063  *       expected cursor_id.
1064  *
1065  *--------------------------------------------------------------------------
1066  */
1067 
1068 request_t *
mock_server_receives_kill_cursors(mock_server_t * server,int64_t cursor_id)1069 mock_server_receives_kill_cursors (mock_server_t *server, int64_t cursor_id)
1070 {
1071    request_t *request;
1072 
1073    request = mock_server_receives_request (server);
1074 
1075    if (request && !request_matches_kill_cursors (request, cursor_id)) {
1076       request_destroy (request);
1077       return NULL;
1078    }
1079 
1080    return request;
1081 }
1082 
1083 /*--------------------------------------------------------------------------
1084  *
1085  * mock_server_hangs_up --
1086  *
1087  *       Hang up on a client request.
1088  *
1089  * Returns:
1090  *       None.
1091  *
1092  * Side effects:
1093  *       Causes a network error on the client side.
1094  *
1095  *--------------------------------------------------------------------------
1096  */
1097 
1098 void
mock_server_hangs_up(request_t * request)1099 mock_server_hangs_up (request_t *request)
1100 {
1101    test_suite_mock_server_log ("%5.2f  %hu <- %hu \thang up!",
1102                                mock_server_get_uptime_sec (request->server),
1103                                request->client_port,
1104                                request_get_server_port (request));
1105 
1106    mongoc_stream_close (request->client);
1107 }
1108 
1109 
1110 /*--------------------------------------------------------------------------
1111  *
1112  * mock_server_resets --
1113  *
1114  *       Forcefully reset a connection from the client.
1115  *
1116  * Returns:
1117  *       None.
1118  *
1119  * Side effects:
1120  *       Causes ECONNRESET on the client side.
1121  *
1122  *--------------------------------------------------------------------------
1123  */
1124 
1125 void
mock_server_resets(request_t * request)1126 mock_server_resets (request_t *request)
1127 {
1128    struct linger no_linger;
1129    no_linger.l_onoff = 1;
1130    no_linger.l_linger = 0;
1131 
1132    test_suite_mock_server_log ("%5.2f  %hu <- %hu \treset!",
1133                                mock_server_get_uptime_sec (request->server),
1134                                request->client_port,
1135                                request_get_server_port (request));
1136 
1137    /* send RST packet to client */
1138    mongoc_stream_setsockopt (
1139       request->client, SOL_SOCKET, SO_LINGER, &no_linger, sizeof no_linger);
1140 
1141    mongoc_stream_close (request->client);
1142 }
1143 
1144 
1145 /*--------------------------------------------------------------------------
1146  *
1147  * mock_server_replies --
1148  *
1149  *       Respond to a client request.
1150  *
1151  * Returns:
1152  *       None.
1153  *
1154  * Side effects:
1155  *       Sends an OP_REPLY to the client.
1156  *
1157  *--------------------------------------------------------------------------
1158  */
1159 
1160 void
mock_server_replies(request_t * request,mongoc_reply_flags_t flags,int64_t cursor_id,int32_t starting_from,int32_t number_returned,const char * docs_json)1161 mock_server_replies (request_t *request,
1162                      mongoc_reply_flags_t flags,
1163                      int64_t cursor_id,
1164                      int32_t starting_from,
1165                      int32_t number_returned,
1166                      const char *docs_json)
1167 {
1168    char *quotes_replaced;
1169    bson_t doc;
1170    bson_error_t error;
1171    bool r;
1172 
1173    BSON_ASSERT (request);
1174 
1175    if (docs_json) {
1176       quotes_replaced = single_quotes_to_double (docs_json);
1177       r = bson_init_from_json (&doc, quotes_replaced, -1, &error);
1178       bson_free (quotes_replaced);
1179    } else {
1180       r = bson_init_from_json (&doc, "{}", -1, &error);
1181    }
1182 
1183    if (!r) {
1184       MONGOC_WARNING ("%s", error.message);
1185       return;
1186    }
1187 
1188    mock_server_reply_multi (request, flags, &doc, 1, cursor_id);
1189    bson_destroy (&doc);
1190 }
1191 
1192 
1193 /*--------------------------------------------------------------------------
1194  *
1195  * mock_server_replies_simple --
1196  *
1197  *       Respond to a client request.
1198  *
1199  * Returns:
1200  *       None.
1201  *
1202  * Side effects:
1203  *       Sends an OP_REPLY to the client.
1204  *
1205  *--------------------------------------------------------------------------
1206  */
1207 
1208 void
mock_server_replies_simple(request_t * request,const char * docs_json)1209 mock_server_replies_simple (request_t *request, const char *docs_json)
1210 {
1211    mock_server_replies (request, MONGOC_REPLY_NONE, 0, 0, 1, docs_json);
1212 }
1213 
1214 
1215 /*--------------------------------------------------------------------------
1216  *
1217  * mock_server_replies_ok_and_destroys --
1218  *
1219  *       Respond to a client request.
1220  *
1221  * Returns:
1222  *       None.
1223  *
1224  * Side effects:
1225  *       Sends an OP_REPLY with "{ok: 1}" to the client.
1226  *
1227  *--------------------------------------------------------------------------
1228  */
1229 
1230 void
mock_server_replies_ok_and_destroys(request_t * request)1231 mock_server_replies_ok_and_destroys (request_t *request)
1232 {
1233    mock_server_replies (request, MONGOC_REPLY_NONE, 0, 0, 1, "{'ok': 1}");
1234    request_destroy (request);
1235 }
1236 
1237 
1238 /*--------------------------------------------------------------------------
1239  *
1240  * mock_server_replies_to_find --
1241  *
1242  *       Receive an OP_QUERY or "find" command and reply appropriately.
1243  *
1244  * Returns:
1245  *       None.
1246  *
1247  * Side effects:
1248  *       Very roughly validates the query or "find" command or aborts.
1249  *       The intent is not to test the driver's query or find command
1250  *       implementation here, see _test_kill_cursors for example use.
1251  *
1252  *--------------------------------------------------------------------------
1253  */
1254 
1255 void
mock_server_replies_to_find(request_t * request,mongoc_query_flags_t flags,int64_t cursor_id,int32_t number_returned,const char * ns,const char * reply_json,bool is_command)1256 mock_server_replies_to_find (request_t *request,
1257                              mongoc_query_flags_t flags,
1258                              int64_t cursor_id,
1259                              int32_t number_returned,
1260                              const char *ns,
1261                              const char *reply_json,
1262                              bool is_command)
1263 {
1264    char *find_reply;
1265    char db[MONGOC_NAMESPACE_MAX];
1266 
1267    _mongoc_get_db_name (ns, db);
1268 
1269    /* minimal validation, we're not testing query / find cmd here */
1270    if (request->is_command && !is_command) {
1271       test_error ("expected query, got command");
1272       abort ();
1273    }
1274 
1275    if (!request->is_command && is_command) {
1276       test_error ("expected command, got query");
1277       abort ();
1278    }
1279 
1280    if (!request_matches_flags (request, flags)) {
1281       abort ();
1282    }
1283 
1284    if (is_command) {
1285       find_reply =
1286          bson_strdup_printf ("{'ok': 1,"
1287                              " 'cursor': {"
1288                              "    'id': {'$numberLong': '%" PRId64 "'},"
1289                              "    'ns': '%s',"
1290                              "    'firstBatch': [%s]}}",
1291                              cursor_id,
1292                              ns,
1293                              reply_json);
1294 
1295       mock_server_replies_simple (request, find_reply);
1296       bson_free (find_reply);
1297    } else {
1298       mock_server_replies (
1299          request, MONGOC_REPLY_NONE, cursor_id, 0, number_returned, reply_json);
1300    }
1301 }
1302 
1303 
1304 /*--------------------------------------------------------------------------
1305  *
1306  * mock_server_destroy --
1307  *
1308  *       Free a mock_server_t.
1309  *
1310  * Returns:
1311  *       None.
1312  *
1313  * Side effects:
1314  *       Closes sockets, joins threads, and calls destructors passed
1315  *       to mock_server_autoresponds.
1316  *
1317  *--------------------------------------------------------------------------
1318  */
1319 
1320 void
mock_server_destroy(mock_server_t * server)1321 mock_server_destroy (mock_server_t *server)
1322 {
1323    size_t i;
1324    autoresponder_handle_t *handle;
1325    int64_t deadline = bson_get_monotonic_time () + 10 * 1000 * 1000;
1326    request_t *request;
1327 
1328    mongoc_mutex_lock (&server->mutex);
1329    if (server->running) {
1330       server->stopped = true;
1331    }
1332    mongoc_mutex_unlock (&server->mutex);
1333 
1334    while (bson_get_monotonic_time () <= deadline) {
1335       /* wait 10 seconds */
1336       mongoc_mutex_lock (&server->mutex);
1337       if (!server->running) {
1338          mongoc_mutex_unlock (&server->mutex);
1339          break;
1340       }
1341 
1342       mongoc_mutex_unlock (&server->mutex);
1343       _mongoc_usleep (1000);
1344    }
1345 
1346    mongoc_mutex_lock (&server->mutex);
1347    if (server->running) {
1348       fprintf (stderr, "server still running after timeout\n");
1349       fflush (stderr);
1350       abort ();
1351    }
1352 
1353    mongoc_mutex_unlock (&server->mutex);
1354    mongoc_thread_join (server->main_thread);
1355 
1356    _mongoc_array_destroy (&server->worker_threads);
1357 
1358    for (i = 0; i < server->autoresponders.len; i++) {
1359       handle = &_mongoc_array_index (
1360          &server->autoresponders, autoresponder_handle_t, i);
1361 
1362       autoresponder_handle_destroy (handle);
1363    }
1364 
1365    _mongoc_array_destroy (&server->autoresponders);
1366 
1367    mongoc_cond_destroy (&server->cond);
1368    mongoc_mutex_destroy (&server->mutex);
1369    mongoc_socket_destroy (server->sock);
1370    bson_free (server->uri_str);
1371    mongoc_uri_destroy (server->uri);
1372 
1373    while ((request = (request_t *) q_get_nowait (server->q))) {
1374       request_destroy (request);
1375    }
1376 
1377    q_destroy (server->q);
1378    bson_free (server);
1379 }
1380 
1381 
1382 static uint16_t
get_port(mongoc_socket_t * sock)1383 get_port (mongoc_socket_t *sock)
1384 {
1385    struct sockaddr_in bound_addr = {0};
1386    mongoc_socklen_t addr_len = (mongoc_socklen_t) sizeof bound_addr;
1387 
1388    if (mongoc_socket_getsockname (
1389           sock, (struct sockaddr *) &bound_addr, &addr_len) < 0) {
1390       perror ("Failed to get listening port number");
1391       return 0;
1392    }
1393 
1394    return ntohs (bound_addr.sin_port);
1395 }
1396 
1397 
1398 static bool
_mock_server_stopping(mock_server_t * server)1399 _mock_server_stopping (mock_server_t *server)
1400 {
1401    bool stopped;
1402 
1403    mongoc_mutex_lock (&server->mutex);
1404    stopped = server->stopped;
1405    mongoc_mutex_unlock (&server->mutex);
1406 
1407    return stopped;
1408 }
1409 
1410 
1411 typedef struct worker_closure_t {
1412    mock_server_t *server;
1413    mongoc_stream_t *client_stream;
1414    uint16_t port;
1415 } worker_closure_t;
1416 
1417 
1418 static void *
main_thread(void * data)1419 main_thread (void *data)
1420 {
1421    mock_server_t *server = (mock_server_t *) data;
1422    mongoc_socket_t *client_sock;
1423    uint16_t port;
1424    mongoc_stream_t *client_stream;
1425    worker_closure_t *closure;
1426    mongoc_thread_t thread;
1427    mongoc_array_t worker_threads;
1428    size_t i;
1429 
1430    mongoc_mutex_lock (&server->mutex);
1431    server->running = true;
1432    mongoc_cond_signal (&server->cond);
1433    mongoc_mutex_unlock (&server->mutex);
1434 
1435    for (;;) {
1436       client_sock = mongoc_socket_accept_ex (
1437          server->sock, bson_get_monotonic_time () + 100 * 1000, &port);
1438 
1439       if (_mock_server_stopping (server)) {
1440          break;
1441       }
1442 
1443       if (client_sock) {
1444          test_suite_mock_server_log (
1445             "%5.2f  %hu -> server port %hu (connected)",
1446             mock_server_get_uptime_sec (server),
1447             port,
1448             server->port);
1449 
1450          client_stream = mongoc_stream_socket_new (client_sock);
1451 
1452 #ifdef MONGOC_ENABLE_SSL
1453          mongoc_mutex_lock (&server->mutex);
1454          if (server->ssl) {
1455             server->ssl_opts.weak_cert_validation = 1;
1456             client_stream = mongoc_stream_tls_new_with_hostname (
1457                client_stream, NULL, &server->ssl_opts, 0);
1458             if (!client_stream) {
1459                mongoc_mutex_unlock (&server->mutex);
1460                perror ("Failed to attach tls stream");
1461                break;
1462             }
1463          }
1464          mongoc_mutex_unlock (&server->mutex);
1465 #endif
1466          closure = (worker_closure_t *) bson_malloc (sizeof *closure);
1467          closure->server = server;
1468          closure->client_stream = client_stream;
1469          closure->port = port;
1470 
1471          mongoc_mutex_lock (&server->mutex);
1472          mongoc_thread_create (&thread, worker_thread, closure);
1473          _mongoc_array_append_val (&server->worker_threads, thread);
1474          mongoc_mutex_unlock (&server->mutex);
1475       }
1476    }
1477 
1478    /* copy list of worker threads and join them all */
1479    _mongoc_array_init (&worker_threads, sizeof (mongoc_thread_t));
1480    mongoc_mutex_lock (&server->mutex);
1481    _mongoc_array_copy (&worker_threads, &server->worker_threads);
1482    mongoc_mutex_unlock (&server->mutex);
1483 
1484    for (i = 0; i < worker_threads.len; i++) {
1485       mongoc_thread_join (
1486          _mongoc_array_index (&worker_threads, mongoc_thread_t, i));
1487    }
1488 
1489    _mongoc_array_destroy (&worker_threads);
1490 
1491    mongoc_mutex_lock (&server->mutex);
1492    server->running = false;
1493    mongoc_mutex_unlock (&server->mutex);
1494 
1495    return NULL;
1496 }
1497 
1498 
1499 static void
_reply_destroy(reply_t * reply)1500 _reply_destroy (reply_t *reply)
1501 {
1502    int i;
1503 
1504    for (i = 0; i < reply->n_docs; i++) {
1505       bson_destroy (&reply->docs[i]);
1506    }
1507 
1508    bson_free (reply->docs);
1509    bson_free (reply);
1510 }
1511 
1512 
1513 static void *
worker_thread(void * data)1514 worker_thread (void *data)
1515 {
1516    worker_closure_t *closure = (worker_closure_t *) data;
1517    mock_server_t *server = closure->server;
1518    mongoc_stream_t *client_stream = closure->client_stream;
1519    mongoc_buffer_t buffer;
1520    mongoc_rpc_t *rpc = NULL;
1521    bool handled;
1522    bson_error_t error;
1523    int32_t msg_len;
1524    sync_queue_t *requests;
1525    sync_queue_t *replies;
1526    request_t *request;
1527    mongoc_array_t autoresponders;
1528    ssize_t i;
1529    autoresponder_handle_t handle;
1530    reply_t *reply;
1531 
1532 #ifdef MONGOC_ENABLE_SSL
1533    bool ssl;
1534 #endif
1535 
1536    ENTRY;
1537 
1538    /* queue of client replies sent over this worker's connection */
1539    replies = q_new ();
1540 
1541 #ifdef MONGOC_ENABLE_SSL
1542    mongoc_mutex_lock (&server->mutex);
1543    ssl = server->ssl;
1544    mongoc_mutex_unlock (&server->mutex);
1545 
1546    if (ssl) {
1547       if (!mongoc_stream_tls_handshake_block (
1548              client_stream, "localhost", TIMEOUT, &error)) {
1549          mongoc_stream_close (client_stream);
1550          mongoc_stream_destroy (client_stream);
1551          bson_free (closure);
1552          q_destroy (replies);
1553          RETURN (NULL);
1554       }
1555    }
1556 #endif
1557 
1558    _mongoc_buffer_init (&buffer, NULL, 0, NULL, NULL);
1559    _mongoc_array_init (&autoresponders, sizeof (autoresponder_handle_t));
1560 
1561 again:
1562    /* loop, checking for requests to receive or replies to send */
1563    bson_free (rpc);
1564    rpc = NULL;
1565 
1566    if (_mongoc_buffer_fill (&buffer, client_stream, 4, 10, &error) > 0) {
1567       BSON_ASSERT (buffer.len >= 4);
1568 
1569       memcpy (&msg_len, buffer.data + buffer.off, 4);
1570       msg_len = BSON_UINT32_FROM_LE (msg_len);
1571 
1572       if (msg_len < 16) {
1573          MONGOC_WARNING ("No data");
1574          GOTO (failure);
1575       }
1576 
1577       if (_mongoc_buffer_fill (
1578              &buffer, client_stream, (size_t) msg_len, -1, &error) == -1) {
1579          MONGOC_WARNING ("%s():%d: %s", BSON_FUNC, __LINE__, error.message);
1580          GOTO (failure);
1581       }
1582 
1583       BSON_ASSERT (buffer.len >= (unsigned) msg_len);
1584 
1585       /* copies message from buffer */
1586       request = request_new (
1587          &buffer, msg_len, server, client_stream, closure->port, replies);
1588 
1589       memmove (
1590          buffer.data, buffer.data + buffer.off + msg_len, buffer.len - msg_len);
1591       buffer.off = 0;
1592       buffer.len -= msg_len;
1593 
1594       mongoc_mutex_lock (&server->mutex);
1595       _mongoc_array_copy (&autoresponders, &server->autoresponders);
1596       mongoc_mutex_unlock (&server->mutex);
1597 
1598       test_suite_mock_server_log ("%5.2f  %hu -> %hu %s",
1599                                   mock_server_get_uptime_sec (server),
1600                                   closure->port,
1601                                   server->port,
1602                                   request->as_str);
1603 
1604       /* run responders most-recently-added-first */
1605       handled = false;
1606 
1607       for (i = server->autoresponders.len - 1; i >= 0; i--) {
1608          handle = _mongoc_array_index (
1609             &server->autoresponders, autoresponder_handle_t, i);
1610 
1611          if (handle.responder (request, handle.data)) {
1612             /* responder destroyed request and enqueued a reply in "replies" */
1613             handled = true;
1614             request = NULL;
1615             break;
1616          }
1617       }
1618 
1619       if (!handled) {
1620          /* pass to the main thread via the queue */
1621          requests = mock_server_get_queue (server);
1622          q_put (requests, (void *) request);
1623       }
1624    }
1625 
1626    if (_mock_server_stopping (server)) {
1627       GOTO (failure);
1628    }
1629 
1630    reply = q_get (replies, 10);
1631    if (reply) {
1632       _mock_server_reply_with_stream (server, reply, client_stream);
1633       _reply_destroy (reply);
1634    }
1635 
1636    if (_mock_server_stopping (server)) {
1637       GOTO (failure);
1638    }
1639 
1640    GOTO (again);
1641 
1642 failure:
1643    _mongoc_array_destroy (&autoresponders);
1644    _mongoc_buffer_destroy (&buffer);
1645 
1646    mongoc_stream_close (client_stream);
1647    mongoc_stream_destroy (client_stream);
1648    bson_free (rpc);
1649    bson_free (closure);
1650    _mongoc_buffer_destroy (&buffer);
1651 
1652    while ((reply = q_get_nowait (replies))) {
1653       _reply_destroy (reply);
1654    }
1655 
1656    q_destroy (replies);
1657 
1658    RETURN (NULL);
1659 }
1660 
1661 
1662 /* enqueue server reply for this connection's worker thread to send to client */
1663 void
mock_server_reply_multi(request_t * request,mongoc_reply_flags_t flags,const bson_t * docs,int n_docs,int64_t cursor_id)1664 mock_server_reply_multi (request_t *request,
1665                          mongoc_reply_flags_t flags,
1666                          const bson_t *docs,
1667                          int n_docs,
1668                          int64_t cursor_id)
1669 {
1670    reply_t *reply;
1671    int i;
1672 
1673    BSON_ASSERT (request);
1674 
1675    reply = bson_malloc0 (sizeof (reply_t));
1676 
1677    reply->flags = flags;
1678    reply->n_docs = n_docs;
1679    reply->docs = bson_malloc0 (n_docs * sizeof (bson_t));
1680 
1681    for (i = 0; i < n_docs; i++) {
1682       bson_copy_to (&docs[i], &reply->docs[i]);
1683    }
1684 
1685    reply->cursor_id = cursor_id;
1686    reply->client_port = request_get_client_port (request);
1687    reply->request_opcode = (mongoc_opcode_t) request->request_rpc.header.opcode;
1688    reply->query_flags = (mongoc_query_flags_t) request->request_rpc.query.flags;
1689    reply->response_to = request->request_rpc.header.request_id;
1690 
1691    q_put (request->replies, reply);
1692 }
1693 
1694 
1695 static void
_mock_server_reply_with_stream(mock_server_t * server,reply_t * reply,mongoc_stream_t * client)1696 _mock_server_reply_with_stream (mock_server_t *server,
1697                                 reply_t *reply,
1698                                 mongoc_stream_t *client)
1699 {
1700    char *doc_json;
1701    bson_string_t *docs_json;
1702    mongoc_iovec_t *iov;
1703    mongoc_array_t ar;
1704    mongoc_rpc_t r = {{0}};
1705    size_t expected = 0;
1706    ssize_t n_written;
1707    int iovcnt;
1708    int i;
1709    uint8_t *buf;
1710    uint8_t *ptr;
1711    size_t len;
1712 
1713    mongoc_reply_flags_t flags = reply->flags;
1714    const bson_t *docs = reply->docs;
1715    int n_docs = reply->n_docs;
1716    int64_t cursor_id = reply->cursor_id;
1717 
1718    docs_json = bson_string_new ("");
1719    for (i = 0; i < n_docs; i++) {
1720       doc_json = bson_as_json (&docs[i], NULL);
1721       bson_string_append (docs_json, doc_json);
1722       bson_free (doc_json);
1723       if (i < n_docs - 1) {
1724          bson_string_append (docs_json, ", ");
1725       }
1726    }
1727 
1728    test_suite_mock_server_log ("%5.2f  %hu <- %hu \t%s",
1729                                mock_server_get_uptime_sec (server),
1730                                reply->client_port,
1731                                mock_server_get_port (server),
1732                                docs_json->str);
1733 
1734    len = 0;
1735 
1736    for (i = 0; i < n_docs; i++) {
1737       len += docs[i].len;
1738    }
1739 
1740    ptr = buf = bson_malloc (len);
1741 
1742    for (i = 0; i < n_docs; i++) {
1743       memcpy (ptr, bson_get_data (&docs[i]), docs[i].len);
1744       ptr += docs[i].len;
1745    }
1746 
1747    _mongoc_array_init (&ar, sizeof (mongoc_iovec_t));
1748 
1749    mongoc_mutex_lock (&server->mutex);
1750 
1751    if (!(reply->request_opcode == MONGOC_OPCODE_QUERY &&
1752          reply->query_flags & MONGOC_QUERY_EXHAUST)) {
1753       server->last_response_id++;
1754    }
1755 
1756    r.header.request_id = server->last_response_id;
1757    mongoc_mutex_unlock (&server->mutex);
1758    r.header.msg_len = 0;
1759    r.header.response_to = reply->response_to;
1760    r.header.opcode = MONGOC_OPCODE_REPLY;
1761    r.reply.flags = flags;
1762    r.reply.cursor_id = cursor_id;
1763    r.reply.start_from = 0;
1764    r.reply.n_returned = 1;
1765    r.reply.documents = buf;
1766    r.reply.documents_len = (uint32_t) len;
1767 
1768    _mongoc_rpc_gather (&r, &ar);
1769    _mongoc_rpc_swab_to_le (&r);
1770 
1771    iov = (mongoc_iovec_t *) ar.data;
1772    iovcnt = (int) ar.len;
1773 
1774    for (i = 0; i < iovcnt; i++) {
1775       expected += iov[i].iov_len;
1776    }
1777 
1778    n_written = mongoc_stream_writev (client, iov, (size_t) iovcnt, -1);
1779 
1780    BSON_ASSERT (n_written == expected);
1781 
1782    bson_string_free (docs_json, true);
1783    _mongoc_array_destroy (&ar);
1784    bson_free (buf);
1785 }
1786 
1787 
1788 void
autoresponder_handle_destroy(autoresponder_handle_t * handle)1789 autoresponder_handle_destroy (autoresponder_handle_t *handle)
1790 {
1791    if (handle->destructor) {
1792       handle->destructor (handle->data);
1793    }
1794 }
1795