1 #include <mongoc.h>
2 #include <mongoc-collection-private.h>
3 #include <mongoc-apm-private.h>
4 #include <mongoc-host-list-private.h>
5 #include <mongoc-cursor-private.h>
6 #include <mongoc-bulk-operation-private.h>
7 #include <mongoc-client-private.h>
8 
9 #include "json-test.h"
10 #include "test-libmongoc.h"
11 #include "mock_server/mock-server.h"
12 #include "mock_server/future.h"
13 #include "mock_server/future-functions.h"
14 
15 
16 typedef struct {
17    uint32_t n_events;
18    bson_t events;
19    mongoc_uri_t *test_framework_uri;
20    int64_t cursor_id;
21    int64_t operation_id;
22    bool verbose;
23 } context_t;
24 
25 
26 static void
context_init(context_t * context)27 context_init (context_t *context)
28 {
29    context->n_events = 0;
30    bson_init (&context->events);
31    context->test_framework_uri = test_framework_get_uri ();
32    context->cursor_id = 0;
33    context->operation_id = 0;
34    context->verbose =
35       test_framework_getenv_bool ("MONGOC_TEST_MONITORING_VERBOSE");
36 }
37 
38 
39 static void
context_destroy(context_t * context)40 context_destroy (context_t *context)
41 {
42    bson_destroy (&context->events);
43    mongoc_uri_destroy (context->test_framework_uri);
44 }
45 
46 
47 static int
check_server_version(const bson_t * test,context_t * context)48 check_server_version (const bson_t *test, context_t *context)
49 {
50    const char *s;
51    char *padded;
52    server_version_t test_version, server_version;
53    bool r;
54 
55    if (bson_has_field (test, "ignore_if_server_version_greater_than")) {
56       s = bson_lookup_utf8 (test, "ignore_if_server_version_greater_than");
57       /* s is like "3.0", don't skip if server is 3.0.x but skip 3.1+ */
58       padded = bson_strdup_printf ("%s.99", s);
59       test_version = test_framework_str_to_version (padded);
60       bson_free (padded);
61       server_version = test_framework_get_server_version ();
62       r = server_version <= test_version;
63 
64       if (!r && context->verbose) {
65          printf ("      SKIP, Server version > %s\n", s);
66          fflush (stdout);
67       }
68    } else if (bson_has_field (test, "ignore_if_server_version_less_than")) {
69       s = bson_lookup_utf8 (test, "ignore_if_server_version_less_than");
70       test_version = test_framework_str_to_version (s);
71       server_version = test_framework_get_server_version ();
72       r = server_version >= test_version;
73 
74       if (!r && context->verbose) {
75          printf ("      SKIP, Server version < %s\n", s);
76          fflush (stdout);
77       }
78    } else {
79       /* server version is ok, don't skip the test */
80       return true;
81    }
82 
83    return r;
84 }
85 
86 
87 static bool
check_topology_type(const bson_t * test)88 check_topology_type (const bson_t *test)
89 {
90    bson_iter_t iter;
91    bson_iter_t child;
92    bool is_mongos;
93    const char *s;
94 
95    /* so far this field can only contain an array like ["sharded"],
96     * see https://github.com/mongodb/specifications/pull/75
97     */
98    if (bson_iter_init_find (&iter, test, "ignore_if_topology_type")) {
99       ASSERT (BSON_ITER_HOLDS_ARRAY (&iter));
100       ASSERT (bson_iter_recurse (&iter, &child));
101 
102       is_mongos = test_framework_is_mongos ();
103 
104       while (bson_iter_next (&child)) {
105          if (BSON_ITER_HOLDS_UTF8 (&child)) {
106             s = bson_iter_utf8 (&child, NULL);
107 
108             /* skip test if topology type is sharded */
109             if (!strcmp (s, "sharded") && is_mongos) {
110                return false;
111             }
112          }
113       }
114    }
115 
116    return true;
117 }
118 
119 
120 static void
insert_data(mongoc_collection_t * collection,const bson_t * test)121 insert_data (mongoc_collection_t *collection, const bson_t *test)
122 {
123    mongoc_bulk_operation_t *bulk;
124    bson_iter_t iter;
125    bson_iter_t array_iter;
126    bson_t doc;
127    uint32_t r;
128    bson_error_t error;
129 
130    if (!mongoc_collection_drop (collection, &error)) {
131       if (strcmp (error.message, "ns not found")) {
132          /* an error besides ns not found */
133          ASSERT_OR_PRINT (false, error);
134       }
135    }
136 
137    bulk = mongoc_collection_create_bulk_operation (collection, true, NULL);
138 
139    BSON_ASSERT (bson_iter_init_find (&iter, test, "data"));
140    BSON_ASSERT (BSON_ITER_HOLDS_ARRAY (&iter));
141    bson_iter_recurse (&iter, &array_iter);
142 
143    while (bson_iter_next (&array_iter)) {
144       BSON_ASSERT (BSON_ITER_HOLDS_DOCUMENT (&array_iter));
145       bson_iter_bson (&array_iter, &doc);
146       mongoc_bulk_operation_insert (bulk, &doc);
147       bson_destroy (&doc);
148    }
149 
150    r = mongoc_bulk_operation_execute (bulk, NULL, &error);
151    ASSERT_OR_PRINT (r, error);
152 
153    mongoc_bulk_operation_destroy (bulk);
154 }
155 
156 
157 static void
assert_host_in_uri(const mongoc_host_list_t * host,const mongoc_uri_t * uri)158 assert_host_in_uri (const mongoc_host_list_t *host, const mongoc_uri_t *uri)
159 {
160    const mongoc_host_list_t *hosts;
161 
162    hosts = mongoc_uri_get_hosts (uri);
163    while (hosts) {
164       if (_mongoc_host_list_equal (hosts, host)) {
165          return;
166       }
167 
168       hosts = hosts->next;
169    }
170 
171    fprintf (stderr,
172             "Host \"%s\" not in \"%s\"",
173             host->host_and_port,
174             mongoc_uri_get_string (uri));
175    fflush (stderr);
176    abort ();
177 }
178 
179 
/* Return true if the string "s" ends with "suffix".
 * A NULL "s" never matches; "suffix" must not be NULL. An empty suffix
 * matches every non-NULL string. */
