1 #include <mongoc.h>
2 #include <mongoc-collection-private.h>
3 #include <mongoc-apm-private.h>
4 #include <mongoc-host-list-private.h>
5 #include <mongoc-cursor-private.h>
6 #include <mongoc-bulk-operation-private.h>
7 #include <mongoc-client-private.h>
8
9 #include "json-test.h"
10 #include "test-libmongoc.h"
11 #include "mock_server/mock-server.h"
12 #include "mock_server/future.h"
13 #include "mock_server/future-functions.h"
14
15
16 typedef struct {
17 uint32_t n_events;
18 bson_t events;
19 mongoc_uri_t *test_framework_uri;
20 int64_t cursor_id;
21 int64_t operation_id;
22 bool verbose;
23 } context_t;
24
25
26 static void
context_init(context_t * context)27 context_init (context_t *context)
28 {
29 context->n_events = 0;
30 bson_init (&context->events);
31 context->test_framework_uri = test_framework_get_uri ();
32 context->cursor_id = 0;
33 context->operation_id = 0;
34 context->verbose =
35 test_framework_getenv_bool ("MONGOC_TEST_MONITORING_VERBOSE");
36 }
37
38
39 static void
context_destroy(context_t * context)40 context_destroy (context_t *context)
41 {
42 bson_destroy (&context->events);
43 mongoc_uri_destroy (context->test_framework_uri);
44 }
45
46
47 static int
check_server_version(const bson_t * test,context_t * context)48 check_server_version (const bson_t *test, context_t *context)
49 {
50 const char *s;
51 char *padded;
52 server_version_t test_version, server_version;
53 bool r;
54
55 if (bson_has_field (test, "ignore_if_server_version_greater_than")) {
56 s = bson_lookup_utf8 (test, "ignore_if_server_version_greater_than");
57 /* s is like "3.0", don't skip if server is 3.0.x but skip 3.1+ */
58 padded = bson_strdup_printf ("%s.99", s);
59 test_version = test_framework_str_to_version (padded);
60 bson_free (padded);
61 server_version = test_framework_get_server_version ();
62 r = server_version <= test_version;
63
64 if (!r && context->verbose) {
65 printf (" SKIP, Server version > %s\n", s);
66 fflush (stdout);
67 }
68 } else if (bson_has_field (test, "ignore_if_server_version_less_than")) {
69 s = bson_lookup_utf8 (test, "ignore_if_server_version_less_than");
70 test_version = test_framework_str_to_version (s);
71 server_version = test_framework_get_server_version ();
72 r = server_version >= test_version;
73
74 if (!r && context->verbose) {
75 printf (" SKIP, Server version < %s\n", s);
76 fflush (stdout);
77 }
78 } else {
79 /* server version is ok, don't skip the test */
80 return true;
81 }
82
83 return r;
84 }
85
86
87 static bool
check_topology_type(const bson_t * test)88 check_topology_type (const bson_t *test)
89 {
90 bson_iter_t iter;
91 bson_iter_t child;
92 bool is_mongos;
93 const char *s;
94
95 /* so far this field can only contain an array like ["sharded"],
96 * see https://github.com/mongodb/specifications/pull/75
97 */
98 if (bson_iter_init_find (&iter, test, "ignore_if_topology_type")) {
99 ASSERT (BSON_ITER_HOLDS_ARRAY (&iter));
100 ASSERT (bson_iter_recurse (&iter, &child));
101
102 is_mongos = test_framework_is_mongos ();
103
104 while (bson_iter_next (&child)) {
105 if (BSON_ITER_HOLDS_UTF8 (&child)) {
106 s = bson_iter_utf8 (&child, NULL);
107
108 /* skip test if topology type is sharded */
109 if (!strcmp (s, "sharded") && is_mongos) {
110 return false;
111 }
112 }
113 }
114 }
115
116 return true;
117 }
118
119
120 static void
insert_data(mongoc_collection_t * collection,const bson_t * test)121 insert_data (mongoc_collection_t *collection, const bson_t *test)
122 {
123 mongoc_bulk_operation_t *bulk;
124 bson_iter_t iter;
125 bson_iter_t array_iter;
126 bson_t doc;
127 uint32_t r;
128 bson_error_t error;
129
130 if (!mongoc_collection_drop (collection, &error)) {
131 if (strcmp (error.message, "ns not found")) {
132 /* an error besides ns not found */
133 ASSERT_OR_PRINT (false, error);
134 }
135 }
136
137 bulk = mongoc_collection_create_bulk_operation (collection, true, NULL);
138
139 BSON_ASSERT (bson_iter_init_find (&iter, test, "data"));
140 BSON_ASSERT (BSON_ITER_HOLDS_ARRAY (&iter));
141 bson_iter_recurse (&iter, &array_iter);
142
143 while (bson_iter_next (&array_iter)) {
144 BSON_ASSERT (BSON_ITER_HOLDS_DOCUMENT (&array_iter));
145 bson_iter_bson (&array_iter, &doc);
146 mongoc_bulk_operation_insert (bulk, &doc);
147 bson_destroy (&doc);
148 }
149
150 r = mongoc_bulk_operation_execute (bulk, NULL, &error);
151 ASSERT_OR_PRINT (r, error);
152
153 mongoc_bulk_operation_destroy (bulk);
154 }
155
156
157 static void
assert_host_in_uri(const mongoc_host_list_t * host,const mongoc_uri_t * uri)158 assert_host_in_uri (const mongoc_host_list_t *host, const mongoc_uri_t *uri)
159 {
160 const mongoc_host_list_t *hosts;
161
162 hosts = mongoc_uri_get_hosts (uri);
163 while (hosts) {
164 if (_mongoc_host_list_equal (hosts, host)) {
165 return;
166 }
167
168 hosts = hosts->next;
169 }
170
171 fprintf (stderr,
172 "Host \"%s\" not in \"%s\"",
173 host->host_and_port,
174 mongoc_uri_get_string (uri));
175 fflush (stderr);
176 abort ();
177 }
178
179
180 static bool
ends_with(const char * s,const char * suffix)181 ends_with (const char *s, const char *suffix)
182 {
183 size_t s_len;
184 size_t suffix_len;
185
186 if (!s) {
187 return false;
188 }
189
190 s_len = strlen (s);
191 suffix_len = strlen (suffix);
192 return s_len >= suffix_len && !strcmp (s + s_len - suffix_len, suffix);
193 }
194
195
196 static int64_t
fake_cursor_id(const bson_iter_t * iter)197 fake_cursor_id (const bson_iter_t *iter)
198 {
199 return bson_iter_as_int64 (iter) ? 42 : 0;
200 }
201
202 /* Convert "ok" values to doubles, cursor ids and error codes to 42, and
203 * error messages to "". See README at
204 * github.com/mongodb/specifications/tree/master/source/command-monitoring/tests
205 */
206 static void
convert_command_for_test(context_t * context,const bson_t * src,bson_t * dst,const char * path)207 convert_command_for_test (context_t *context,
208 const bson_t *src,
209 bson_t *dst,
210 const char *path)
211 {
212 bson_iter_t iter;
213 const char *key;
214 const char *errmsg;
215 bson_t src_child;
216 bson_t dst_child;
217 char *child_path;
218
219 bson_iter_init (&iter, src);
220
221 while (bson_iter_next (&iter)) {
222 key = bson_iter_key (&iter);
223
224 if (!strcmp (key, "ok")) {
225 /* "The server is inconsistent on whether the ok values returned are
226 * integers or doubles so for simplicity the tests specify all expected
227 * values as doubles. Server 'ok' values of integers MUST be converted
228 * to doubles for comparison with the expected values."
229 */
230 BSON_APPEND_DOUBLE (dst, key, (double) bson_iter_as_int64 (&iter));
231
232 } else if (!strcmp (key, "errmsg")) {
233 /* "errmsg values of "" MUST BSON_ASSERT that the value is not empty"
234 */
235 errmsg = bson_iter_utf8 (&iter, NULL);
236 ASSERT_CMPSIZE_T (strlen (errmsg), >, (size_t) 0);
237 BSON_APPEND_UTF8 (dst, key, "");
238
239 } else if (!strcmp (key, "id") && ends_with (path, "cursor")) {
240 /* "When encountering a cursor or getMore value of "42" in a test, the
241 * driver MUST BSON_ASSERT that the values are equal to each other and
242 * greater than zero."
243 */
244 if (context->cursor_id == 0) {
245 context->cursor_id = bson_iter_int64 (&iter);
246 } else if (bson_iter_int64 (&iter) != 0) {
247 ASSERT_CMPINT64 (context->cursor_id, ==, bson_iter_int64 (&iter));
248 }
249
250 /* replace the reply's cursor id with 42 or 0 - check_expectations()
251 * will BSON_ASSERT it matches the value from the JSON test */
252 BSON_APPEND_INT64 (dst, key, fake_cursor_id (&iter));
253 } else if (ends_with (path, "cursors") ||
254 ends_with (path, "cursorsUnknown")) {
255 /* payload of a killCursors command-started event:
256 * {killCursors: "test", cursors: [12345]}
257 * or killCursors command-succeeded event:
258 * {ok: 1, cursorsUnknown: [12345]}
259 * */
260 ASSERT_CMPINT64 (bson_iter_as_int64 (&iter), >, (int64_t) 0);
261 BSON_APPEND_INT64 (dst, key, 42);
262
263 } else if (!strcmp (key, "getMore")) {
264 ASSERT_CMPINT64 (context->cursor_id, ==, bson_iter_int64 (&iter));
265 BSON_APPEND_INT64 (dst, key, fake_cursor_id (&iter));
266
267 } else if (!strcmp (key, "code")) {
268 /* "code values of 42 MUST BSON_ASSERT that the value is present and
269 * greater
270 * than zero" */
271 ASSERT_CMPINT64 (bson_iter_as_int64 (&iter), >, (int64_t) 0);
272 BSON_APPEND_INT32 (dst, key, 42);
273
274 } else if (BSON_ITER_HOLDS_DOCUMENT (&iter)) {
275 if (path) {
276 child_path = bson_strdup_printf ("%s.%s", path, key);
277 } else {
278 child_path = bson_strdup (key);
279 }
280
281 bson_iter_bson (&iter, &src_child);
282 bson_append_document_begin (dst, key, -1, &dst_child);
283 convert_command_for_test (
284 context, &src_child, &dst_child, child_path); /* recurse */
285 bson_append_document_end (dst, &dst_child);
286 bson_free (child_path);
287 } else if (BSON_ITER_HOLDS_ARRAY (&iter)) {
288 if (path) {
289 child_path = bson_strdup_printf ("%s.%s", path, key);
290 } else {
291 child_path = bson_strdup (key);
292 }
293
294 bson_iter_bson (&iter, &src_child);
295 bson_append_array_begin (dst, key, -1, &dst_child);
296 convert_command_for_test (
297 context, &src_child, &dst_child, child_path); /* recurse */
298 bson_append_array_end (dst, &dst_child);
299 bson_free (child_path);
300 } else {
301 bson_append_value (dst, key, -1, bson_iter_value (&iter));
302 }
303 }
304 }
305
306
307 static void
started_cb(const mongoc_apm_command_started_t * event)308 started_cb (const mongoc_apm_command_started_t *event)
309 {
310 context_t *context =
311 (context_t *) mongoc_apm_command_started_get_context (event);
312 int64_t operation_id;
313 char *cmd_json;
314 bson_t *events = &context->events;
315 bson_t cmd = BSON_INITIALIZER;
316 char str[16];
317 const char *key;
318 bson_t *new_event;
319
320 if (context->verbose) {
321 cmd_json = bson_as_canonical_extended_json (event->command, NULL);
322 printf ("%s\n", cmd_json);
323 fflush (stdout);
324 bson_free (cmd_json);
325 }
326
327 BSON_ASSERT (mongoc_apm_command_started_get_request_id (event) > 0);
328 BSON_ASSERT (mongoc_apm_command_started_get_server_id (event) > 0);
329 assert_host_in_uri (event->host, context->test_framework_uri);
330
331 /* subsequent events share the first event's operation id */
332 operation_id = mongoc_apm_command_started_get_operation_id (event);
333 ASSERT_CMPINT64 (operation_id, !=, (int64_t) 0);
334 if (!context->operation_id) {
335 context->operation_id = operation_id;
336 } else {
337 ASSERT_CMPINT64 (context->operation_id, ==, operation_id);
338 }
339
340 convert_command_for_test (context, event->command, &cmd, NULL);
341 new_event = BCON_NEW ("command_started_event",
342 "{",
343 "command",
344 BCON_DOCUMENT (&cmd),
345 "command_name",
346 BCON_UTF8 (event->command_name),
347 "database_name",
348 BCON_UTF8 (event->database_name),
349 "}");
350
351 bson_uint32_to_string (context->n_events, &key, str, sizeof str);
352 BSON_APPEND_DOCUMENT (events, key, new_event);
353
354 context->n_events++;
355
356 bson_destroy (new_event);
357 bson_destroy (&cmd);
358 }
359
360
361 static void
succeeded_cb(const mongoc_apm_command_succeeded_t * event)362 succeeded_cb (const mongoc_apm_command_succeeded_t *event)
363 {
364 context_t *context =
365 (context_t *) mongoc_apm_command_succeeded_get_context (event);
366 int64_t operation_id;
367 char *reply_json;
368 bson_t reply = BSON_INITIALIZER;
369 char str[16];
370 const char *key;
371 bson_t *new_event;
372
373 if (context->verbose) {
374 reply_json = bson_as_canonical_extended_json (event->reply, NULL);
375 printf ("\t\t<-- %s\n", reply_json);
376 fflush (stdout);
377 bson_free (reply_json);
378 }
379
380 BSON_ASSERT (mongoc_apm_command_succeeded_get_request_id (event) > 0);
381 BSON_ASSERT (mongoc_apm_command_succeeded_get_server_id (event) > 0);
382 assert_host_in_uri (event->host, context->test_framework_uri);
383
384 /* subsequent events share the first event's operation id */
385 operation_id = mongoc_apm_command_succeeded_get_operation_id (event);
386 ASSERT_CMPINT64 (operation_id, !=, (int64_t) 0);
387 ASSERT_CMPINT64 (context->operation_id, ==, operation_id);
388
389 convert_command_for_test (context, event->reply, &reply, NULL);
390 new_event = BCON_NEW ("command_succeeded_event",
391 "{",
392 "reply",
393 BCON_DOCUMENT (&reply),
394 "command_name",
395 BCON_UTF8 (event->command_name),
396 "}");
397
398 bson_uint32_to_string (context->n_events, &key, str, sizeof str);
399 BSON_APPEND_DOCUMENT (&context->events, key, new_event);
400
401 context->n_events++;
402
403 bson_destroy (new_event);
404 bson_destroy (&reply);
405 }
406
407
408 static void
failed_cb(const mongoc_apm_command_failed_t * event)409 failed_cb (const mongoc_apm_command_failed_t *event)
410 {
411 context_t *context =
412 (context_t *) mongoc_apm_command_failed_get_context (event);
413 int64_t operation_id;
414 bson_t reply = BSON_INITIALIZER;
415 char str[16];
416 const char *key;
417 bson_t *new_event;
418
419 if (context->verbose) {
420 printf (
421 "\t\t<-- %s FAILED: %s\n", event->command_name, event->error->message);
422 fflush (stdout);
423 }
424
425 BSON_ASSERT (mongoc_apm_command_failed_get_request_id (event) > 0);
426 BSON_ASSERT (mongoc_apm_command_failed_get_server_id (event) > 0);
427 assert_host_in_uri (event->host, context->test_framework_uri);
428
429 /* subsequent events share the first event's operation id */
430 operation_id = mongoc_apm_command_failed_get_operation_id (event);
431 ASSERT_CMPINT64 (operation_id, !=, (int64_t) 0);
432 ASSERT_CMPINT64 (context->operation_id, ==, operation_id);
433
434 new_event = BCON_NEW ("command_failed_event",
435 "{",
436 "command_name",
437 BCON_UTF8 (event->command_name),
438 "}");
439
440 bson_uint32_to_string (context->n_events, &key, str, sizeof str);
441 BSON_APPEND_DOCUMENT (&context->events, key, new_event);
442
443 context->n_events++;
444
445 bson_destroy (new_event);
446 bson_destroy (&reply);
447 }
448
449
450 static void
one_bulk_op(mongoc_bulk_operation_t * bulk,const bson_t * request)451 one_bulk_op (mongoc_bulk_operation_t *bulk, const bson_t *request)
452 {
453 bson_iter_t iter;
454 const char *request_name;
455 bson_t request_doc, document, filter, update;
456
457 bson_iter_init (&iter, request);
458 bson_iter_next (&iter);
459 request_name = bson_iter_key (&iter);
460 bson_iter_bson (&iter, &request_doc);
461
462 if (!strcmp (request_name, "insertOne")) {
463 bson_lookup_doc (&request_doc, "document", &document);
464 mongoc_bulk_operation_insert (bulk, &document);
465 } else if (!strcmp (request_name, "updateOne")) {
466 bson_lookup_doc (&request_doc, "filter", &filter);
467 bson_lookup_doc (&request_doc, "update", &update);
468 mongoc_bulk_operation_update_one (
469 bulk, &filter, &update, false /* upsert */);
470 } else {
471 test_error ("unrecognized request name %s", request_name);
472 abort ();
473 }
474 }
475
476
477 static void
test_bulk_write(mongoc_collection_t * collection,const bson_t * arguments)478 test_bulk_write (mongoc_collection_t *collection, const bson_t *arguments)
479 {
480 bool ordered;
481 mongoc_write_concern_t *wc;
482 mongoc_bulk_operation_t *bulk;
483 bson_iter_t requests_iter;
484 bson_t requests;
485 bson_t request;
486 uint32_t r;
487 bson_error_t error;
488
489 ordered = bson_lookup_bool (arguments, "ordered", true);
490
491 if (bson_has_field (arguments, "writeConcern")) {
492 wc = bson_lookup_write_concern (arguments, "writeConcern");
493 } else {
494 wc = mongoc_write_concern_new ();
495 }
496
497 if (bson_has_field (arguments, "requests")) {
498 bson_lookup_doc (arguments, "requests", &requests);
499 }
500
501 bulk = mongoc_collection_create_bulk_operation (collection, ordered, wc);
502 bson_iter_init (&requests_iter, &requests);
503 while (bson_iter_next (&requests_iter)) {
504 bson_iter_bson (&requests_iter, &request);
505 one_bulk_op (bulk, &request);
506 }
507
508 r = mongoc_bulk_operation_execute (bulk, NULL, &error);
509 ASSERT_OR_PRINT (r, error);
510
511 mongoc_bulk_operation_destroy (bulk);
512 mongoc_write_concern_destroy (wc);
513 }
514
515
516 static void
test_count(mongoc_collection_t * collection,const bson_t * arguments)517 test_count (mongoc_collection_t *collection, const bson_t *arguments)
518 {
519 bson_t filter;
520
521 bson_lookup_doc (arguments, "filter", &filter);
522 mongoc_collection_count (
523 collection, MONGOC_QUERY_NONE, &filter, 0, 0, NULL, NULL);
524 }
525
526
527 static void
test_find(mongoc_collection_t * collection,const bson_t * arguments,mongoc_read_prefs_t * read_prefs)528 test_find (mongoc_collection_t *collection,
529 const bson_t *arguments,
530 mongoc_read_prefs_t *read_prefs)
531 {
532 bson_t query;
533 bson_t filter;
534 bson_t sort;
535 uint32_t skip = 0;
536 uint32_t limit = 0;
537 uint32_t batch_size = 0;
538 bson_t modifiers;
539 mongoc_cursor_t *cursor;
540 const bson_t *doc;
541
542 bson_lookup_doc (arguments, "filter", &filter);
543
544 if (read_prefs || bson_has_field (arguments, "sort") ||
545 bson_has_field (arguments, "modifiers")) {
546 bson_init (&query);
547 BSON_APPEND_DOCUMENT (&query, "$query", &filter);
548
549 if (bson_has_field (arguments, "sort")) {
550 bson_lookup_doc (arguments, "sort", &sort);
551 BSON_APPEND_DOCUMENT (&query, "$orderby", &sort);
552 }
553
554 if (bson_has_field (arguments, "modifiers")) {
555 bson_lookup_doc (arguments, "modifiers", &modifiers);
556 bson_concat (&query, &modifiers);
557 }
558 } else {
559 bson_copy_to (&filter, &query);
560 }
561
562 if (bson_has_field (arguments, "skip")) {
563 skip = (uint32_t) bson_lookup_int64 (arguments, "skip");
564 }
565
566 if (bson_has_field (arguments, "limit")) {
567 limit = (uint32_t) bson_lookup_int64 (arguments, "limit");
568 }
569
570 if (bson_has_field (arguments, "batchSize")) {
571 batch_size = (uint32_t) bson_lookup_int64 (arguments, "batchSize");
572 }
573
574 cursor = mongoc_collection_find (collection,
575 MONGOC_QUERY_NONE,
576 skip,
577 limit,
578 batch_size,
579 &query,
580 NULL,
581 read_prefs);
582
583 BSON_ASSERT (cursor);
584 while (mongoc_cursor_next (cursor, &doc)) {
585 }
586
587 /* can cause a killCursors command */
588 mongoc_cursor_destroy (cursor);
589 bson_destroy (&query);
590 }
591
592
593 static void
test_delete_many(mongoc_collection_t * collection,const bson_t * arguments)594 test_delete_many (mongoc_collection_t *collection, const bson_t *arguments)
595 {
596 bson_t filter;
597
598 bson_lookup_doc (arguments, "filter", &filter);
599 mongoc_collection_remove (
600 collection, MONGOC_REMOVE_NONE, &filter, NULL, NULL);
601 }
602
603
604 static void
test_delete_one(mongoc_collection_t * collection,const bson_t * arguments)605 test_delete_one (mongoc_collection_t *collection, const bson_t *arguments)
606 {
607 bson_t filter;
608
609 bson_lookup_doc (arguments, "filter", &filter);
610 mongoc_collection_remove (
611 collection, MONGOC_REMOVE_SINGLE_REMOVE, &filter, NULL, NULL);
612 }
613
614
615 static void
test_insert_many(mongoc_collection_t * collection,const bson_t * arguments)616 test_insert_many (mongoc_collection_t *collection, const bson_t *arguments)
617 {
618 bool ordered;
619 mongoc_bulk_operation_t *bulk;
620 bson_t documents;
621 bson_iter_t iter;
622 bson_t doc;
623
624 ordered = bson_lookup_bool (arguments, "ordered", true);
625 bulk = mongoc_collection_create_bulk_operation (collection, ordered, NULL);
626
627 bson_lookup_doc (arguments, "documents", &documents);
628 bson_iter_init (&iter, &documents);
629 while (bson_iter_next (&iter)) {
630 bson_iter_bson (&iter, &doc);
631 mongoc_bulk_operation_insert (bulk, &doc);
632 }
633
634 mongoc_bulk_operation_execute (bulk, NULL, NULL);
635
636 mongoc_bulk_operation_destroy (bulk);
637 }
638
639
640 static void
test_insert_one(mongoc_collection_t * collection,const bson_t * arguments)641 test_insert_one (mongoc_collection_t *collection, const bson_t *arguments)
642 {
643 bson_t document;
644
645 bson_lookup_doc (arguments, "document", &document);
646 mongoc_collection_insert (
647 collection, MONGOC_INSERT_NONE, &document, NULL, NULL);
648 }
649
650
651 static void
test_update(mongoc_collection_t * collection,const bson_t * arguments,bool multi)652 test_update (mongoc_collection_t *collection,
653 const bson_t *arguments,
654 bool multi)
655 {
656 bson_t filter;
657 bson_t update;
658 mongoc_update_flags_t flags = MONGOC_UPDATE_NONE;
659
660 if (multi) {
661 flags |= MONGOC_UPDATE_MULTI_UPDATE;
662 }
663
664 if (bson_lookup_bool (arguments, "upsert", false)) {
665 flags |= MONGOC_UPDATE_UPSERT;
666 }
667
668 bson_lookup_doc (arguments, "filter", &filter);
669 bson_lookup_doc (arguments, "update", &update);
670
671 mongoc_collection_update (collection, flags, &filter, &update, NULL, NULL);
672 }
673
674
675 static void
test_update_many(mongoc_collection_t * collection,const bson_t * arguments)676 test_update_many (mongoc_collection_t *collection, const bson_t *arguments)
677 {
678 test_update (collection, arguments, true);
679 }
680
681
682 static void
test_update_one(mongoc_collection_t * collection,const bson_t * arguments)683 test_update_one (mongoc_collection_t *collection, const bson_t *arguments)
684 {
685 test_update (collection, arguments, false);
686 }
687
688
689 static void
one_test(mongoc_collection_t * collection,bson_t * test)690 one_test (mongoc_collection_t *collection, bson_t *test)
691 {
692 context_t context;
693 const char *description;
694 mongoc_apm_callbacks_t *callbacks;
695 bson_t operation;
696 bson_t arguments;
697 const char *op_name;
698 mongoc_read_prefs_t *read_prefs = NULL;
699 bson_t expectations;
700
701 context_init (&context);
702 callbacks = mongoc_apm_callbacks_new ();
703
704 if (test_suite_debug_output ()) {
705 description = bson_lookup_utf8 (test, "description");
706 printf (" - %s\n", description);
707 fflush (stdout);
708 }
709
710 if (!check_server_version (test, &context) || !check_topology_type (test)) {
711 goto done;
712 }
713
714 mongoc_apm_set_command_started_cb (callbacks, started_cb);
715 mongoc_apm_set_command_succeeded_cb (callbacks, succeeded_cb);
716 mongoc_apm_set_command_failed_cb (callbacks, failed_cb);
717 mongoc_client_set_apm_callbacks (collection->client, callbacks, &context);
718
719 bson_lookup_doc (test, "operation", &operation);
720 op_name = bson_lookup_utf8 (&operation, "name");
721 bson_lookup_doc (&operation, "arguments", &arguments);
722
723 if (bson_has_field (&operation, "read_preference")) {
724 read_prefs = bson_lookup_read_prefs (&operation, "read_preference");
725 }
726
727 if (!strcmp (op_name, "bulkWrite")) {
728 test_bulk_write (collection, &arguments);
729 } else if (!strcmp (op_name, "count")) {
730 test_count (collection, &arguments);
731 } else if (!strcmp (op_name, "find")) {
732 test_find (collection, &arguments, read_prefs);
733 } else if (!strcmp (op_name, "deleteMany")) {
734 test_delete_many (collection, &arguments);
735 } else if (!strcmp (op_name, "deleteOne")) {
736 test_delete_one (collection, &arguments);
737 } else if (!strcmp (op_name, "insertMany")) {
738 test_insert_many (collection, &arguments);
739 } else if (!strcmp (op_name, "insertOne")) {
740 test_insert_one (collection, &arguments);
741 } else if (!strcmp (op_name, "updateMany")) {
742 test_update_many (collection, &arguments);
743 } else if (!strcmp (op_name, "updateOne")) {
744 test_update_one (collection, &arguments);
745 } else {
746 test_error ("unrecognized operation name %s", op_name);
747 abort ();
748 }
749
750 bson_lookup_doc (test, "expectations", &expectations);
751 check_json_apm_events (&context.events, &expectations);
752
753 done:
754 mongoc_apm_callbacks_destroy (callbacks);
755 context_destroy (&context);
756 mongoc_read_prefs_destroy (read_prefs);
757 }
758
759
760 /*
761 *-----------------------------------------------------------------------
762 *
763 * test_command_monitoring_cb --
764 *
765 * Runs the JSON tests included with the Command Monitoring spec.
766 *
767 *-----------------------------------------------------------------------
768 */
769
770 static void
test_command_monitoring_cb(bson_t * scenario)771 test_command_monitoring_cb (bson_t *scenario)
772 {
773 mongoc_client_t *client;
774 const char *db_name;
775 const char *collection_name;
776 bson_iter_t iter;
777 bson_iter_t tests_iter;
778 bson_t test_op;
779 mongoc_collection_t *collection;
780
781 BSON_ASSERT (scenario);
782
783 db_name = bson_lookup_utf8 (scenario, "database_name");
784 collection_name = bson_lookup_utf8 (scenario, "collection_name");
785
786 BSON_ASSERT (bson_iter_init_find (&iter, scenario, "tests"));
787 BSON_ASSERT (BSON_ITER_HOLDS_ARRAY (&iter));
788 bson_iter_recurse (&iter, &tests_iter);
789
790 while (bson_iter_next (&tests_iter)) {
791 client = test_framework_client_new ();
792 collection =
793 mongoc_client_get_collection (client, db_name, collection_name);
794
795 insert_data (collection, scenario);
796 bson_iter_bson (&tests_iter, &test_op);
797 one_test (collection, &test_op);
798 mongoc_collection_destroy (collection);
799 mongoc_client_destroy (client);
800 }
801 }
802
803
804 /*
805 *-----------------------------------------------------------------------
806 *
807 * Runner for the JSON tests for command monitoring.
808 *
809 *-----------------------------------------------------------------------
810 */
811 static void
test_all_spec_tests(TestSuite * suite)812 test_all_spec_tests (TestSuite *suite)
813 {
814 char resolved[PATH_MAX];
815
816 ASSERT (realpath (JSON_DIR "/command_monitoring", resolved));
817 install_json_test_suite (suite, resolved, &test_command_monitoring_cb);
818 }
819
820
821 static void
test_get_error_failed_cb(const mongoc_apm_command_failed_t * event)822 test_get_error_failed_cb (const mongoc_apm_command_failed_t *event)
823 {
824 bson_error_t *error;
825
826 error = (bson_error_t *) mongoc_apm_command_failed_get_context (event);
827 mongoc_apm_command_failed_get_error (event, error);
828 }
829
830
831 static void
test_get_error(void)832 test_get_error (void)
833 {
834 mock_server_t *server;
835 mongoc_client_t *client;
836 mongoc_apm_callbacks_t *callbacks;
837 future_t *future;
838 request_t *request;
839 bson_error_t error = {0};
840
841 server = mock_server_with_autoismaster (0);
842 mock_server_run (server);
843
844 client = mongoc_client_new_from_uri (mock_server_get_uri (server));
845 callbacks = mongoc_apm_callbacks_new ();
846 mongoc_apm_set_command_failed_cb (callbacks, test_get_error_failed_cb);
847 mongoc_client_set_apm_callbacks (client, callbacks, (void *) &error);
848 future = future_client_command_simple (
849 client, "db", tmp_bson ("{'foo': 1}"), NULL, NULL, NULL);
850 request = mock_server_receives_command (
851 server, "db", MONGOC_QUERY_SLAVE_OK, "{'foo': 1}");
852 mock_server_replies_simple (request,
853 "{'ok': 0, 'errmsg': 'foo', 'code': 42}");
854 ASSERT (!future_get_bool (future));
855 ASSERT_ERROR_CONTAINS (error, MONGOC_ERROR_QUERY, 42, "foo");
856
857 future_destroy (future);
858 request_destroy (request);
859 mongoc_apm_callbacks_destroy (callbacks);
860 mongoc_client_destroy (client);
861 mock_server_destroy (server);
862 }
863
864
865 static void
insert_200_docs(mongoc_collection_t * collection)866 insert_200_docs (mongoc_collection_t *collection)
867 {
868 int i;
869 bson_t *doc;
870 bool r;
871 bson_error_t error;
872
873 /* insert 200 docs so we have a couple batches */
874 doc = tmp_bson (NULL);
875 for (i = 0; i < 200; i++) {
876 r = mongoc_collection_insert (
877 collection, MONGOC_INSERT_NONE, doc, NULL, &error);
878
879 ASSERT_OR_PRINT (r, error);
880 }
881 }
882
883
884 static void
increment(const mongoc_apm_command_started_t * event)885 increment (const mongoc_apm_command_started_t *event)
886 {
887 int *i = (int *) mongoc_apm_command_started_get_context (event);
888
889 ++(*i);
890 }
891
892
893 static mongoc_apm_callbacks_t *
increment_callbacks(void)894 increment_callbacks (void)
895 {
896 mongoc_apm_callbacks_t *callbacks;
897
898 callbacks = mongoc_apm_callbacks_new ();
899 mongoc_apm_set_command_started_cb (callbacks, increment);
900
901 return callbacks;
902 }
903
904
905 static void
decrement(const mongoc_apm_command_started_t * event)906 decrement (const mongoc_apm_command_started_t *event)
907 {
908 int *i = (int *) mongoc_apm_command_started_get_context (event);
909
910 --(*i);
911 }
912
913
914 static mongoc_apm_callbacks_t *
decrement_callbacks(void)915 decrement_callbacks (void)
916 {
917 mongoc_apm_callbacks_t *callbacks;
918
919 callbacks = mongoc_apm_callbacks_new ();
920 mongoc_apm_set_command_started_cb (callbacks, decrement);
921
922 return callbacks;
923 }
924
925
926 static void
test_change_callbacks(void * ctx)927 test_change_callbacks (void *ctx)
928 {
929 mongoc_apm_callbacks_t *inc_callbacks;
930 mongoc_apm_callbacks_t *dec_callbacks;
931 int incremented = 0;
932 int decremented = 0;
933 mongoc_client_t *client;
934 mongoc_collection_t *collection;
935 bson_error_t error;
936 mongoc_cursor_t *cursor;
937 const bson_t *b;
938
939 inc_callbacks = increment_callbacks ();
940 dec_callbacks = decrement_callbacks ();
941
942 client = test_framework_client_new ();
943 mongoc_client_set_apm_callbacks (client, inc_callbacks, &incremented);
944
945 collection = get_test_collection (client, "test_change_callbacks");
946
947 insert_200_docs (collection);
948 ASSERT_CMPINT (incremented, ==, 200);
949
950 mongoc_client_set_apm_callbacks (client, dec_callbacks, &decremented);
951 cursor = mongoc_collection_aggregate (
952 collection, MONGOC_QUERY_NONE, tmp_bson (NULL), NULL, NULL);
953
954 ASSERT (mongoc_cursor_next (cursor, &b));
955 ASSERT_CMPINT (decremented, ==, -1);
956
957 mongoc_client_set_apm_callbacks (client, inc_callbacks, &incremented);
958 while (mongoc_cursor_next (cursor, &b)) {
959 }
960 ASSERT_OR_PRINT (!mongoc_cursor_error (cursor, &error), error);
961 ASSERT_CMPINT (incremented, ==, 201);
962
963 mongoc_collection_drop (collection, NULL);
964
965 mongoc_cursor_destroy (cursor);
966 mongoc_collection_destroy (collection);
967 mongoc_client_destroy (client);
968 mongoc_apm_callbacks_destroy (inc_callbacks);
969 mongoc_apm_callbacks_destroy (dec_callbacks);
970 }
971
972
973 static void
test_reset_callbacks(void * ctx)974 test_reset_callbacks (void *ctx)
975 {
976 mongoc_apm_callbacks_t *inc_callbacks;
977 mongoc_apm_callbacks_t *dec_callbacks;
978 int incremented = 0;
979 int decremented = 0;
980 mongoc_client_t *client;
981 mongoc_collection_t *collection;
982 bool r;
983 bson_t *cmd;
984 bson_t cmd_reply;
985 bson_error_t error;
986 mongoc_server_description_t *sd;
987 mongoc_cursor_t *cursor;
988 const bson_t *b;
989
990 inc_callbacks = increment_callbacks ();
991 dec_callbacks = decrement_callbacks ();
992
993 client = test_framework_client_new ();
994 collection = get_test_collection (client, "test_reset_apm_callbacks");
995
996 /* insert 200 docs so we have a couple batches */
997 insert_200_docs (collection);
998
999 mongoc_client_set_apm_callbacks (client, inc_callbacks, &incremented);
1000 cmd = tmp_bson ("{'aggregate': '%s', 'pipeline': [], 'cursor': {}}",
1001 collection->collection);
1002
1003 sd =
1004 mongoc_client_select_server (client, true /* for writes */, NULL, &error);
1005 ASSERT_OR_PRINT (sd, error);
1006
1007 r = mongoc_client_read_command_with_opts (
1008 client,
1009 "test",
1010 cmd,
1011 NULL,
1012 tmp_bson ("{'serverId': %d}", sd->id),
1013 &cmd_reply,
1014 &error);
1015
1016 ASSERT_OR_PRINT (r, error);
1017 ASSERT_CMPINT (incremented, ==, 1);
1018
1019 /* reset callbacks */
1020 mongoc_client_set_apm_callbacks (client, NULL, NULL);
1021 /* destroys cmd_reply */
1022 cursor = mongoc_cursor_new_from_command_reply (client, &cmd_reply, sd->id);
1023 ASSERT (mongoc_cursor_next (cursor, &b));
1024 ASSERT_CMPINT (incremented, ==, 1); /* same value as before */
1025
1026 mongoc_client_set_apm_callbacks (client, dec_callbacks, &decremented);
1027 while (mongoc_cursor_next (cursor, &b)) {
1028 }
1029 ASSERT_OR_PRINT (!mongoc_cursor_error (cursor, &error), error);
1030 ASSERT_CMPINT (decremented, ==, -1);
1031
1032 mongoc_collection_drop (collection, NULL);
1033
1034 mongoc_cursor_destroy (cursor);
1035 mongoc_server_description_destroy (sd);
1036 mongoc_collection_destroy (collection);
1037 mongoc_client_destroy (client);
1038 mongoc_apm_callbacks_destroy (inc_callbacks);
1039 mongoc_apm_callbacks_destroy (dec_callbacks);
1040 }
1041
1042
1043 static void
test_set_callbacks_cb(const mongoc_apm_command_started_t * event)1044 test_set_callbacks_cb (const mongoc_apm_command_started_t *event)
1045 {
1046 int *n_calls = (int *) mongoc_apm_command_started_get_context (event);
1047
1048 (*n_calls)++;
1049 }
1050
1051
1052 static void
_test_set_callbacks(bool pooled)1053 _test_set_callbacks (bool pooled)
1054 {
1055 mongoc_client_t *client;
1056 mongoc_client_pool_t *pool = NULL;
1057 mongoc_apm_callbacks_t *callbacks;
1058 int n_calls = 0;
1059 bson_error_t error;
1060 bson_t b;
1061
1062 callbacks = mongoc_apm_callbacks_new ();
1063 mongoc_apm_set_command_started_cb (callbacks, test_set_callbacks_cb);
1064
1065 if (pooled) {
1066 pool = test_framework_client_pool_new ();
1067 ASSERT (mongoc_client_pool_set_apm_callbacks (
1068 pool, callbacks, (void *) &n_calls));
1069 client = mongoc_client_pool_pop (pool);
1070 } else {
1071 client = test_framework_client_new ();
1072 ASSERT (mongoc_client_set_apm_callbacks (
1073 client, callbacks, (void *) &n_calls));
1074 }
1075
1076 ASSERT_OR_PRINT (mongoc_client_get_server_status (client, NULL, &b, &error),
1077 error);
1078 ASSERT_CMPINT (1, ==, n_calls);
1079
1080 capture_logs (true);
1081
1082 if (pooled) {
1083 ASSERT (
1084 !mongoc_client_pool_set_apm_callbacks (pool, NULL, (void *) &n_calls));
1085 ASSERT_CAPTURED_LOG ("mongoc_client_pool_set_apm_callbacks",
1086 MONGOC_LOG_LEVEL_ERROR,
1087 "Can only set callbacks once");
1088
1089 clear_captured_logs ();
1090 ASSERT (
1091 !mongoc_client_set_apm_callbacks (client, NULL, (void *) &n_calls));
1092 ASSERT_CAPTURED_LOG ("mongoc_client_pool_set_apm_callbacks",
1093 MONGOC_LOG_LEVEL_ERROR,
1094 "Cannot set callbacks on a pooled client");
1095 } else {
1096 /* repeated calls ok, null is ok */
1097 ASSERT (mongoc_client_set_apm_callbacks (client, NULL, NULL));
1098 }
1099
1100 if (pooled) {
1101 mongoc_client_pool_push (pool, client);
1102 mongoc_client_pool_destroy (pool);
1103 } else {
1104 mongoc_client_destroy (client);
1105 }
1106
1107 bson_destroy (&b);
1108 mongoc_apm_callbacks_destroy (callbacks);
1109 }
1110
1111
1112 static void
test_set_callbacks_single(void)1113 test_set_callbacks_single (void)
1114 {
1115 _test_set_callbacks (false);
1116 }
1117
1118
1119 static void
test_set_callbacks_pooled(void)1120 test_set_callbacks_pooled (void)
1121 {
1122 _test_set_callbacks (true);
1123 }
1124
1125
1126 typedef struct {
1127 int64_t request_id;
1128 int64_t op_id;
1129 } ids_t;
1130
1131
1132 typedef struct {
1133 mongoc_array_t started_ids;
1134 mongoc_array_t succeeded_ids;
1135 mongoc_array_t failed_ids;
1136 int started_calls;
1137 int succeeded_calls;
1138 int failed_calls;
1139 } op_id_test_t;
1140
1141
1142 static void
op_id_test_init(op_id_test_t * test)1143 op_id_test_init (op_id_test_t *test)
1144 {
1145 _mongoc_array_init (&test->started_ids, sizeof (ids_t));
1146 _mongoc_array_init (&test->succeeded_ids, sizeof (ids_t));
1147 _mongoc_array_init (&test->failed_ids, sizeof (ids_t));
1148
1149 test->started_calls = 0;
1150 test->succeeded_calls = 0;
1151 test->failed_calls = 0;
1152 }
1153
1154
1155 static void
op_id_test_cleanup(op_id_test_t * test)1156 op_id_test_cleanup (op_id_test_t *test)
1157 {
1158 _mongoc_array_destroy (&test->started_ids);
1159 _mongoc_array_destroy (&test->succeeded_ids);
1160 _mongoc_array_destroy (&test->failed_ids);
1161 }
1162
1163
1164 static void
test_op_id_started_cb(const mongoc_apm_command_started_t * event)1165 test_op_id_started_cb (const mongoc_apm_command_started_t *event)
1166 {
1167 op_id_test_t *test;
1168 ids_t ids;
1169
1170 test = (op_id_test_t *) mongoc_apm_command_started_get_context (event);
1171 ids.request_id = mongoc_apm_command_started_get_request_id (event);
1172 ids.op_id = mongoc_apm_command_started_get_operation_id (event);
1173
1174 _mongoc_array_append_val (&test->started_ids, ids);
1175
1176 test->started_calls++;
1177 }
1178
1179
1180 static void
test_op_id_succeeded_cb(const mongoc_apm_command_succeeded_t * event)1181 test_op_id_succeeded_cb (const mongoc_apm_command_succeeded_t *event)
1182 {
1183 op_id_test_t *test;
1184 ids_t ids;
1185
1186 test = (op_id_test_t *) mongoc_apm_command_succeeded_get_context (event);
1187 ids.request_id = mongoc_apm_command_succeeded_get_request_id (event);
1188 ids.op_id = mongoc_apm_command_succeeded_get_operation_id (event);
1189
1190 _mongoc_array_append_val (&test->succeeded_ids, ids);
1191
1192 test->succeeded_calls++;
1193 }
1194
1195
1196 static void
test_op_id_failed_cb(const mongoc_apm_command_failed_t * event)1197 test_op_id_failed_cb (const mongoc_apm_command_failed_t *event)
1198 {
1199 op_id_test_t *test;
1200 ids_t ids;
1201
1202 test = (op_id_test_t *) mongoc_apm_command_failed_get_context (event);
1203 ids.request_id = mongoc_apm_command_failed_get_request_id (event);
1204 ids.op_id = mongoc_apm_command_failed_get_operation_id (event);
1205
1206 _mongoc_array_append_val (&test->failed_ids, ids);
1207
1208 test->failed_calls++;
1209 }
1210
1211
1212 #define REQUEST_ID(_event_type, _index) \
1213 _mongoc_array_index (&test._event_type##_ids, ids_t, _index).request_id
1214
1215 #define OP_ID(_event_type, _index) \
1216 _mongoc_array_index (&test._event_type##_ids, ids_t, _index).op_id
1217
1218 static void
_test_bulk_operation_id(bool pooled,bool use_bulk_operation_new)1219 _test_bulk_operation_id (bool pooled, bool use_bulk_operation_new)
1220 {
1221 mongoc_client_t *client;
1222 mongoc_client_pool_t *pool = NULL;
1223 mongoc_apm_callbacks_t *callbacks;
1224 mongoc_collection_t *collection;
1225 mongoc_bulk_operation_t *bulk;
1226 bson_error_t error;
1227 op_id_test_t test;
1228 int64_t op_id;
1229
1230 op_id_test_init (&test);
1231
1232 callbacks = mongoc_apm_callbacks_new ();
1233 mongoc_apm_set_command_started_cb (callbacks, test_op_id_started_cb);
1234 mongoc_apm_set_command_succeeded_cb (callbacks, test_op_id_succeeded_cb);
1235 mongoc_apm_set_command_failed_cb (callbacks, test_op_id_failed_cb);
1236
1237 if (pooled) {
1238 pool = test_framework_client_pool_new ();
1239 ASSERT (mongoc_client_pool_set_apm_callbacks (
1240 pool, callbacks, (void *) &test));
1241 client = mongoc_client_pool_pop (pool);
1242 } else {
1243 client = test_framework_client_new ();
1244 ASSERT (
1245 mongoc_client_set_apm_callbacks (client, callbacks, (void *) &test));
1246 }
1247
1248 collection = get_test_collection (client, "test_bulk_operation_id");
1249 if (use_bulk_operation_new) {
1250 bulk = mongoc_bulk_operation_new (false);
1251 mongoc_bulk_operation_set_client (bulk, client);
1252 mongoc_bulk_operation_set_database (bulk, collection->db);
1253 mongoc_bulk_operation_set_collection (bulk, collection->collection);
1254 } else {
1255 bulk = mongoc_collection_create_bulk_operation (collection, false, NULL);
1256 }
1257
1258 mongoc_bulk_operation_insert (bulk, tmp_bson ("{'_id': 1}"));
1259 mongoc_bulk_operation_update_one (
1260 bulk, tmp_bson ("{'_id': 1}"), tmp_bson ("{'$set': {'x': 1}}"), false);
1261 mongoc_bulk_operation_remove (bulk, tmp_bson ("{}"));
1262
1263 /* ensure we monitor with bulk->operation_id, not cluster->operation_id */
1264 client->cluster.operation_id = 42;
1265
1266 /* write errors don't trigger failed events, so we only test success */
1267 ASSERT_OR_PRINT (mongoc_bulk_operation_execute (bulk, NULL, &error), error);
1268 ASSERT_CMPINT (test.started_calls, ==, 3);
1269 ASSERT_CMPINT (test.succeeded_calls, ==, 3);
1270
1271 ASSERT_CMPINT64 (REQUEST_ID (started, 0), ==, REQUEST_ID (succeeded, 0));
1272 ASSERT_CMPINT64 (REQUEST_ID (started, 1), ==, REQUEST_ID (succeeded, 1));
1273 ASSERT_CMPINT64 (REQUEST_ID (started, 2), ==, REQUEST_ID (succeeded, 2));
1274
1275 /* 3 unique request ids */
1276 ASSERT_CMPINT64 (REQUEST_ID (started, 0), !=, REQUEST_ID (started, 1));
1277 ASSERT_CMPINT64 (REQUEST_ID (started, 0), !=, REQUEST_ID (started, 2));
1278 ASSERT_CMPINT64 (REQUEST_ID (started, 1), !=, REQUEST_ID (started, 2));
1279 ASSERT_CMPINT64 (REQUEST_ID (succeeded, 0), !=, REQUEST_ID (succeeded, 1));
1280 ASSERT_CMPINT64 (REQUEST_ID (succeeded, 0), !=, REQUEST_ID (succeeded, 2));
1281 ASSERT_CMPINT64 (REQUEST_ID (succeeded, 1), !=, REQUEST_ID (succeeded, 2));
1282
1283 /* events' operation ids all equal bulk->operation_id */
1284 op_id = bulk->operation_id;
1285 ASSERT_CMPINT64 (op_id, !=, (int64_t) 0);
1286 ASSERT_CMPINT64 (op_id, ==, OP_ID (started, 0));
1287 ASSERT_CMPINT64 (op_id, ==, OP_ID (started, 1));
1288 ASSERT_CMPINT64 (op_id, ==, OP_ID (started, 2));
1289 ASSERT_CMPINT64 (op_id, ==, OP_ID (succeeded, 0));
1290 ASSERT_CMPINT64 (op_id, ==, OP_ID (succeeded, 1));
1291 ASSERT_CMPINT64 (op_id, ==, OP_ID (succeeded, 2));
1292
1293 mongoc_bulk_operation_destroy (bulk);
1294 mongoc_collection_destroy (collection);
1295
1296 if (pooled) {
1297 mongoc_client_pool_push (pool, client);
1298 mongoc_client_pool_destroy (pool);
1299 } else {
1300 mongoc_client_destroy (client);
1301 }
1302
1303 op_id_test_cleanup (&test);
1304 mongoc_apm_callbacks_destroy (callbacks);
1305 }
1306
1307
1308 static void
test_collection_bulk_op_single(void)1309 test_collection_bulk_op_single (void)
1310 {
1311 _test_bulk_operation_id (false, false);
1312 }
1313
1314
1315 static void
test_collection_bulk_op_pooled(void)1316 test_collection_bulk_op_pooled (void)
1317 {
1318 _test_bulk_operation_id (true, false);
1319 }
1320
1321
1322 static void
test_bulk_op_single(void)1323 test_bulk_op_single (void)
1324 {
1325 _test_bulk_operation_id (false, true);
1326 }
1327
1328
1329 static void
test_bulk_op_pooled(void)1330 test_bulk_op_pooled (void)
1331 {
1332 _test_bulk_operation_id (true, true);
1333 }
1334
1335
1336 static void
_test_query_operation_id(bool pooled,bool use_cmd)1337 _test_query_operation_id (bool pooled, bool use_cmd)
1338 {
1339 mock_server_t *server;
1340 mongoc_client_t *client;
1341 mongoc_client_pool_t *pool = NULL;
1342 mongoc_apm_callbacks_t *callbacks;
1343 mongoc_collection_t *collection;
1344 op_id_test_t test;
1345 mongoc_cursor_t *cursor;
1346 const bson_t *doc;
1347 future_t *future;
1348 request_t *request;
1349 int64_t op_id;
1350
1351 op_id_test_init (&test);
1352
1353 server = mock_server_with_autoismaster (use_cmd ? 4 : 0);
1354 mock_server_run (server);
1355
1356 callbacks = mongoc_apm_callbacks_new ();
1357 mongoc_apm_set_command_started_cb (callbacks, test_op_id_started_cb);
1358 mongoc_apm_set_command_succeeded_cb (callbacks, test_op_id_succeeded_cb);
1359 mongoc_apm_set_command_failed_cb (callbacks, test_op_id_failed_cb);
1360
1361 if (pooled) {
1362 pool = mongoc_client_pool_new (mock_server_get_uri (server));
1363 ASSERT (mongoc_client_pool_set_apm_callbacks (
1364 pool, callbacks, (void *) &test));
1365 client = mongoc_client_pool_pop (pool);
1366 } else {
1367 client = mongoc_client_new_from_uri (mock_server_get_uri (server));
1368 ASSERT (
1369 mongoc_client_set_apm_callbacks (client, callbacks, (void *) &test));
1370 }
1371
1372 collection = mongoc_client_get_collection (client, "db", "collection");
1373 cursor = mongoc_collection_find (
1374 collection, MONGOC_QUERY_NONE, 0, 0, 1, tmp_bson ("{}"), NULL, NULL);
1375
1376 future = future_cursor_next (cursor, &doc);
1377 request = mock_server_receives_request (server);
1378 mock_server_replies_to_find (request,
1379 MONGOC_QUERY_SLAVE_OK,
1380 123 /* cursor id */,
1381 1,
1382 "db.collection",
1383 "{}",
1384 use_cmd);
1385
1386 ASSERT (future_get_bool (future));
1387 future_destroy (future);
1388 request_destroy (request);
1389
1390 ASSERT_CMPINT (test.started_calls, ==, 1);
1391 ASSERT_CMPINT (test.succeeded_calls, ==, 1);
1392
1393 future = future_cursor_next (cursor, &doc);
1394 request = mock_server_receives_request (server);
1395 if (use_cmd) {
1396 mock_server_replies_simple (request,
1397 "{'ok': 0, 'code': 42, 'errmsg': 'bad!'}");
1398 } else {
1399 mock_server_replies (request,
1400 MONGOC_REPLY_QUERY_FAILURE,
1401 123,
1402 0,
1403 0,
1404 "{'$err': 'uh oh', 'code': 4321}");
1405 }
1406
1407 ASSERT (!future_get_bool (future));
1408 future_destroy (future);
1409 request_destroy (request);
1410
1411 ASSERT_CMPINT (test.started_calls, ==, 2);
1412 ASSERT_CMPINT (test.succeeded_calls, ==, 1);
1413 ASSERT_CMPINT (test.failed_calls, ==, 1);
1414
1415 ASSERT_CMPINT64 (REQUEST_ID (started, 0), ==, REQUEST_ID (succeeded, 0));
1416 ASSERT_CMPINT64 (REQUEST_ID (started, 1), ==, REQUEST_ID (failed, 0));
1417
1418 /* unique request ids */
1419 ASSERT_CMPINT64 (REQUEST_ID (started, 0), !=, REQUEST_ID (started, 1));
1420
1421 /* operation ids all the same */
1422 op_id = OP_ID (started, 0);
1423 ASSERT_CMPINT64 (op_id, !=, (int64_t) 0);
1424 ASSERT_CMPINT64 (op_id, ==, OP_ID (started, 1));
1425 ASSERT_CMPINT64 (op_id, ==, OP_ID (failed, 0));
1426
1427 mock_server_destroy (server);
1428
1429 /* client logs warning because it can't send killCursors */
1430 capture_logs (true);
1431 mongoc_cursor_destroy (cursor);
1432 mongoc_collection_destroy (collection);
1433
1434 if (pooled) {
1435 mongoc_client_pool_push (pool, client);
1436 mongoc_client_pool_destroy (pool);
1437 } else {
1438 mongoc_client_destroy (client);
1439 }
1440
1441 op_id_test_cleanup (&test);
1442 mongoc_apm_callbacks_destroy (callbacks);
1443 }
1444
1445
1446 static void
test_query_operation_id_single_cmd(void)1447 test_query_operation_id_single_cmd (void)
1448 {
1449 _test_query_operation_id (false, true);
1450 }
1451
1452
1453 static void
test_query_operation_id_pooled_cmd(void)1454 test_query_operation_id_pooled_cmd (void)
1455 {
1456 _test_query_operation_id (true, true);
1457 }
1458
1459
1460 static void
test_query_operation_id_single_op_query(void)1461 test_query_operation_id_single_op_query (void)
1462 {
1463 _test_query_operation_id (false, false);
1464 }
1465
1466
1467 static void
test_query_operation_id_pooled_op_query(void)1468 test_query_operation_id_pooled_op_query (void)
1469 {
1470 _test_query_operation_id (true, false);
1471 }
1472
1473
1474 typedef struct {
1475 int started_calls;
1476 int succeeded_calls;
1477 int failed_calls;
1478 char db[100];
1479 char cmd_name[100];
1480 bson_t cmd;
1481 } cmd_test_t;
1482
1483
1484 static void
cmd_test_init(cmd_test_t * test)1485 cmd_test_init (cmd_test_t *test)
1486 {
1487 memset (test, 0, sizeof *test);
1488 bson_init (&test->cmd);
1489 }
1490
1491
1492 static void
cmd_test_cleanup(cmd_test_t * test)1493 cmd_test_cleanup (cmd_test_t *test)
1494 {
1495 bson_destroy (&test->cmd);
1496 }
1497
1498
1499 static void
cmd_started_cb(const mongoc_apm_command_started_t * event)1500 cmd_started_cb (const mongoc_apm_command_started_t *event)
1501 {
1502 cmd_test_t *test;
1503
1504 test = (cmd_test_t *) mongoc_apm_command_started_get_context (event);
1505 test->started_calls++;
1506 bson_destroy (&test->cmd);
1507 bson_strncpy (test->db,
1508 mongoc_apm_command_started_get_database_name (event),
1509 sizeof (test->db));
1510 bson_copy_to (mongoc_apm_command_started_get_command (event), &test->cmd);
1511 bson_strncpy (test->cmd_name,
1512 mongoc_apm_command_started_get_command_name (event),
1513 sizeof (test->cmd_name));
1514 }
1515
1516
1517 static void
cmd_succeeded_cb(const mongoc_apm_command_succeeded_t * event)1518 cmd_succeeded_cb (const mongoc_apm_command_succeeded_t *event)
1519 {
1520 cmd_test_t *test;
1521
1522 test = (cmd_test_t *) mongoc_apm_command_succeeded_get_context (event);
1523 test->succeeded_calls++;
1524 ASSERT_CMPSTR (test->cmd_name,
1525 mongoc_apm_command_succeeded_get_command_name (event));
1526 }
1527
1528
1529 static void
cmd_failed_cb(const mongoc_apm_command_failed_t * event)1530 cmd_failed_cb (const mongoc_apm_command_failed_t *event)
1531 {
1532 cmd_test_t *test;
1533
1534 test = (cmd_test_t *) mongoc_apm_command_failed_get_context (event);
1535 test->failed_calls++;
1536 ASSERT_CMPSTR (test->cmd_name,
1537 mongoc_apm_command_failed_get_command_name (event));
1538 }
1539
1540
1541 static void
set_cmd_test_callbacks(mongoc_client_t * client,void * context)1542 set_cmd_test_callbacks (mongoc_client_t *client, void *context)
1543 {
1544 mongoc_apm_callbacks_t *callbacks;
1545
1546 callbacks = mongoc_apm_callbacks_new ();
1547 mongoc_apm_set_command_started_cb (callbacks, cmd_started_cb);
1548 mongoc_apm_set_command_succeeded_cb (callbacks, cmd_succeeded_cb);
1549 mongoc_apm_set_command_failed_cb (callbacks, cmd_failed_cb);
1550 ASSERT (mongoc_client_set_apm_callbacks (client, callbacks, context));
1551 mongoc_apm_callbacks_destroy (callbacks);
1552 }
1553
1554
1555 static void
test_client_cmd(void)1556 test_client_cmd (void)
1557 {
1558 cmd_test_t test;
1559 mongoc_client_t *client;
1560 mongoc_cursor_t *cursor;
1561 const bson_t *reply;
1562
1563 cmd_test_init (&test);
1564 client = test_framework_client_new ();
1565 set_cmd_test_callbacks (client, (void *) &test);
1566 cursor = mongoc_client_command (client,
1567 "admin",
1568 MONGOC_QUERY_SLAVE_OK,
1569 0,
1570 0,
1571 0,
1572 tmp_bson ("{'ping': 1}"),
1573 NULL,
1574 NULL);
1575
1576 ASSERT (mongoc_cursor_next (cursor, &reply));
1577 ASSERT_CMPSTR (test.cmd_name, "ping");
1578 ASSERT_MATCH (&test.cmd, "{'ping': 1}");
1579 ASSERT_CMPSTR (test.db, "admin");
1580 ASSERT_CMPINT (1, ==, test.started_calls);
1581 ASSERT_CMPINT (1, ==, test.succeeded_calls);
1582 ASSERT_CMPINT (0, ==, test.failed_calls);
1583
1584 cmd_test_cleanup (&test);
1585 mongoc_cursor_destroy (cursor);
1586
1587 cmd_test_init (&test);
1588 cursor = mongoc_client_command (client,
1589 "admin",
1590 MONGOC_QUERY_SLAVE_OK,
1591 0,
1592 0,
1593 0,
1594 tmp_bson ("{'foo': 1}"),
1595 NULL,
1596 NULL);
1597
1598 ASSERT (!mongoc_cursor_next (cursor, &reply));
1599 ASSERT_CMPSTR (test.cmd_name, "foo");
1600 ASSERT_MATCH (&test.cmd, "{'foo': 1}");
1601 ASSERT_CMPSTR (test.db, "admin");
1602 ASSERT_CMPINT (1, ==, test.started_calls);
1603 ASSERT_CMPINT (0, ==, test.succeeded_calls);
1604 ASSERT_CMPINT (1, ==, test.failed_calls);
1605
1606 cmd_test_cleanup (&test);
1607 mongoc_cursor_destroy (cursor);
1608 mongoc_client_destroy (client);
1609 }
1610
1611
1612 static void
test_client_cmd_simple(void)1613 test_client_cmd_simple (void)
1614 {
1615 cmd_test_t test;
1616 mongoc_client_t *client;
1617 bool r;
1618 bson_error_t error;
1619
1620 cmd_test_init (&test);
1621 client = test_framework_client_new ();
1622 set_cmd_test_callbacks (client, (void *) &test);
1623 r = mongoc_client_command_simple (
1624 client, "admin", tmp_bson ("{'ping': 1}"), NULL, NULL, &error);
1625
1626 ASSERT_OR_PRINT (r, error);
1627 ASSERT_CMPSTR (test.cmd_name, "ping");
1628 ASSERT_MATCH (&test.cmd, "{'ping': 1}");
1629 ASSERT_CMPSTR (test.db, "admin");
1630 ASSERT_CMPINT (1, ==, test.started_calls);
1631 ASSERT_CMPINT (1, ==, test.succeeded_calls);
1632 ASSERT_CMPINT (0, ==, test.failed_calls);
1633
1634 cmd_test_cleanup (&test);
1635 cmd_test_init (&test);
1636 r = mongoc_client_command_simple (
1637 client, "admin", tmp_bson ("{'foo': 1}"), NULL, NULL, &error);
1638
1639 ASSERT (!r);
1640 ASSERT_CMPSTR (test.cmd_name, "foo");
1641 ASSERT_MATCH (&test.cmd, "{'foo': 1}");
1642 ASSERT_CMPSTR (test.db, "admin");
1643 ASSERT_CMPINT (1, ==, test.started_calls);
1644 ASSERT_CMPINT (0, ==, test.succeeded_calls);
1645 ASSERT_CMPINT (1, ==, test.failed_calls);
1646
1647 mongoc_client_destroy (client);
1648 cmd_test_cleanup (&test);
1649 }
1650
1651
1652 static void
test_client_cmd_op_ids(void)1653 test_client_cmd_op_ids (void)
1654 {
1655 op_id_test_t test;
1656 mongoc_client_t *client;
1657 mongoc_apm_callbacks_t *callbacks;
1658 bool r;
1659 bson_error_t error;
1660 int64_t op_id;
1661
1662 op_id_test_init (&test);
1663
1664 callbacks = mongoc_apm_callbacks_new ();
1665 mongoc_apm_set_command_started_cb (callbacks, test_op_id_started_cb);
1666 mongoc_apm_set_command_succeeded_cb (callbacks, test_op_id_succeeded_cb);
1667 mongoc_apm_set_command_failed_cb (callbacks, test_op_id_failed_cb);
1668
1669 client = test_framework_client_new ();
1670 mongoc_client_set_apm_callbacks (client, callbacks, (void *) &test);
1671
1672 r = mongoc_client_command_simple (
1673 client, "admin", tmp_bson ("{'ping': 1}"), NULL, NULL, &error);
1674
1675 ASSERT_OR_PRINT (r, error);
1676 ASSERT_CMPINT (1, ==, test.started_calls);
1677 ASSERT_CMPINT (1, ==, test.succeeded_calls);
1678 ASSERT_CMPINT (0, ==, test.failed_calls);
1679 ASSERT_CMPINT64 (REQUEST_ID (started, 0), ==, REQUEST_ID (succeeded, 0));
1680 ASSERT_CMPINT64 (OP_ID (started, 0), ==, OP_ID (succeeded, 0));
1681 op_id = OP_ID (started, 0);
1682 ASSERT_CMPINT64 (op_id, !=, (int64_t) 0);
1683
1684 op_id_test_cleanup (&test);
1685 op_id_test_init (&test);
1686
1687 /* again. test that we use a new op_id. */
1688 r = mongoc_client_command_simple (
1689 client, "admin", tmp_bson ("{'ping': 1}"), NULL, NULL, &error);
1690
1691 ASSERT_OR_PRINT (r, error);
1692 ASSERT_CMPINT (1, ==, test.started_calls);
1693 ASSERT_CMPINT (1, ==, test.succeeded_calls);
1694 ASSERT_CMPINT (0, ==, test.failed_calls);
1695 ASSERT_CMPINT64 (REQUEST_ID (started, 0), ==, REQUEST_ID (succeeded, 0));
1696 ASSERT_CMPINT64 (OP_ID (started, 0), ==, OP_ID (succeeded, 0));
1697 ASSERT_CMPINT64 (OP_ID (started, 0), !=, (int64_t) 0);
1698
1699 /* new op_id */
1700 ASSERT_CMPINT64 (OP_ID (started, 0), !=, op_id);
1701
1702 mongoc_client_destroy (client);
1703 op_id_test_cleanup (&test);
1704 mongoc_apm_callbacks_destroy (callbacks);
1705 }
1706
1707
1708 static void
test_killcursors_deprecated(void)1709 test_killcursors_deprecated (void)
1710 {
1711 cmd_test_t test;
1712 mongoc_client_t *client;
1713 bool r;
1714 bson_error_t error;
1715
1716 cmd_test_init (&test);
1717 client = test_framework_client_new ();
1718
1719 /* connect */
1720 r = mongoc_client_command_simple (
1721 client, "admin", tmp_bson ("{'ping': 1}"), NULL, NULL, &error);
1722
1723 ASSERT_OR_PRINT (r, error);
1724 set_cmd_test_callbacks (client, (void *) &test);
1725
1726 /* deprecated function without "db" or "collection", skips APM */
1727 mongoc_client_kill_cursor (client, 123);
1728
1729 ASSERT_CMPINT (0, ==, test.started_calls);
1730 ASSERT_CMPINT (0, ==, test.succeeded_calls);
1731 ASSERT_CMPINT (0, ==, test.failed_calls);
1732
1733 mongoc_client_destroy (client);
1734 cmd_test_cleanup (&test);
1735 }
1736
1737
1738 void
test_command_monitoring_install(TestSuite * suite)1739 test_command_monitoring_install (TestSuite *suite)
1740 {
1741 test_all_spec_tests (suite);
1742 TestSuite_AddMockServerTest (
1743 suite, "/command_monitoring/get_error", test_get_error);
1744 TestSuite_AddLive (suite,
1745 "/command_monitoring/set_callbacks/single",
1746 test_set_callbacks_single);
1747 TestSuite_AddLive (suite,
1748 "/command_monitoring/set_callbacks/pooled",
1749 test_set_callbacks_pooled);
1750 /* require aggregation cursor */
1751 TestSuite_AddFull (suite,
1752 "/command_monitoring/set_callbacks/change",
1753 test_change_callbacks,
1754 NULL,
1755 NULL,
1756 test_framework_skip_if_max_wire_version_less_than_1);
1757 TestSuite_AddFull (suite,
1758 "/command_monitoring/set_callbacks/reset",
1759 test_reset_callbacks,
1760 NULL,
1761 NULL,
1762 test_framework_skip_if_max_wire_version_less_than_1);
1763 TestSuite_AddLive (suite,
1764 "/command_monitoring/operation_id/bulk/collection/single",
1765 test_collection_bulk_op_single);
1766 TestSuite_AddLive (suite,
1767 "/command_monitoring/operation_id/bulk/collection/pooled",
1768 test_collection_bulk_op_pooled);
1769 TestSuite_AddLive (suite,
1770 "/command_monitoring/operation_id/bulk/new/single",
1771 test_bulk_op_single);
1772 TestSuite_AddLive (suite,
1773 "/command_monitoring/operation_id/bulk/new/pooled",
1774 test_bulk_op_pooled);
1775 TestSuite_AddMockServerTest (
1776 suite,
1777 "/command_monitoring/operation_id/query/single/cmd",
1778 test_query_operation_id_single_cmd);
1779 TestSuite_AddMockServerTest (
1780 suite,
1781 "/command_monitoring/operation_id/query/pooled/cmd",
1782 test_query_operation_id_pooled_cmd);
1783 TestSuite_AddMockServerTest (
1784 suite,
1785 "/command_monitoring/operation_id/query/single/op_query",
1786 test_query_operation_id_single_op_query);
1787 TestSuite_AddMockServerTest (
1788 suite,
1789 "/command_monitoring/operation_id/query/pooled/op_query",
1790 test_query_operation_id_pooled_op_query);
1791 TestSuite_AddLive (suite, "/command_monitoring/client_cmd", test_client_cmd);
1792 TestSuite_AddLive (
1793 suite, "/command_monitoring/client_cmd_simple", test_client_cmd_simple);
1794 TestSuite_AddLive (
1795 suite, "/command_monitoring/client_cmd/op_ids", test_client_cmd_op_ids);
1796 TestSuite_AddLive (suite,
1797 "/command_monitoring/killcursors_deprecated",
1798 test_killcursors_deprecated);
1799 }
1800