static bool
ends_with (const char *s, const char *suffix)
{
   size_t total;
   size_t tail;

   if (s == NULL) {
      return false;
   }

   total = strlen (s);
   tail = strlen (suffix);

   if (tail > total) {
      /* the suffix cannot fit */
      return false;
   }

   return strcmp (s + (total - tail), suffix) == 0;
}
194 
195 
196 static int64_t
fake_cursor_id(const bson_iter_t * iter)197 fake_cursor_id (const bson_iter_t *iter)
198 {
199    return bson_iter_as_int64 (iter) ? 42 : 0;
200 }
201 
202 /* Convert "ok" values to doubles, cursor ids and error codes to 42, and
203  * error messages to "". See README at
204  * github.com/mongodb/specifications/tree/master/source/command-monitoring/tests
205  */
206 static void
convert_command_for_test(context_t * context,const bson_t * src,bson_t * dst,const char * path)207 convert_command_for_test (context_t *context,
208                           const bson_t *src,
209                           bson_t *dst,
210                           const char *path)
211 {
212    bson_iter_t iter;
213    const char *key;
214    const char *errmsg;
215    bson_t src_child;
216    bson_t dst_child;
217    char *child_path;
218 
219    bson_iter_init (&iter, src);
220 
221    while (bson_iter_next (&iter)) {
222       key = bson_iter_key (&iter);
223 
224       if (!strcmp (key, "ok")) {
225          /* "The server is inconsistent on whether the ok values returned are
226           * integers or doubles so for simplicity the tests specify all expected
227           * values as doubles. Server 'ok' values of integers MUST be converted
228           * to doubles for comparison with the expected values."
229           */
230          BSON_APPEND_DOUBLE (dst, key, (double) bson_iter_as_int64 (&iter));
231 
232       } else if (!strcmp (key, "errmsg")) {
233          /* "errmsg values of "" MUST BSON_ASSERT that the value is not empty"
234           */
235          errmsg = bson_iter_utf8 (&iter, NULL);
236          ASSERT_CMPSIZE_T (strlen (errmsg), >, (size_t) 0);
237          BSON_APPEND_UTF8 (dst, key, "");
238 
239       } else if (!strcmp (key, "id") && ends_with (path, "cursor")) {
240          /* "When encountering a cursor or getMore value of "42" in a test, the
241           * driver MUST BSON_ASSERT that the values are equal to each other and
242           * greater than zero."
243           */
244          if (context->cursor_id == 0) {
245             context->cursor_id = bson_iter_int64 (&iter);
246          } else if (bson_iter_int64 (&iter) != 0) {
247             ASSERT_CMPINT64 (context->cursor_id, ==, bson_iter_int64 (&iter));
248          }
249 
250          /* replace the reply's cursor id with 42 or 0 - check_expectations()
251           * will BSON_ASSERT it matches the value from the JSON test */
252          BSON_APPEND_INT64 (dst, key, fake_cursor_id (&iter));
253       } else if (ends_with (path, "cursors") ||
254                  ends_with (path, "cursorsUnknown")) {
255          /* payload of a killCursors command-started event:
256           *    {killCursors: "test", cursors: [12345]}
257           * or killCursors command-succeeded event:
258           *    {ok: 1, cursorsUnknown: [12345]}
259           * */
260          ASSERT_CMPINT64 (bson_iter_as_int64 (&iter), >, (int64_t) 0);
261          BSON_APPEND_INT64 (dst, key, 42);
262 
263       } else if (!strcmp (key, "getMore")) {
264          ASSERT_CMPINT64 (context->cursor_id, ==, bson_iter_int64 (&iter));
265          BSON_APPEND_INT64 (dst, key, fake_cursor_id (&iter));
266 
267       } else if (!strcmp (key, "code")) {
268          /* "code values of 42 MUST BSON_ASSERT that the value is present and
269           * greater
270           * than zero" */
271          ASSERT_CMPINT64 (bson_iter_as_int64 (&iter), >, (int64_t) 0);
272          BSON_APPEND_INT32 (dst, key, 42);
273 
274       } else if (BSON_ITER_HOLDS_DOCUMENT (&iter)) {
275          if (path) {
276             child_path = bson_strdup_printf ("%s.%s", path, key);
277          } else {
278             child_path = bson_strdup (key);
279          }
280 
281          bson_iter_bson (&iter, &src_child);
282          bson_append_document_begin (dst, key, -1, &dst_child);
283          convert_command_for_test (
284             context, &src_child, &dst_child, child_path); /* recurse */
285          bson_append_document_end (dst, &dst_child);
286          bson_free (child_path);
287       } else if (BSON_ITER_HOLDS_ARRAY (&iter)) {
288          if (path) {
289             child_path = bson_strdup_printf ("%s.%s", path, key);
290          } else {
291             child_path = bson_strdup (key);
292          }
293 
294          bson_iter_bson (&iter, &src_child);
295          bson_append_array_begin (dst, key, -1, &dst_child);
296          convert_command_for_test (
297             context, &src_child, &dst_child, child_path); /* recurse */
298          bson_append_array_end (dst, &dst_child);
299          bson_free (child_path);
300       } else {
301          bson_append_value (dst, key, -1, bson_iter_value (&iter));
302       }
303    }
304 }
305 
306 
307 static void
started_cb(const mongoc_apm_command_started_t * event)308 started_cb (const mongoc_apm_command_started_t *event)
309 {
310    context_t *context =
311       (context_t *) mongoc_apm_command_started_get_context (event);
312    int64_t operation_id;
313    char *cmd_json;
314    bson_t *events = &context->events;
315    bson_t cmd = BSON_INITIALIZER;
316    char str[16];
317    const char *key;
318    bson_t *new_event;
319 
320    if (context->verbose) {
321       cmd_json = bson_as_canonical_extended_json (event->command, NULL);
322       printf ("%s\n", cmd_json);
323       fflush (stdout);
324       bson_free (cmd_json);
325    }
326 
327    BSON_ASSERT (mongoc_apm_command_started_get_request_id (event) > 0);
328    BSON_ASSERT (mongoc_apm_command_started_get_server_id (event) > 0);
329    assert_host_in_uri (event->host, context->test_framework_uri);
330 
331    /* subsequent events share the first event's operation id */
332    operation_id = mongoc_apm_command_started_get_operation_id (event);
333    ASSERT_CMPINT64 (operation_id, !=, (int64_t) 0);
334    if (!context->operation_id) {
335       context->operation_id = operation_id;
336    } else {
337       ASSERT_CMPINT64 (context->operation_id, ==, operation_id);
338    }
339 
340    convert_command_for_test (context, event->command, &cmd, NULL);
341    new_event = BCON_NEW ("command_started_event",
342                          "{",
343                          "command",
344                          BCON_DOCUMENT (&cmd),
345                          "command_name",
346                          BCON_UTF8 (event->command_name),
347                          "database_name",
348                          BCON_UTF8 (event->database_name),
349                          "}");
350 
351    bson_uint32_to_string (context->n_events, &key, str, sizeof str);
352    BSON_APPEND_DOCUMENT (events, key, new_event);
353 
354    context->n_events++;
355 
356    bson_destroy (new_event);
357    bson_destroy (&cmd);
358 }
359 
360 
361 static void
succeeded_cb(const mongoc_apm_command_succeeded_t * event)362 succeeded_cb (const mongoc_apm_command_succeeded_t *event)
363 {
364    context_t *context =
365       (context_t *) mongoc_apm_command_succeeded_get_context (event);
366    int64_t operation_id;
367    char *reply_json;
368    bson_t reply = BSON_INITIALIZER;
369    char str[16];
370    const char *key;
371    bson_t *new_event;
372 
373    if (context->verbose) {
374       reply_json = bson_as_canonical_extended_json (event->reply, NULL);
375       printf ("\t\t<-- %s\n", reply_json);
376       fflush (stdout);
377       bson_free (reply_json);
378    }
379 
380    BSON_ASSERT (mongoc_apm_command_succeeded_get_request_id (event) > 0);
381    BSON_ASSERT (mongoc_apm_command_succeeded_get_server_id (event) > 0);
382    assert_host_in_uri (event->host, context->test_framework_uri);
383 
384    /* subsequent events share the first event's operation id */
385    operation_id = mongoc_apm_command_succeeded_get_operation_id (event);
386    ASSERT_CMPINT64 (operation_id, !=, (int64_t) 0);
387    ASSERT_CMPINT64 (context->operation_id, ==, operation_id);
388 
389    convert_command_for_test (context, event->reply, &reply, NULL);
390    new_event = BCON_NEW ("command_succeeded_event",
391                          "{",
392                          "reply",
393                          BCON_DOCUMENT (&reply),
394                          "command_name",
395                          BCON_UTF8 (event->command_name),
396                          "}");
397 
398    bson_uint32_to_string (context->n_events, &key, str, sizeof str);
399    BSON_APPEND_DOCUMENT (&context->events, key, new_event);
400 
401    context->n_events++;
402 
403    bson_destroy (new_event);
404    bson_destroy (&reply);
405 }
406 
407 
408 static void
failed_cb(const mongoc_apm_command_failed_t * event)409 failed_cb (const mongoc_apm_command_failed_t *event)
410 {
411    context_t *context =
412       (context_t *) mongoc_apm_command_failed_get_context (event);
413    int64_t operation_id;
414    bson_t reply = BSON_INITIALIZER;
415    char str[16];
416    const char *key;
417    bson_t *new_event;
418 
419    if (context->verbose) {
420       printf (
421          "\t\t<-- %s FAILED: %s\n", event->command_name, event->error->message);
422       fflush (stdout);
423    }
424 
425    BSON_ASSERT (mongoc_apm_command_failed_get_request_id (event) > 0);
426    BSON_ASSERT (mongoc_apm_command_failed_get_server_id (event) > 0);
427    assert_host_in_uri (event->host, context->test_framework_uri);
428 
429    /* subsequent events share the first event's operation id */
430    operation_id = mongoc_apm_command_failed_get_operation_id (event);
431    ASSERT_CMPINT64 (operation_id, !=, (int64_t) 0);
432    ASSERT_CMPINT64 (context->operation_id, ==, operation_id);
433 
434    new_event = BCON_NEW ("command_failed_event",
435                          "{",
436                          "command_name",
437                          BCON_UTF8 (event->command_name),
438                          "}");
439 
440    bson_uint32_to_string (context->n_events, &key, str, sizeof str);
441    BSON_APPEND_DOCUMENT (&context->events, key, new_event);
442 
443    context->n_events++;
444 
445    bson_destroy (new_event);
446    bson_destroy (&reply);
447 }
448 
449 
450 static void
one_bulk_op(mongoc_bulk_operation_t * bulk,const bson_t * request)451 one_bulk_op (mongoc_bulk_operation_t *bulk, const bson_t *request)
452 {
453    bson_iter_t iter;
454    const char *request_name;
455    bson_t request_doc, document, filter, update;
456 
457    bson_iter_init (&iter, request);
458    bson_iter_next (&iter);
459    request_name = bson_iter_key (&iter);
460    bson_iter_bson (&iter, &request_doc);
461 
462    if (!strcmp (request_name, "insertOne")) {
463       bson_lookup_doc (&request_doc, "document", &document);
464       mongoc_bulk_operation_insert (bulk, &document);
465    } else if (!strcmp (request_name, "updateOne")) {
466       bson_lookup_doc (&request_doc, "filter", &filter);
467       bson_lookup_doc (&request_doc, "update", &update);
468       mongoc_bulk_operation_update_one (
469          bulk, &filter, &update, false /* upsert */);
470    } else {
471       test_error ("unrecognized request name %s", request_name);
472       abort ();
473    }
474 }
475 
476 
477 static void
test_bulk_write(mongoc_collection_t * collection,const bson_t * arguments)478 test_bulk_write (mongoc_collection_t *collection, const bson_t *arguments)
479 {
480    bool ordered;
481    mongoc_write_concern_t *wc;
482    mongoc_bulk_operation_t *bulk;
483    bson_iter_t requests_iter;
484    bson_t requests;
485    bson_t request;
486    uint32_t r;
487    bson_error_t error;
488 
489    ordered = bson_lookup_bool (arguments, "ordered", true);
490 
491    if (bson_has_field (arguments, "writeConcern")) {
492       wc = bson_lookup_write_concern (arguments, "writeConcern");
493    } else {
494       wc = mongoc_write_concern_new ();
495    }
496 
497    if (bson_has_field (arguments, "requests")) {
498       bson_lookup_doc (arguments, "requests", &requests);
499    }
500 
501    bulk = mongoc_collection_create_bulk_operation (collection, ordered, wc);
502    bson_iter_init (&requests_iter, &requests);
503    while (bson_iter_next (&requests_iter)) {
504       bson_iter_bson (&requests_iter, &request);
505       one_bulk_op (bulk, &request);
506    }
507 
508    r = mongoc_bulk_operation_execute (bulk, NULL, &error);
509    ASSERT_OR_PRINT (r, error);
510 
511    mongoc_bulk_operation_destroy (bulk);
512    mongoc_write_concern_destroy (wc);
513 }
514 
515 
516 static void
test_count(mongoc_collection_t * collection,const bson_t * arguments)517 test_count (mongoc_collection_t *collection, const bson_t *arguments)
518 {
519    bson_t filter;
520 
521    bson_lookup_doc (arguments, "filter", &filter);
522    mongoc_collection_count (
523       collection, MONGOC_QUERY_NONE, &filter, 0, 0, NULL, NULL);
524 }
525 
526 
527 static void
test_find(mongoc_collection_t * collection,const bson_t * arguments,mongoc_read_prefs_t * read_prefs)528 test_find (mongoc_collection_t *collection,
529            const bson_t *arguments,
530            mongoc_read_prefs_t *read_prefs)
531 {
532    bson_t query;
533    bson_t filter;
534    bson_t sort;
535    uint32_t skip = 0;
536    uint32_t limit = 0;
537    uint32_t batch_size = 0;
538    bson_t modifiers;
539    mongoc_cursor_t *cursor;
540    const bson_t *doc;
541 
542    bson_lookup_doc (arguments, "filter", &filter);
543 
544    if (read_prefs || bson_has_field (arguments, "sort") ||
545        bson_has_field (arguments, "modifiers")) {
546       bson_init (&query);
547       BSON_APPEND_DOCUMENT (&query, "$query", &filter);
548 
549       if (bson_has_field (arguments, "sort")) {
550          bson_lookup_doc (arguments, "sort", &sort);
551          BSON_APPEND_DOCUMENT (&query, "$orderby", &sort);
552       }
553 
554       if (bson_has_field (arguments, "modifiers")) {
555          bson_lookup_doc (arguments, "modifiers", &modifiers);
556          bson_concat (&query, &modifiers);
557       }
558    } else {
559       bson_copy_to (&filter, &query);
560    }
561 
562    if (bson_has_field (arguments, "skip")) {
563       skip = (uint32_t) bson_lookup_int64 (arguments, "skip");
564    }
565 
566    if (bson_has_field (arguments, "limit")) {
567       limit = (uint32_t) bson_lookup_int64 (arguments, "limit");
568    }
569 
570    if (bson_has_field (arguments, "batchSize")) {
571       batch_size = (uint32_t) bson_lookup_int64 (arguments, "batchSize");
572    }
573 
574    cursor = mongoc_collection_find (collection,
575                                     MONGOC_QUERY_NONE,
576                                     skip,
577                                     limit,
578                                     batch_size,
579                                     &query,
580                                     NULL,
581                                     read_prefs);
582 
583    BSON_ASSERT (cursor);
584    while (mongoc_cursor_next (cursor, &doc)) {
585    }
586 
587    /* can cause a killCursors command */
588    mongoc_cursor_destroy (cursor);
589    bson_destroy (&query);
590 }
591 
592 
593 static void
test_delete_many(mongoc_collection_t * collection,const bson_t * arguments)594 test_delete_many (mongoc_collection_t *collection, const bson_t *arguments)
595 {
596    bson_t filter;
597 
598    bson_lookup_doc (arguments, "filter", &filter);
599    mongoc_collection_remove (
600       collection, MONGOC_REMOVE_NONE, &filter, NULL, NULL);
601 }
602 
603 
604 static void
test_delete_one(mongoc_collection_t * collection,const bson_t * arguments)605 test_delete_one (mongoc_collection_t *collection, const bson_t *arguments)
606 {
607    bson_t filter;
608 
609    bson_lookup_doc (arguments, "filter", &filter);
610    mongoc_collection_remove (
611       collection, MONGOC_REMOVE_SINGLE_REMOVE, &filter, NULL, NULL);
612 }
613 
614 
615 static void
test_insert_many(mongoc_collection_t * collection,const bson_t * arguments)616 test_insert_many (mongoc_collection_t *collection, const bson_t *arguments)
617 {
618    bool ordered;
619    mongoc_bulk_operation_t *bulk;
620    bson_t documents;
621    bson_iter_t iter;
622    bson_t doc;
623 
624    ordered = bson_lookup_bool (arguments, "ordered", true);
625    bulk = mongoc_collection_create_bulk_operation (collection, ordered, NULL);
626 
627    bson_lookup_doc (arguments, "documents", &documents);
628    bson_iter_init (&iter, &documents);
629    while (bson_iter_next (&iter)) {
630       bson_iter_bson (&iter, &doc);
631       mongoc_bulk_operation_insert (bulk, &doc);
632    }
633 
634    mongoc_bulk_operation_execute (bulk, NULL, NULL);
635 
636    mongoc_bulk_operation_destroy (bulk);
637 }
638 
639 
640 static void
test_insert_one(mongoc_collection_t * collection,const bson_t * arguments)641 test_insert_one (mongoc_collection_t *collection, const bson_t *arguments)
642 {
643    bson_t document;
644 
645    bson_lookup_doc (arguments, "document", &document);
646    mongoc_collection_insert (
647       collection, MONGOC_INSERT_NONE, &document, NULL, NULL);
648 }
649 
650 
651 static void
test_update(mongoc_collection_t * collection,const bson_t * arguments,bool multi)652 test_update (mongoc_collection_t *collection,
653              const bson_t *arguments,
654              bool multi)
655 {
656    bson_t filter;
657    bson_t update;
658    mongoc_update_flags_t flags = MONGOC_UPDATE_NONE;
659 
660    if (multi) {
661       flags |= MONGOC_UPDATE_MULTI_UPDATE;
662    }
663 
664    if (bson_lookup_bool (arguments, "upsert", false)) {
665       flags |= MONGOC_UPDATE_UPSERT;
666    }
667 
668    bson_lookup_doc (arguments, "filter", &filter);
669    bson_lookup_doc (arguments, "update", &update);
670 
671    mongoc_collection_update (collection, flags, &filter, &update, NULL, NULL);
672 }
673 
674 
675 static void
test_update_many(mongoc_collection_t * collection,const bson_t * arguments)676 test_update_many (mongoc_collection_t *collection, const bson_t *arguments)
677 {
678    test_update (collection, arguments, true);
679 }
680 
681 
682 static void
test_update_one(mongoc_collection_t * collection,const bson_t * arguments)683 test_update_one (mongoc_collection_t *collection, const bson_t *arguments)
684 {
685    test_update (collection, arguments, false);
686 }
687 
688 
689 static void
one_test(mongoc_collection_t * collection,bson_t * test)690 one_test (mongoc_collection_t *collection, bson_t *test)
691 {
692    context_t context;
693    const char *description;
694    mongoc_apm_callbacks_t *callbacks;
695    bson_t operation;
696    bson_t arguments;
697    const char *op_name;
698    mongoc_read_prefs_t *read_prefs = NULL;
699    bson_t expectations;
700 
701    context_init (&context);
702    callbacks = mongoc_apm_callbacks_new ();
703 
704    if (test_suite_debug_output ()) {
705       description = bson_lookup_utf8 (test, "description");
706       printf ("  - %s\n", description);
707       fflush (stdout);
708    }
709 
710    if (!check_server_version (test, &context) || !check_topology_type (test)) {
711       goto done;
712    }
713 
714    mongoc_apm_set_command_started_cb (callbacks, started_cb);
715    mongoc_apm_set_command_succeeded_cb (callbacks, succeeded_cb);
716    mongoc_apm_set_command_failed_cb (callbacks, failed_cb);
717    mongoc_client_set_apm_callbacks (collection->client, callbacks, &context);
718 
719    bson_lookup_doc (test, "operation", &operation);
720    op_name = bson_lookup_utf8 (&operation, "name");
721    bson_lookup_doc (&operation, "arguments", &arguments);
722 
723    if (bson_has_field (&operation, "read_preference")) {
724       read_prefs = bson_lookup_read_prefs (&operation, "read_preference");
725    }
726 
727    if (!strcmp (op_name, "bulkWrite")) {
728       test_bulk_write (collection, &arguments);
729    } else if (!strcmp (op_name, "count")) {
730       test_count (collection, &arguments);
731    } else if (!strcmp (op_name, "find")) {
732       test_find (collection, &arguments, read_prefs);
733    } else if (!strcmp (op_name, "deleteMany")) {
734       test_delete_many (collection, &arguments);
735    } else if (!strcmp (op_name, "deleteOne")) {
736       test_delete_one (collection, &arguments);
737    } else if (!strcmp (op_name, "insertMany")) {
738       test_insert_many (collection, &arguments);
739    } else if (!strcmp (op_name, "insertOne")) {
740       test_insert_one (collection, &arguments);
741    } else if (!strcmp (op_name, "updateMany")) {
742       test_update_many (collection, &arguments);
743    } else if (!strcmp (op_name, "updateOne")) {
744       test_update_one (collection, &arguments);
745    } else {
746       test_error ("unrecognized operation name %s", op_name);
747       abort ();
748    }
749 
750    bson_lookup_doc (test, "expectations", &expectations);
751    check_json_apm_events (&context.events, &expectations);
752 
753 done:
754    mongoc_apm_callbacks_destroy (callbacks);
755    context_destroy (&context);
756    mongoc_read_prefs_destroy (read_prefs);
757 }
758 
759 
760 /*
761  *-----------------------------------------------------------------------
762  *
763  * test_command_monitoring_cb --
764  *
765  *       Runs the JSON tests included with the Command Monitoring spec.
766  *
767  *-----------------------------------------------------------------------
768  */
769 
770 static void
test_command_monitoring_cb(bson_t * scenario)771 test_command_monitoring_cb (bson_t *scenario)
772 {
773    mongoc_client_t *client;
774    const char *db_name;
775    const char *collection_name;
776    bson_iter_t iter;
777    bson_iter_t tests_iter;
778    bson_t test_op;
779    mongoc_collection_t *collection;
780 
781    BSON_ASSERT (scenario);
782 
783    db_name = bson_lookup_utf8 (scenario, "database_name");
784    collection_name = bson_lookup_utf8 (scenario, "collection_name");
785 
786    BSON_ASSERT (bson_iter_init_find (&iter, scenario, "tests"));
787    BSON_ASSERT (BSON_ITER_HOLDS_ARRAY (&iter));
788    bson_iter_recurse (&iter, &tests_iter);
789 
790    while (bson_iter_next (&tests_iter)) {
791       client = test_framework_client_new ();
792       collection =
793          mongoc_client_get_collection (client, db_name, collection_name);
794 
795       insert_data (collection, scenario);
796       bson_iter_bson (&tests_iter, &test_op);
797       one_test (collection, &test_op);
798       mongoc_collection_destroy (collection);
799       mongoc_client_destroy (client);
800    }
801 }
802 
803 
804 /*
805  *-----------------------------------------------------------------------
806  *
807  * Runner for the JSON tests for command monitoring.
808  *
809  *-----------------------------------------------------------------------
810  */
811 static void
test_all_spec_tests(TestSuite * suite)812 test_all_spec_tests (TestSuite *suite)
813 {
814    char resolved[PATH_MAX];
815 
816    ASSERT (realpath (JSON_DIR "/command_monitoring", resolved));
817    install_json_test_suite (suite, resolved, &test_command_monitoring_cb);
818 }
819 
820 
821 static void
test_get_error_failed_cb(const mongoc_apm_command_failed_t * event)822 test_get_error_failed_cb (const mongoc_apm_command_failed_t *event)
823 {
824    bson_error_t *error;
825 
826    error = (bson_error_t *) mongoc_apm_command_failed_get_context (event);
827    mongoc_apm_command_failed_get_error (event, error);
828 }
829 
830 
831 static void
test_get_error(void)832 test_get_error (void)
833 {
834    mock_server_t *server;
835    mongoc_client_t *client;
836    mongoc_apm_callbacks_t *callbacks;
837    future_t *future;
838    request_t *request;
839    bson_error_t error = {0};
840 
841    server = mock_server_with_autoismaster (0);
842    mock_server_run (server);
843 
844    client = mongoc_client_new_from_uri (mock_server_get_uri (server));
845    callbacks = mongoc_apm_callbacks_new ();
846    mongoc_apm_set_command_failed_cb (callbacks, test_get_error_failed_cb);
847    mongoc_client_set_apm_callbacks (client, callbacks, (void *) &error);
848    future = future_client_command_simple (
849       client, "db", tmp_bson ("{'foo': 1}"), NULL, NULL, NULL);
850    request = mock_server_receives_command (
851       server, "db", MONGOC_QUERY_SLAVE_OK, "{'foo': 1}");
852    mock_server_replies_simple (request,
853                                "{'ok': 0, 'errmsg': 'foo', 'code': 42}");
854    ASSERT (!future_get_bool (future));
855    ASSERT_ERROR_CONTAINS (error, MONGOC_ERROR_QUERY, 42, "foo");
856 
857    future_destroy (future);
858    request_destroy (request);
859    mongoc_apm_callbacks_destroy (callbacks);
860    mongoc_client_destroy (client);
861    mock_server_destroy (server);
862 }
863 
864 
865 static void
insert_200_docs(mongoc_collection_t * collection)866 insert_200_docs (mongoc_collection_t *collection)
867 {
868    int i;
869    bson_t *doc;
870    bool r;
871    bson_error_t error;
872 
873    /* insert 200 docs so we have a couple batches */
874    doc = tmp_bson (NULL);
875    for (i = 0; i < 200; i++) {
876       r = mongoc_collection_insert (
877          collection, MONGOC_INSERT_NONE, doc, NULL, &error);
878 
879       ASSERT_OR_PRINT (r, error);
880    }
881 }
882 
883 
884 static void
increment(const mongoc_apm_command_started_t * event)885 increment (const mongoc_apm_command_started_t *event)
886 {
887    int *i = (int *) mongoc_apm_command_started_get_context (event);
888 
889    ++(*i);
890 }
891 
892 
893 static mongoc_apm_callbacks_t *
increment_callbacks(void)894 increment_callbacks (void)
895 {
896    mongoc_apm_callbacks_t *callbacks;
897 
898    callbacks = mongoc_apm_callbacks_new ();
899    mongoc_apm_set_command_started_cb (callbacks, increment);
900 
901    return callbacks;
902 }
903 
904 
905 static void
decrement(const mongoc_apm_command_started_t * event)906 decrement (const mongoc_apm_command_started_t *event)
907 {
908    int *i = (int *) mongoc_apm_command_started_get_context (event);
909 
910    --(*i);
911 }
912 
913 
914 static mongoc_apm_callbacks_t *
decrement_callbacks(void)915 decrement_callbacks (void)
916 {
917    mongoc_apm_callbacks_t *callbacks;
918 
919    callbacks = mongoc_apm_callbacks_new ();
920    mongoc_apm_set_command_started_cb (callbacks, decrement);
921 
922    return callbacks;
923 }
924 
925 
926 static void
test_change_callbacks(void * ctx)927 test_change_callbacks (void *ctx)
928 {
929    mongoc_apm_callbacks_t *inc_callbacks;
930    mongoc_apm_callbacks_t *dec_callbacks;
931    int incremented = 0;
932    int decremented = 0;
933    mongoc_client_t *client;
934    mongoc_collection_t *collection;
935    bson_error_t error;
936    mongoc_cursor_t *cursor;
937    const bson_t *b;
938 
939    inc_callbacks = increment_callbacks ();
940    dec_callbacks = decrement_callbacks ();
941 
942    client = test_framework_client_new ();
943    mongoc_client_set_apm_callbacks (client, inc_callbacks, &incremented);
944 
945    collection = get_test_collection (client, "test_change_callbacks");
946 
947    insert_200_docs (collection);
948    ASSERT_CMPINT (incremented, ==, 200);
949 
950    mongoc_client_set_apm_callbacks (client, dec_callbacks, &decremented);
951    cursor = mongoc_collection_aggregate (
952       collection, MONGOC_QUERY_NONE, tmp_bson (NULL), NULL, NULL);
953 
954    ASSERT (mongoc_cursor_next (cursor, &b));
955    ASSERT_CMPINT (decremented, ==, -1);
956 
957    mongoc_client_set_apm_callbacks (client, inc_callbacks, &incremented);
958    while (mongoc_cursor_next (cursor, &b)) {
959    }
960    ASSERT_OR_PRINT (!mongoc_cursor_error (cursor, &error), error);
961    ASSERT_CMPINT (incremented, ==, 201);
962 
963    mongoc_collection_drop (collection, NULL);
964 
965    mongoc_cursor_destroy (cursor);
966    mongoc_collection_destroy (collection);
967    mongoc_client_destroy (client);
968    mongoc_apm_callbacks_destroy (inc_callbacks);
969    mongoc_apm_callbacks_destroy (dec_callbacks);
970 }
971 
972 
973 static void
test_reset_callbacks(void * ctx)974 test_reset_callbacks (void *ctx)
975 {
976    mongoc_apm_callbacks_t *inc_callbacks;
977    mongoc_apm_callbacks_t *dec_callbacks;
978    int incremented = 0;
979    int decremented = 0;
980    mongoc_client_t *client;
981    mongoc_collection_t *collection;
982    bool r;
983    bson_t *cmd;
984    bson_t cmd_reply;
985    bson_error_t error;
986    mongoc_server_description_t *sd;
987    mongoc_cursor_t *cursor;
988    const bson_t *b;
989 
990    inc_callbacks = increment_callbacks ();
991    dec_callbacks = decrement_callbacks ();
992 
993    client = test_framework_client_new ();
994    collection = get_test_collection (client, "test_reset_apm_callbacks");
995 
996    /* insert 200 docs so we have a couple batches */
997    insert_200_docs (collection);
998 
999    mongoc_client_set_apm_callbacks (client, inc_callbacks, &incremented);
1000    cmd = tmp_bson ("{'aggregate': '%s', 'pipeline': [], 'cursor': {}}",
1001                    collection->collection);
1002 
1003    sd =
1004       mongoc_client_select_server (client, true /* for writes */, NULL, &error);
1005    ASSERT_OR_PRINT (sd, error);
1006 
1007    r = mongoc_client_read_command_with_opts (
1008       client,
1009       "test",
1010       cmd,
1011       NULL,
1012       tmp_bson ("{'serverId': %d}", sd->id),
1013       &cmd_reply,
1014       &error);
1015 
1016    ASSERT_OR_PRINT (r, error);
1017    ASSERT_CMPINT (incremented, ==, 1);
1018 
1019    /* reset callbacks */
1020    mongoc_client_set_apm_callbacks (client, NULL, NULL);
1021    /* destroys cmd_reply */
1022    cursor = mongoc_cursor_new_from_command_reply (client, &cmd_reply, sd->id);
1023    ASSERT (mongoc_cursor_next (cursor, &b));
1024    ASSERT_CMPINT (incremented, ==, 1); /* same value as before */
1025 
1026    mongoc_client_set_apm_callbacks (client, dec_callbacks, &decremented);
1027    while (mongoc_cursor_next (cursor, &b)) {
1028    }
1029    ASSERT_OR_PRINT (!mongoc_cursor_error (cursor, &error), error);
1030    ASSERT_CMPINT (decremented, ==, -1);
1031 
1032    mongoc_collection_drop (collection, NULL);
1033 
1034    mongoc_cursor_destroy (cursor);
1035    mongoc_server_description_destroy (sd);
1036    mongoc_collection_destroy (collection);
1037    mongoc_client_destroy (client);
1038    mongoc_apm_callbacks_destroy (inc_callbacks);
1039    mongoc_apm_callbacks_destroy (dec_callbacks);
1040 }
1041 
1042 
1043 static void
test_set_callbacks_cb(const mongoc_apm_command_started_t * event)1044 test_set_callbacks_cb (const mongoc_apm_command_started_t *event)
1045 {
1046    int *n_calls = (int *) mongoc_apm_command_started_get_context (event);
1047 
1048    (*n_calls)++;
1049 }
1050 
1051 
1052 static void
_test_set_callbacks(bool pooled)1053 _test_set_callbacks (bool pooled)
1054 {
1055    mongoc_client_t *client;
1056    mongoc_client_pool_t *pool = NULL;
1057    mongoc_apm_callbacks_t *callbacks;
1058    int n_calls = 0;
1059    bson_error_t error;
1060    bson_t b;
1061 
1062    callbacks = mongoc_apm_callbacks_new ();
1063    mongoc_apm_set_command_started_cb (callbacks, test_set_callbacks_cb);
1064 
1065    if (pooled) {
1066       pool = test_framework_client_pool_new ();
1067       ASSERT (mongoc_client_pool_set_apm_callbacks (
1068          pool, callbacks, (void *) &n_calls));
1069       client = mongoc_client_pool_pop (pool);
1070    } else {
1071       client = test_framework_client_new ();
1072       ASSERT (mongoc_client_set_apm_callbacks (
1073          client, callbacks, (void *) &n_calls));
1074    }
1075 
1076    ASSERT_OR_PRINT (mongoc_client_get_server_status (client, NULL, &b, &error),
1077                     error);
1078    ASSERT_CMPINT (1, ==, n_calls);
1079 
1080    capture_logs (true);
1081 
1082    if (pooled) {
1083       ASSERT (
1084          !mongoc_client_pool_set_apm_callbacks (pool, NULL, (void *) &n_calls));
1085       ASSERT_CAPTURED_LOG ("mongoc_client_pool_set_apm_callbacks",
1086                            MONGOC_LOG_LEVEL_ERROR,
1087                            "Can only set callbacks once");
1088 
1089       clear_captured_logs ();
1090       ASSERT (
1091          !mongoc_client_set_apm_callbacks (client, NULL, (void *) &n_calls));
1092       ASSERT_CAPTURED_LOG ("mongoc_client_pool_set_apm_callbacks",
1093                            MONGOC_LOG_LEVEL_ERROR,
1094                            "Cannot set callbacks on a pooled client");
1095    } else {
1096       /* repeated calls ok, null is ok */
1097       ASSERT (mongoc_client_set_apm_callbacks (client, NULL, NULL));
1098    }
1099 
1100    if (pooled) {
1101       mongoc_client_pool_push (pool, client);
1102       mongoc_client_pool_destroy (pool);
1103    } else {
1104       mongoc_client_destroy (client);
1105    }
1106 
1107    bson_destroy (&b);
1108    mongoc_apm_callbacks_destroy (callbacks);
1109 }
1110 
1111 
/* Run the set-callbacks scenario with a single (non-pooled) client. */
static void
test_set_callbacks_single (void)
{
   const bool pooled = false;

   _test_set_callbacks (pooled);
}
1117 
1118 
/* Run the set-callbacks scenario with a client popped from a pool. */
static void
test_set_callbacks_pooled (void)
{
   const bool pooled = true;

   _test_set_callbacks (pooled);
}
1124 
1125 
/*
 * The pair of ids reported by one APM event; the test_op_id_*_cb callbacks
 * append one of these per event.
 */
typedef struct {
   int64_t request_id; /* from mongoc_apm_command_*_get_request_id */
   int64_t op_id;      /* from mongoc_apm_command_*_get_operation_id */
} ids_t;
1130 
1131 
/*
 * Context passed to the test_op_id_*_cb callbacks. For each kind of event
 * it keeps an array of ids_t (one element appended per event) and a count
 * of how many times the callback ran.
 */
typedef struct {
   mongoc_array_t started_ids;   /* ids_t per command-started event */
   mongoc_array_t succeeded_ids; /* ids_t per command-succeeded event */
   mongoc_array_t failed_ids;    /* ids_t per command-failed event */
   int started_calls;
   int succeeded_calls;
   int failed_calls;
} op_id_test_t;
1140 
1141 
1142 static void
op_id_test_init(op_id_test_t * test)1143 op_id_test_init (op_id_test_t *test)
1144 {
1145    _mongoc_array_init (&test->started_ids, sizeof (ids_t));
1146    _mongoc_array_init (&test->succeeded_ids, sizeof (ids_t));
1147    _mongoc_array_init (&test->failed_ids, sizeof (ids_t));
1148 
1149    test->started_calls = 0;
1150    test->succeeded_calls = 0;
1151    test->failed_calls = 0;
1152 }
1153 
1154 
1155 static void
op_id_test_cleanup(op_id_test_t * test)1156 op_id_test_cleanup (op_id_test_t *test)
1157 {
1158    _mongoc_array_destroy (&test->started_ids);
1159    _mongoc_array_destroy (&test->succeeded_ids);
1160    _mongoc_array_destroy (&test->failed_ids);
1161 }
1162 
1163 
1164 static void
test_op_id_started_cb(const mongoc_apm_command_started_t * event)1165 test_op_id_started_cb (const mongoc_apm_command_started_t *event)
1166 {
1167    op_id_test_t *test;
1168    ids_t ids;
1169 
1170    test = (op_id_test_t *) mongoc_apm_command_started_get_context (event);
1171    ids.request_id = mongoc_apm_command_started_get_request_id (event);
1172    ids.op_id = mongoc_apm_command_started_get_operation_id (event);
1173 
1174    _mongoc_array_append_val (&test->started_ids, ids);
1175 
1176    test->started_calls++;
1177 }
1178 
1179 
1180 static void
test_op_id_succeeded_cb(const mongoc_apm_command_succeeded_t * event)1181 test_op_id_succeeded_cb (const mongoc_apm_command_succeeded_t *event)
1182 {
1183    op_id_test_t *test;
1184    ids_t ids;
1185 
1186    test = (op_id_test_t *) mongoc_apm_command_succeeded_get_context (event);
1187    ids.request_id = mongoc_apm_command_succeeded_get_request_id (event);
1188    ids.op_id = mongoc_apm_command_succeeded_get_operation_id (event);
1189 
1190    _mongoc_array_append_val (&test->succeeded_ids, ids);
1191 
1192    test->succeeded_calls++;
1193 }
1194 
1195 
1196 static void
test_op_id_failed_cb(const mongoc_apm_command_failed_t * event)1197 test_op_id_failed_cb (const mongoc_apm_command_failed_t *event)
1198 {
1199    op_id_test_t *test;
1200    ids_t ids;
1201 
1202    test = (op_id_test_t *) mongoc_apm_command_failed_get_context (event);
1203    ids.request_id = mongoc_apm_command_failed_get_request_id (event);
1204    ids.op_id = mongoc_apm_command_failed_get_operation_id (event);
1205 
1206    _mongoc_array_append_val (&test->failed_ids, ids);
1207 
1208    test->failed_calls++;
1209 }
1210 
1211 
/*
 * REQUEST_ID (kind, i) / OP_ID (kind, i): the request id / operation id of
 * the i'th recorded event of the given kind, where kind is one of started,
 * succeeded or failed (it is pasted in front of "_ids").
 *
 * Both macros expand to an access on a variable literally named `test`
 * (an op_id_test_t, not a pointer), so they are only usable in functions
 * that have such a local in scope.
 */
#define REQUEST_ID(_event_type, _index) \
   _mongoc_array_index (&test._event_type##_ids, ids_t, _index).request_id

#define OP_ID(_event_type, _index) \
   _mongoc_array_index (&test._event_type##_ids, ids_t, _index).op_id
1217 
1218 static void
_test_bulk_operation_id(bool pooled,bool use_bulk_operation_new)1219 _test_bulk_operation_id (bool pooled, bool use_bulk_operation_new)
1220 {
1221    mongoc_client_t *client;
1222    mongoc_client_pool_t *pool = NULL;
1223    mongoc_apm_callbacks_t *callbacks;
1224    mongoc_collection_t *collection;
1225    mongoc_bulk_operation_t *bulk;
1226    bson_error_t error;
1227    op_id_test_t test;
1228    int64_t op_id;
1229 
1230    op_id_test_init (&test);
1231 
1232    callbacks = mongoc_apm_callbacks_new ();
1233    mongoc_apm_set_command_started_cb (callbacks, test_op_id_started_cb);
1234    mongoc_apm_set_command_succeeded_cb (callbacks, test_op_id_succeeded_cb);
1235    mongoc_apm_set_command_failed_cb (callbacks, test_op_id_failed_cb);
1236 
1237    if (pooled) {
1238       pool = test_framework_client_pool_new ();
1239       ASSERT (mongoc_client_pool_set_apm_callbacks (
1240          pool, callbacks, (void *) &test));
1241       client = mongoc_client_pool_pop (pool);
1242    } else {
1243       client = test_framework_client_new ();
1244       ASSERT (
1245          mongoc_client_set_apm_callbacks (client, callbacks, (void *) &test));
1246    }
1247 
1248    collection = get_test_collection (client, "test_bulk_operation_id");
1249    if (use_bulk_operation_new) {
1250       bulk = mongoc_bulk_operation_new (false);
1251       mongoc_bulk_operation_set_client (bulk, client);
1252       mongoc_bulk_operation_set_database (bulk, collection->db);
1253       mongoc_bulk_operation_set_collection (bulk, collection->collection);
1254    } else {
1255       bulk = mongoc_collection_create_bulk_operation (collection, false, NULL);
1256    }
1257 
1258    mongoc_bulk_operation_insert (bulk, tmp_bson ("{'_id': 1}"));
1259    mongoc_bulk_operation_update_one (
1260       bulk, tmp_bson ("{'_id': 1}"), tmp_bson ("{'$set': {'x': 1}}"), false);
1261    mongoc_bulk_operation_remove (bulk, tmp_bson ("{}"));
1262 
1263    /* ensure we monitor with bulk->operation_id, not cluster->operation_id */
1264    client->cluster.operation_id = 42;
1265 
1266    /* write errors don't trigger failed events, so we only test success */
1267    ASSERT_OR_PRINT (mongoc_bulk_operation_execute (bulk, NULL, &error), error);
1268    ASSERT_CMPINT (test.started_calls, ==, 3);
1269    ASSERT_CMPINT (test.succeeded_calls, ==, 3);
1270 
1271    ASSERT_CMPINT64 (REQUEST_ID (started, 0), ==, REQUEST_ID (succeeded, 0));
1272    ASSERT_CMPINT64 (REQUEST_ID (started, 1), ==, REQUEST_ID (succeeded, 1));
1273    ASSERT_CMPINT64 (REQUEST_ID (started, 2), ==, REQUEST_ID (succeeded, 2));
1274 
1275    /* 3 unique request ids */
1276    ASSERT_CMPINT64 (REQUEST_ID (started, 0), !=, REQUEST_ID (started, 1));
1277    ASSERT_CMPINT64 (REQUEST_ID (started, 0), !=, REQUEST_ID (started, 2));
1278    ASSERT_CMPINT64 (REQUEST_ID (started, 1), !=, REQUEST_ID (started, 2));
1279    ASSERT_CMPINT64 (REQUEST_ID (succeeded, 0), !=, REQUEST_ID (succeeded, 1));
1280    ASSERT_CMPINT64 (REQUEST_ID (succeeded, 0), !=, REQUEST_ID (succeeded, 2));
1281    ASSERT_CMPINT64 (REQUEST_ID (succeeded, 1), !=, REQUEST_ID (succeeded, 2));
1282 
1283    /* events' operation ids all equal bulk->operation_id */
1284    op_id = bulk->operation_id;
1285    ASSERT_CMPINT64 (op_id, !=, (int64_t) 0);
1286    ASSERT_CMPINT64 (op_id, ==, OP_ID (started, 0));
1287    ASSERT_CMPINT64 (op_id, ==, OP_ID (started, 1));
1288    ASSERT_CMPINT64 (op_id, ==, OP_ID (started, 2));
1289    ASSERT_CMPINT64 (op_id, ==, OP_ID (succeeded, 0));
1290    ASSERT_CMPINT64 (op_id, ==, OP_ID (succeeded, 1));
1291    ASSERT_CMPINT64 (op_id, ==, OP_ID (succeeded, 2));
1292 
1293    mongoc_bulk_operation_destroy (bulk);
1294    mongoc_collection_destroy (collection);
1295 
1296    if (pooled) {
1297       mongoc_client_pool_push (pool, client);
1298       mongoc_client_pool_destroy (pool);
1299    } else {
1300       mongoc_client_destroy (client);
1301    }
1302 
1303    op_id_test_cleanup (&test);
1304    mongoc_apm_callbacks_destroy (callbacks);
1305 }
1306 
1307 
/* Bulk op-id test: single client, bulk created from the collection. */
static void
test_collection_bulk_op_single (void)
{
   const bool pooled = false;
   const bool use_bulk_operation_new = false;

   _test_bulk_operation_id (pooled, use_bulk_operation_new);
}
1313 
1314 
/* Bulk op-id test: pooled client, bulk created from the collection. */
static void
test_collection_bulk_op_pooled (void)
{
   const bool pooled = true;
   const bool use_bulk_operation_new = false;

   _test_bulk_operation_id (pooled, use_bulk_operation_new);
}
1320 
1321 
/* Bulk op-id test: single client, bulk built with mongoc_bulk_operation_new. */
static void
test_bulk_op_single (void)
{
   const bool pooled = false;
   const bool use_bulk_operation_new = true;

   _test_bulk_operation_id (pooled, use_bulk_operation_new);
}
1327 
1328 
/* Bulk op-id test: pooled client, bulk built with mongoc_bulk_operation_new. */
static void
test_bulk_op_pooled (void)
{
   const bool pooled = true;
   const bool use_bulk_operation_new = true;

   _test_bulk_operation_id (pooled, use_bulk_operation_new);
}
1334 
1335 
/*
 * Verify the ids reported for a query whose first request succeeds and
 * whose second request fails, against a mock server.
 *
 * pooled:  take the client from a pool (callbacks set on the pool) instead
 *          of creating a single client.
 * use_cmd: start the mock server with 4 rather than 0 and reply to the
 *          second request with a command-style error instead of a
 *          QUERY_FAILURE reply. NOTE(review): the 4/0 argument is presumably
 *          the advertised max wire version, selecting the find command
 *          versus OP_QUERY -- confirm against the mock server API.
 *
 * Expectations: two started events, one succeeded, one failed; request ids
 * pair up started[0]/succeeded[0] and started[1]/failed[0]; the two started
 * request ids differ; and the operation id is non-zero and shared by
 * started[0], started[1] and failed[0].
 */
static void
_test_query_operation_id (bool pooled, bool use_cmd)
{
   mock_server_t *server;
   mongoc_client_t *client;
   mongoc_client_pool_t *pool = NULL;
   mongoc_apm_callbacks_t *callbacks;
   mongoc_collection_t *collection;
   op_id_test_t test;
   mongoc_cursor_t *cursor;
   const bson_t *doc;
   future_t *future;
   request_t *request;
   int64_t op_id;

   op_id_test_init (&test);

   server = mock_server_with_autoismaster (use_cmd ? 4 : 0);
   mock_server_run (server);

   callbacks = mongoc_apm_callbacks_new ();
   mongoc_apm_set_command_started_cb (callbacks, test_op_id_started_cb);
   mongoc_apm_set_command_succeeded_cb (callbacks, test_op_id_succeeded_cb);
   mongoc_apm_set_command_failed_cb (callbacks, test_op_id_failed_cb);

   if (pooled) {
      pool = mongoc_client_pool_new (mock_server_get_uri (server));
      ASSERT (mongoc_client_pool_set_apm_callbacks (
         pool, callbacks, (void *) &test));
      client = mongoc_client_pool_pop (pool);
   } else {
      client = mongoc_client_new_from_uri (mock_server_get_uri (server));
      ASSERT (
         mongoc_client_set_apm_callbacks (client, callbacks, (void *) &test));
   }

   collection = mongoc_client_get_collection (client, "db", "collection");
   cursor = mongoc_collection_find (
      collection, MONGOC_QUERY_NONE, 0, 0, 1, tmp_bson ("{}"), NULL, NULL);

   /* first request: the mock server answers the find successfully and
    * leaves a cursor id (123) so a follow-up request is needed */
   future = future_cursor_next (cursor, &doc);
   request = mock_server_receives_request (server);
   mock_server_replies_to_find (request,
                                MONGOC_QUERY_SLAVE_OK,
                                123 /* cursor id */,
                                1,
                                "db.collection",
                                "{}",
                                use_cmd);

   ASSERT (future_get_bool (future));
   future_destroy (future);
   request_destroy (request);

   ASSERT_CMPINT (test.started_calls, ==, 1);
   ASSERT_CMPINT (test.succeeded_calls, ==, 1);

   /* second request: the mock server answers with an error, in the reply
    * style that matches use_cmd */
   future = future_cursor_next (cursor, &doc);
   request = mock_server_receives_request (server);
   if (use_cmd) {
      mock_server_replies_simple (request,
                                  "{'ok': 0, 'code': 42, 'errmsg': 'bad!'}");
   } else {
      mock_server_replies (request,
                           MONGOC_REPLY_QUERY_FAILURE,
                           123,
                           0,
                           0,
                           "{'$err': 'uh oh', 'code': 4321}");
   }

   ASSERT (!future_get_bool (future));
   future_destroy (future);
   request_destroy (request);

   ASSERT_CMPINT (test.started_calls, ==, 2);
   ASSERT_CMPINT (test.succeeded_calls, ==, 1);
   ASSERT_CMPINT (test.failed_calls, ==, 1);

   /* each started event pairs with its outcome event by request id */
   ASSERT_CMPINT64 (REQUEST_ID (started, 0), ==, REQUEST_ID (succeeded, 0));
   ASSERT_CMPINT64 (REQUEST_ID (started, 1), ==, REQUEST_ID (failed, 0));

   /* unique request ids */
   ASSERT_CMPINT64 (REQUEST_ID (started, 0), !=, REQUEST_ID (started, 1));

   /* operation ids all the same */
   op_id = OP_ID (started, 0);
   ASSERT_CMPINT64 (op_id, !=, (int64_t) 0);
   ASSERT_CMPINT64 (op_id, ==, OP_ID (started, 1));
   ASSERT_CMPINT64 (op_id, ==, OP_ID (failed, 0));

   /* the server goes away before the cursor is destroyed */
   mock_server_destroy (server);

   /* client logs warning because it can't send killCursors */
   capture_logs (true);
   mongoc_cursor_destroy (cursor);
   mongoc_collection_destroy (collection);

   if (pooled) {
      mongoc_client_pool_push (pool, client);
      mongoc_client_pool_destroy (pool);
   } else {
      mongoc_client_destroy (client);
   }

   op_id_test_cleanup (&test);
   mongoc_apm_callbacks_destroy (callbacks);
}
1444 
1445 
/* Query op-id test: single client, use_cmd enabled. */
static void
test_query_operation_id_single_cmd (void)
{
   const bool pooled = false;
   const bool use_cmd = true;

   _test_query_operation_id (pooled, use_cmd);
}
1451 
1452 
/* Query op-id test: pooled client, use_cmd enabled. */
static void
test_query_operation_id_pooled_cmd (void)
{
   const bool pooled = true;
   const bool use_cmd = true;

   _test_query_operation_id (pooled, use_cmd);
}
1458 
1459 
/* Query op-id test: single client, use_cmd disabled. */
static void
test_query_operation_id_single_op_query (void)
{
   const bool pooled = false;
   const bool use_cmd = false;

   _test_query_operation_id (pooled, use_cmd);
}
1465 
1466 
/* Query op-id test: pooled client, use_cmd disabled. */
static void
test_query_operation_id_pooled_op_query (void)
{
   const bool pooled = true;
   const bool use_cmd = false;

   _test_query_operation_id (pooled, use_cmd);
}
1472 
1473 
/*
 * Context passed to the cmd_*_cb callbacks: per-kind call counters plus the
 * details of the most recent command-started event, which cmd_started_cb
 * overwrites on every call.
 */
typedef struct {
   int started_calls;
   int succeeded_calls;
   int failed_calls;
   char db[100];       /* database name of the last started command */
   char cmd_name[100]; /* command name of the last started command */
   bson_t cmd;         /* copy of the last started command document */
} cmd_test_t;
1482 
1483 
1484 static void
cmd_test_init(cmd_test_t * test)1485 cmd_test_init (cmd_test_t *test)
1486 {
1487    memset (test, 0, sizeof *test);
1488    bson_init (&test->cmd);
1489 }
1490 
1491 
1492 static void
cmd_test_cleanup(cmd_test_t * test)1493 cmd_test_cleanup (cmd_test_t *test)
1494 {
1495    bson_destroy (&test->cmd);
1496 }
1497 
1498 
1499 static void
cmd_started_cb(const mongoc_apm_command_started_t * event)1500 cmd_started_cb (const mongoc_apm_command_started_t *event)
1501 {
1502    cmd_test_t *test;
1503 
1504    test = (cmd_test_t *) mongoc_apm_command_started_get_context (event);
1505    test->started_calls++;
1506    bson_destroy (&test->cmd);
1507    bson_strncpy (test->db,
1508                  mongoc_apm_command_started_get_database_name (event),
1509                  sizeof (test->db));
1510    bson_copy_to (mongoc_apm_command_started_get_command (event), &test->cmd);
1511    bson_strncpy (test->cmd_name,
1512                  mongoc_apm_command_started_get_command_name (event),
1513                  sizeof (test->cmd_name));
1514 }
1515 
1516 
1517 static void
cmd_succeeded_cb(const mongoc_apm_command_succeeded_t * event)1518 cmd_succeeded_cb (const mongoc_apm_command_succeeded_t *event)
1519 {
1520    cmd_test_t *test;
1521 
1522    test = (cmd_test_t *) mongoc_apm_command_succeeded_get_context (event);
1523    test->succeeded_calls++;
1524    ASSERT_CMPSTR (test->cmd_name,
1525                   mongoc_apm_command_succeeded_get_command_name (event));
1526 }
1527 
1528 
1529 static void
cmd_failed_cb(const mongoc_apm_command_failed_t * event)1530 cmd_failed_cb (const mongoc_apm_command_failed_t *event)
1531 {
1532    cmd_test_t *test;
1533 
1534    test = (cmd_test_t *) mongoc_apm_command_failed_get_context (event);
1535    test->failed_calls++;
1536    ASSERT_CMPSTR (test->cmd_name,
1537                   mongoc_apm_command_failed_get_command_name (event));
1538 }
1539 
1540 
1541 static void
set_cmd_test_callbacks(mongoc_client_t * client,void * context)1542 set_cmd_test_callbacks (mongoc_client_t *client, void *context)
1543 {
1544    mongoc_apm_callbacks_t *callbacks;
1545 
1546    callbacks = mongoc_apm_callbacks_new ();
1547    mongoc_apm_set_command_started_cb (callbacks, cmd_started_cb);
1548    mongoc_apm_set_command_succeeded_cb (callbacks, cmd_succeeded_cb);
1549    mongoc_apm_set_command_failed_cb (callbacks, cmd_failed_cb);
1550    ASSERT (mongoc_client_set_apm_callbacks (client, callbacks, context));
1551    mongoc_apm_callbacks_destroy (callbacks);
1552 }
1553 
1554 
1555 static void
test_client_cmd(void)1556 test_client_cmd (void)
1557 {
1558    cmd_test_t test;
1559    mongoc_client_t *client;
1560    mongoc_cursor_t *cursor;
1561    const bson_t *reply;
1562 
1563    cmd_test_init (&test);
1564    client = test_framework_client_new ();
1565    set_cmd_test_callbacks (client, (void *) &test);
1566    cursor = mongoc_client_command (client,
1567                                    "admin",
1568                                    MONGOC_QUERY_SLAVE_OK,
1569                                    0,
1570                                    0,
1571                                    0,
1572                                    tmp_bson ("{'ping': 1}"),
1573                                    NULL,
1574                                    NULL);
1575 
1576    ASSERT (mongoc_cursor_next (cursor, &reply));
1577    ASSERT_CMPSTR (test.cmd_name, "ping");
1578    ASSERT_MATCH (&test.cmd, "{'ping': 1}");
1579    ASSERT_CMPSTR (test.db, "admin");
1580    ASSERT_CMPINT (1, ==, test.started_calls);
1581    ASSERT_CMPINT (1, ==, test.succeeded_calls);
1582    ASSERT_CMPINT (0, ==, test.failed_calls);
1583 
1584    cmd_test_cleanup (&test);
1585    mongoc_cursor_destroy (cursor);
1586 
1587    cmd_test_init (&test);
1588    cursor = mongoc_client_command (client,
1589                                    "admin",
1590                                    MONGOC_QUERY_SLAVE_OK,
1591                                    0,
1592                                    0,
1593                                    0,
1594                                    tmp_bson ("{'foo': 1}"),
1595                                    NULL,
1596                                    NULL);
1597 
1598    ASSERT (!mongoc_cursor_next (cursor, &reply));
1599    ASSERT_CMPSTR (test.cmd_name, "foo");
1600    ASSERT_MATCH (&test.cmd, "{'foo': 1}");
1601    ASSERT_CMPSTR (test.db, "admin");
1602    ASSERT_CMPINT (1, ==, test.started_calls);
1603    ASSERT_CMPINT (0, ==, test.succeeded_calls);
1604    ASSERT_CMPINT (1, ==, test.failed_calls);
1605 
1606    cmd_test_cleanup (&test);
1607    mongoc_cursor_destroy (cursor);
1608    mongoc_client_destroy (client);
1609 }
1610 
1611 
1612 static void
test_client_cmd_simple(void)1613 test_client_cmd_simple (void)
1614 {
1615    cmd_test_t test;
1616    mongoc_client_t *client;
1617    bool r;
1618    bson_error_t error;
1619 
1620    cmd_test_init (&test);
1621    client = test_framework_client_new ();
1622    set_cmd_test_callbacks (client, (void *) &test);
1623    r = mongoc_client_command_simple (
1624       client, "admin", tmp_bson ("{'ping': 1}"), NULL, NULL, &error);
1625 
1626    ASSERT_OR_PRINT (r, error);
1627    ASSERT_CMPSTR (test.cmd_name, "ping");
1628    ASSERT_MATCH (&test.cmd, "{'ping': 1}");
1629    ASSERT_CMPSTR (test.db, "admin");
1630    ASSERT_CMPINT (1, ==, test.started_calls);
1631    ASSERT_CMPINT (1, ==, test.succeeded_calls);
1632    ASSERT_CMPINT (0, ==, test.failed_calls);
1633 
1634    cmd_test_cleanup (&test);
1635    cmd_test_init (&test);
1636    r = mongoc_client_command_simple (
1637       client, "admin", tmp_bson ("{'foo': 1}"), NULL, NULL, &error);
1638 
1639    ASSERT (!r);
1640    ASSERT_CMPSTR (test.cmd_name, "foo");
1641    ASSERT_MATCH (&test.cmd, "{'foo': 1}");
1642    ASSERT_CMPSTR (test.db, "admin");
1643    ASSERT_CMPINT (1, ==, test.started_calls);
1644    ASSERT_CMPINT (0, ==, test.succeeded_calls);
1645    ASSERT_CMPINT (1, ==, test.failed_calls);
1646 
1647    mongoc_client_destroy (client);
1648    cmd_test_cleanup (&test);
1649 }
1650 
1651 
1652 static void
test_client_cmd_op_ids(void)1653 test_client_cmd_op_ids (void)
1654 {
1655    op_id_test_t test;
1656    mongoc_client_t *client;
1657    mongoc_apm_callbacks_t *callbacks;
1658    bool r;
1659    bson_error_t error;
1660    int64_t op_id;
1661 
1662    op_id_test_init (&test);
1663 
1664    callbacks = mongoc_apm_callbacks_new ();
1665    mongoc_apm_set_command_started_cb (callbacks, test_op_id_started_cb);
1666    mongoc_apm_set_command_succeeded_cb (callbacks, test_op_id_succeeded_cb);
1667    mongoc_apm_set_command_failed_cb (callbacks, test_op_id_failed_cb);
1668 
1669    client = test_framework_client_new ();
1670    mongoc_client_set_apm_callbacks (client, callbacks, (void *) &test);
1671 
1672    r = mongoc_client_command_simple (
1673       client, "admin", tmp_bson ("{'ping': 1}"), NULL, NULL, &error);
1674 
1675    ASSERT_OR_PRINT (r, error);
1676    ASSERT_CMPINT (1, ==, test.started_calls);
1677    ASSERT_CMPINT (1, ==, test.succeeded_calls);
1678    ASSERT_CMPINT (0, ==, test.failed_calls);
1679    ASSERT_CMPINT64 (REQUEST_ID (started, 0), ==, REQUEST_ID (succeeded, 0));
1680    ASSERT_CMPINT64 (OP_ID (started, 0), ==, OP_ID (succeeded, 0));
1681    op_id = OP_ID (started, 0);
1682    ASSERT_CMPINT64 (op_id, !=, (int64_t) 0);
1683 
1684    op_id_test_cleanup (&test);
1685    op_id_test_init (&test);
1686 
1687    /* again. test that we use a new op_id. */
1688    r = mongoc_client_command_simple (
1689       client, "admin", tmp_bson ("{'ping': 1}"), NULL, NULL, &error);
1690 
1691    ASSERT_OR_PRINT (r, error);
1692    ASSERT_CMPINT (1, ==, test.started_calls);
1693    ASSERT_CMPINT (1, ==, test.succeeded_calls);
1694    ASSERT_CMPINT (0, ==, test.failed_calls);
1695    ASSERT_CMPINT64 (REQUEST_ID (started, 0), ==, REQUEST_ID (succeeded, 0));
1696    ASSERT_CMPINT64 (OP_ID (started, 0), ==, OP_ID (succeeded, 0));
1697    ASSERT_CMPINT64 (OP_ID (started, 0), !=, (int64_t) 0);
1698 
1699    /* new op_id */
1700    ASSERT_CMPINT64 (OP_ID (started, 0), !=, op_id);
1701 
1702    mongoc_client_destroy (client);
1703    op_id_test_cleanup (&test);
1704    mongoc_apm_callbacks_destroy (callbacks);
1705 }
1706 
1707 
1708 static void
test_killcursors_deprecated(void)1709 test_killcursors_deprecated (void)
1710 {
1711    cmd_test_t test;
1712    mongoc_client_t *client;
1713    bool r;
1714    bson_error_t error;
1715 
1716    cmd_test_init (&test);
1717    client = test_framework_client_new ();
1718 
1719    /* connect */
1720    r = mongoc_client_command_simple (
1721       client, "admin", tmp_bson ("{'ping': 1}"), NULL, NULL, &error);
1722 
1723    ASSERT_OR_PRINT (r, error);
1724    set_cmd_test_callbacks (client, (void *) &test);
1725 
1726    /* deprecated function without "db" or "collection", skips APM */
1727    mongoc_client_kill_cursor (client, 123);
1728 
1729    ASSERT_CMPINT (0, ==, test.started_calls);
1730    ASSERT_CMPINT (0, ==, test.succeeded_calls);
1731    ASSERT_CMPINT (0, ==, test.failed_calls);
1732 
1733    mongoc_client_destroy (client);
1734    cmd_test_cleanup (&test);
1735 }
1736 
1737 
/*
 * Register every command-monitoring test in this file with `suite`.
 *
 * test_all_spec_tests () is called first, followed by the hand-written
 * tests. Tests are added with TestSuite_AddMockServerTest when they drive a
 * mock server, TestSuite_AddLive otherwise, and TestSuite_AddFull when they
 * take a context argument and a skip check.
 */
void
test_command_monitoring_install (TestSuite *suite)
{
   test_all_spec_tests (suite);
   TestSuite_AddMockServerTest (
      suite, "/command_monitoring/get_error", test_get_error);
   /* installing callbacks on a single client vs. a pool */
   TestSuite_AddLive (suite,
                      "/command_monitoring/set_callbacks/single",
                      test_set_callbacks_single);
   TestSuite_AddLive (suite,
                      "/command_monitoring/set_callbacks/pooled",
                      test_set_callbacks_pooled);
   /* require aggregation cursor */
   TestSuite_AddFull (suite,
                      "/command_monitoring/set_callbacks/change",
                      test_change_callbacks,
                      NULL,
                      NULL,
                      test_framework_skip_if_max_wire_version_less_than_1);
   TestSuite_AddFull (suite,
                      "/command_monitoring/set_callbacks/reset",
                      test_reset_callbacks,
                      NULL,
                      NULL,
                      test_framework_skip_if_max_wire_version_less_than_1);
   /* operation ids of bulk writes */
   TestSuite_AddLive (suite,
                      "/command_monitoring/operation_id/bulk/collection/single",
                      test_collection_bulk_op_single);
   TestSuite_AddLive (suite,
                      "/command_monitoring/operation_id/bulk/collection/pooled",
                      test_collection_bulk_op_pooled);
   TestSuite_AddLive (suite,
                      "/command_monitoring/operation_id/bulk/new/single",
                      test_bulk_op_single);
   TestSuite_AddLive (suite,
                      "/command_monitoring/operation_id/bulk/new/pooled",
                      test_bulk_op_pooled);
   /* operation ids of queries, against a mock server */
   TestSuite_AddMockServerTest (
      suite,
      "/command_monitoring/operation_id/query/single/cmd",
      test_query_operation_id_single_cmd);
   TestSuite_AddMockServerTest (
      suite,
      "/command_monitoring/operation_id/query/pooled/cmd",
      test_query_operation_id_pooled_cmd);
   TestSuite_AddMockServerTest (
      suite,
      "/command_monitoring/operation_id/query/single/op_query",
      test_query_operation_id_single_op_query);
   TestSuite_AddMockServerTest (
      suite,
      "/command_monitoring/operation_id/query/pooled/op_query",
      test_query_operation_id_pooled_op_query);
   /* generic client commands and the deprecated kill-cursor entry point */
   TestSuite_AddLive (suite, "/command_monitoring/client_cmd", test_client_cmd);
   TestSuite_AddLive (
      suite, "/command_monitoring/client_cmd_simple", test_client_cmd_simple);
   TestSuite_AddLive (
      suite, "/command_monitoring/client_cmd/op_ids", test_client_cmd_op_ids);
   TestSuite_AddLive (suite,
                      "/command_monitoring/killcursors_deprecated",
                      test_killcursors_deprecated);
}
1800