/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2013, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */


#define _CRT_RAND_S  // rand_s() on MSVC
#include <stdarg.h>
#include "test.h"
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>

#ifdef _WIN32
#include <direct.h> /* _getcwd */
#else
#include <sys/wait.h> /* waitpid */
#endif

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"

int test_level = 2;
int test_seed = 0;

char test_mode[64] = "bare";
char test_scenario[64] = "default";
static volatile sig_atomic_t test_exit = 0;
static char test_topic_prefix[128] = "rdkafkatest";
static int  test_topic_random = 0;
int          tests_running_cnt = 0;
int          test_concurrent_max = 5;
int          test_assert_on_fail = 0;
double test_timeout_multiplier  = 1.0;
static char *test_sql_cmd = NULL;
int  test_session_timeout_ms = 6000;
int          test_broker_version;
static const char *test_broker_version_str = "2.4.0.0";
int          test_flags = 0;
int          test_neg_flags = TEST_F_KNOWN_ISSUE;
/* run delete-test-topics.sh between each test (when concurrent_max = 1) */
static int   test_delete_topics_between = 0;
static const char *test_git_version = "HEAD";
static const char *test_sockem_conf = "";
int          test_on_ci = 0; /* Tests are being run on CI, be more forgiving
                              * with regards to timeouts, etc. */
int          test_quick = 0; /**< Run tests quickly */
int          test_idempotent_producer = 0;
int          test_rusage = 0; /**< Check resource usage */
/**< CPU speed calibration for rusage threshold checks.
 *   >1.0: CPU is slower than base line system,
 *   <1.0: CPU is faster than base line system. */
double       test_rusage_cpu_calibration = 1.0;
static const char *tests_to_run = NULL; /* all */
static const char *subtests_to_run = NULL; /* all */
static const char *tests_to_skip = NULL; /* none */
int          test_write_report = 0; /**< Write test report file */

static int show_summary = 1;
static int test_summary (int do_lock);

/**
 * Protects shared state, such as tests[]
 */
mtx_t test_mtx;
cnd_t test_cnd;

static const char *test_states[] = {
        "DNS",
        "SKIPPED",
        "RUNNING",
        "PASSED",
        "FAILED",
};
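
/* Note: "DNS" above is the label for tests that Did Not Start
 * (the TEST_NOT_STARTED state). */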



#define _TEST_DECL(NAME)                                                \
        extern int main_ ## NAME (int, char **)
#define _TEST(NAME,FLAGS,...)                                           \
        { .name = # NAME, .mainfunc = main_ ## NAME, .flags = FLAGS, __VA_ARGS__ }
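
/* Illustrative example of what the two macros above expand to:
 *   _TEST_DECL(0001_multiobj);  ->  extern int main_0001_multiobj (int, char **);
 *   _TEST(0001_multiobj, 0)     ->  { .name = "0001_multiobj",
 *                                     .mainfunc = main_0001_multiobj,
 *                                     .flags = 0 }
 * i.e., every entry in tests[] below needs a matching _TEST_DECL() and a
 * main_<name>() entry point defined in its own source file. */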


/**
 * Declare all tests here
 */
_TEST_DECL(0000_unittests);
_TEST_DECL(0001_multiobj);
_TEST_DECL(0002_unkpart);
_TEST_DECL(0003_msgmaxsize);
_TEST_DECL(0004_conf);
_TEST_DECL(0005_order);
_TEST_DECL(0006_symbols);
_TEST_DECL(0007_autotopic);
_TEST_DECL(0008_reqacks);
_TEST_DECL(0009_mock_cluster);
_TEST_DECL(0011_produce_batch);
_TEST_DECL(0012_produce_consume);
_TEST_DECL(0013_null_msgs);
_TEST_DECL(0014_reconsume_191);
_TEST_DECL(0015_offsets_seek);
_TEST_DECL(0016_client_swname);
_TEST_DECL(0017_compression);
_TEST_DECL(0018_cgrp_term);
_TEST_DECL(0019_list_groups);
_TEST_DECL(0020_destroy_hang);
_TEST_DECL(0021_rkt_destroy);
_TEST_DECL(0022_consume_batch);
_TEST_DECL(0025_timers);
_TEST_DECL(0026_consume_pause);
_TEST_DECL(0028_long_topicnames);
_TEST_DECL(0029_assign_offset);
_TEST_DECL(0030_offset_commit);
_TEST_DECL(0031_get_offsets);
_TEST_DECL(0033_regex_subscribe);
_TEST_DECL(0033_regex_subscribe_local);
_TEST_DECL(0034_offset_reset);
_TEST_DECL(0035_api_version);
_TEST_DECL(0036_partial_fetch);
_TEST_DECL(0037_destroy_hang_local);
_TEST_DECL(0038_performance);
_TEST_DECL(0039_event_dr);
_TEST_DECL(0039_event_log);
_TEST_DECL(0039_event);
_TEST_DECL(0040_io_event);
_TEST_DECL(0041_fetch_max_bytes);
_TEST_DECL(0042_many_topics);
_TEST_DECL(0043_no_connection);
_TEST_DECL(0044_partition_cnt);
_TEST_DECL(0045_subscribe_update);
_TEST_DECL(0045_subscribe_update_topic_remove);
_TEST_DECL(0045_subscribe_update_non_exist_and_partchange);
_TEST_DECL(0046_rkt_cache);
_TEST_DECL(0047_partial_buf_tmout);
_TEST_DECL(0048_partitioner);
_TEST_DECL(0049_consume_conn_close);
_TEST_DECL(0050_subscribe_adds);
_TEST_DECL(0051_assign_adds);
_TEST_DECL(0052_msg_timestamps);
_TEST_DECL(0053_stats_timing);
_TEST_DECL(0053_stats);
_TEST_DECL(0054_offset_time);
_TEST_DECL(0055_producer_latency);
_TEST_DECL(0056_balanced_group_mt);
_TEST_DECL(0057_invalid_topic);
_TEST_DECL(0058_log);
_TEST_DECL(0059_bsearch);
_TEST_DECL(0060_op_prio);
_TEST_DECL(0061_consumer_lag);
_TEST_DECL(0062_stats_event);
_TEST_DECL(0063_clusterid);
_TEST_DECL(0064_interceptors);
_TEST_DECL(0065_yield);
_TEST_DECL(0066_plugins);
_TEST_DECL(0067_empty_topic);
_TEST_DECL(0068_produce_timeout);
_TEST_DECL(0069_consumer_add_parts);
_TEST_DECL(0070_null_empty);
_TEST_DECL(0072_headers_ut);
_TEST_DECL(0073_headers);
_TEST_DECL(0074_producev);
_TEST_DECL(0075_retry);
_TEST_DECL(0076_produce_retry);
_TEST_DECL(0077_compaction);
_TEST_DECL(0078_c_from_cpp);
_TEST_DECL(0079_fork);
_TEST_DECL(0080_admin_ut);
_TEST_DECL(0081_admin);
_TEST_DECL(0082_fetch_max_bytes);
_TEST_DECL(0083_cb_event);
_TEST_DECL(0084_destroy_flags_local);
_TEST_DECL(0084_destroy_flags);
_TEST_DECL(0085_headers);
_TEST_DECL(0086_purge_local);
_TEST_DECL(0086_purge_remote);
_TEST_DECL(0088_produce_metadata_timeout);
_TEST_DECL(0089_max_poll_interval);
_TEST_DECL(0090_idempotence);
_TEST_DECL(0091_max_poll_interval_timeout);
_TEST_DECL(0092_mixed_msgver);
_TEST_DECL(0093_holb_consumer);
_TEST_DECL(0094_idempotence_msg_timeout);
_TEST_DECL(0095_all_brokers_down);
_TEST_DECL(0097_ssl_verify);
_TEST_DECL(0098_consumer_txn);
_TEST_DECL(0099_commit_metadata);
_TEST_DECL(0100_thread_interceptors);
_TEST_DECL(0101_fetch_from_follower);
_TEST_DECL(0102_static_group_rebalance);
_TEST_DECL(0103_transactions_local);
_TEST_DECL(0103_transactions);
_TEST_DECL(0104_fetch_from_follower_mock);
_TEST_DECL(0105_transactions_mock);
_TEST_DECL(0106_cgrp_sess_timeout);
_TEST_DECL(0107_topic_recreate);
_TEST_DECL(0109_auto_create_topics);
_TEST_DECL(0110_batch_size);
_TEST_DECL(0111_delay_create_topics);
_TEST_DECL(0112_assign_unknown_part);
_TEST_DECL(0113_cooperative_rebalance_local);
_TEST_DECL(0113_cooperative_rebalance);
_TEST_DECL(0114_sticky_partitioning);
_TEST_DECL(0115_producer_auth);
_TEST_DECL(0116_kafkaconsumer_close);
_TEST_DECL(0117_mock_errors);
_TEST_DECL(0118_commit_rebalance);
_TEST_DECL(0119_consumer_auth);
_TEST_DECL(0120_asymmetric_subscription);
_TEST_DECL(0121_clusterid);
_TEST_DECL(0122_buffer_cleaning_after_rebalance);
_TEST_DECL(0123_connections_max_idle);
_TEST_DECL(0124_openssl_invalid_engine);

/* Manual tests */
_TEST_DECL(8000_idle);


/* Define test resource usage thresholds if the default limits
 * are not tolerable.
 *
 * Fields:
 *  .ucpu  - Max User CPU percentage  (double)
 *  .scpu  - Max System/Kernel CPU percentage  (double)
 *  .rss   - Max RSS (memory) in megabytes  (double)
 *  .ctxsw - Max number of voluntary context switches  (int)
 *
 * Also see test_rusage_check_thresholds() in rusage.c
 *
 * Make a comment in the _THRES() below why the extra thresholds are required.
 *
 * Usage:
 *  _TEST(00...., ...,
 *        _THRES(.ucpu = 15.0)),  <--  Max 15% User CPU usage
 */
#define _THRES(...) .rusage_thres = { __VA_ARGS__ }

/**
 * Define all tests here
 */
struct test tests[] = {
        /* Special MAIN test to hold over-all timings, etc. */
        { .name = "<MAIN>", .flags = TEST_F_LOCAL },
        _TEST(0000_unittests, TEST_F_LOCAL,
              /* The msgq insert order tests are heavy on
               * user CPU (memory scan), RSS, and
               * system CPU (lots of allocations -> madvise(2)). */
              _THRES(.ucpu = 100.0, .scpu = 20.0, .rss = 900.0)),
        _TEST(0001_multiobj, 0),
        _TEST(0002_unkpart, 0),
        _TEST(0003_msgmaxsize, 0),
        _TEST(0004_conf, TEST_F_LOCAL),
        _TEST(0005_order, 0),
        _TEST(0006_symbols, TEST_F_LOCAL),
        _TEST(0007_autotopic, 0),
        _TEST(0008_reqacks, 0),
        _TEST(0009_mock_cluster, TEST_F_LOCAL,
              /* Mock cluster requires MsgVersion 2 */
              TEST_BRKVER(0,11,0,0)),
        _TEST(0011_produce_batch, 0,
              /* Produces a lot of messages */
              _THRES(.ucpu = 40.0, .scpu = 8.0)),
        _TEST(0012_produce_consume, 0),
        _TEST(0013_null_msgs, 0),
        _TEST(0014_reconsume_191, 0),
        _TEST(0015_offsets_seek, 0),
        _TEST(0016_client_swname, 0),
        _TEST(0017_compression, 0),
        _TEST(0018_cgrp_term, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0019_list_groups, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0020_destroy_hang, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0021_rkt_destroy, 0),
        _TEST(0022_consume_batch, 0),
        _TEST(0025_timers, TEST_F_LOCAL),
        _TEST(0026_consume_pause, TEST_F_KNOWN_ISSUE, TEST_BRKVER(0,9,0,0),
              .extra = "Fragile test due to #2190"),
        _TEST(0028_long_topicnames, TEST_F_KNOWN_ISSUE, TEST_BRKVER(0,9,0,0),
              .extra = "https://github.com/edenhill/librdkafka/issues/529"),
        _TEST(0029_assign_offset, 0),
        _TEST(0030_offset_commit, 0, TEST_BRKVER(0,9,0,0),
              /* Loops over committed() until timeout */
              _THRES(.ucpu = 10.0, .scpu = 5.0)),
        _TEST(0031_get_offsets, 0),
        _TEST(0033_regex_subscribe, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0033_regex_subscribe_local, TEST_F_LOCAL),
        _TEST(0034_offset_reset, 0),
        _TEST(0035_api_version, 0),
        _TEST(0036_partial_fetch, 0),
        _TEST(0037_destroy_hang_local, TEST_F_LOCAL),
        _TEST(0038_performance, 0,
              /* Produces and consumes a lot of messages */
              _THRES(.ucpu = 150.0, .scpu = 10)),
        _TEST(0039_event_dr, 0),
        _TEST(0039_event_log, TEST_F_LOCAL),
        _TEST(0039_event, TEST_F_LOCAL),
        _TEST(0040_io_event, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0041_fetch_max_bytes, 0,
              /* Re-fetches large messages multiple times */
              _THRES(.ucpu = 20.0, .scpu = 10.0)),
        _TEST(0042_many_topics, 0),
        _TEST(0043_no_connection, TEST_F_LOCAL),
        _TEST(0044_partition_cnt, 0, TEST_BRKVER(1,0,0,0),
              /* Produces a lot of messages */
              _THRES(.ucpu = 30.0)),
        _TEST(0045_subscribe_update, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0045_subscribe_update_topic_remove, 0,
              TEST_BRKVER(0,9,0,0),
              .scenario = "noautocreate"),
        _TEST(0045_subscribe_update_non_exist_and_partchange, 0,
              TEST_BRKVER(0,9,0,0),
              .scenario = "noautocreate"),
        _TEST(0046_rkt_cache, TEST_F_LOCAL),
        _TEST(0047_partial_buf_tmout, TEST_F_KNOWN_ISSUE),
        _TEST(0048_partitioner, 0,
              /* Produces many small messages */
              _THRES(.ucpu = 10.0, .scpu = 5.0)),
#if WITH_SOCKEM
        _TEST(0049_consume_conn_close, TEST_F_SOCKEM, TEST_BRKVER(0,9,0,0)),
#endif
        _TEST(0050_subscribe_adds, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0051_assign_adds, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0052_msg_timestamps, 0, TEST_BRKVER(0,10,0,0)),
        _TEST(0053_stats_timing, TEST_F_LOCAL),
        _TEST(0053_stats, 0),
        _TEST(0054_offset_time, 0, TEST_BRKVER(0,10,1,0)),
        _TEST(0055_producer_latency, TEST_F_KNOWN_ISSUE_WIN32),
        _TEST(0056_balanced_group_mt, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0057_invalid_topic, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0058_log, TEST_F_LOCAL),
        _TEST(0059_bsearch, 0, TEST_BRKVER(0,10,0,0)),
        _TEST(0060_op_prio, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0061_consumer_lag, 0),
        _TEST(0062_stats_event, TEST_F_LOCAL),
        _TEST(0063_clusterid, 0, TEST_BRKVER(0,10,1,0)),
        _TEST(0064_interceptors, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0065_yield, 0),
        _TEST(0066_plugins,
              TEST_F_LOCAL|TEST_F_KNOWN_ISSUE_WIN32|TEST_F_KNOWN_ISSUE_OSX,
              .extra = "dynamic loading of tests might not be fixed for this platform"),
        _TEST(0067_empty_topic, 0),
#if WITH_SOCKEM
        _TEST(0068_produce_timeout, TEST_F_SOCKEM),
#endif
        _TEST(0069_consumer_add_parts, TEST_F_KNOWN_ISSUE_WIN32,
              TEST_BRKVER(1,0,0,0)),
        _TEST(0070_null_empty, 0),
        _TEST(0072_headers_ut, TEST_F_LOCAL),
        _TEST(0073_headers, 0, TEST_BRKVER(0,11,0,0)),
        _TEST(0074_producev, TEST_F_LOCAL),
#if WITH_SOCKEM
        _TEST(0075_retry, TEST_F_SOCKEM),
#endif
        _TEST(0076_produce_retry, TEST_F_SOCKEM),
        _TEST(0077_compaction, 0,
              /* The test itself requires message headers */
              TEST_BRKVER(0,11,0,0)),
        _TEST(0078_c_from_cpp, TEST_F_LOCAL),
        _TEST(0079_fork, TEST_F_LOCAL|TEST_F_KNOWN_ISSUE,
              .extra = "using a fork():ed rd_kafka_t is not supported and will "
              "most likely hang"),
        _TEST(0080_admin_ut, TEST_F_LOCAL),
        _TEST(0081_admin, 0, TEST_BRKVER(0,10,2,0)),
        _TEST(0082_fetch_max_bytes, 0, TEST_BRKVER(0,10,1,0)),
        _TEST(0083_cb_event, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0084_destroy_flags_local, TEST_F_LOCAL),
        _TEST(0084_destroy_flags, 0),
        _TEST(0085_headers, 0, TEST_BRKVER(0,11,0,0)),
        _TEST(0086_purge_local, TEST_F_LOCAL),
        _TEST(0086_purge_remote, 0),
#if WITH_SOCKEM
        _TEST(0088_produce_metadata_timeout, TEST_F_SOCKEM),
#endif
        _TEST(0089_max_poll_interval, 0, TEST_BRKVER(0,10,1,0)),
        _TEST(0090_idempotence, 0, TEST_BRKVER(0,11,0,0)),
        _TEST(0091_max_poll_interval_timeout, 0, TEST_BRKVER(0,10,1,0)),
        _TEST(0092_mixed_msgver, 0, TEST_BRKVER(0,11,0,0)),
        _TEST(0093_holb_consumer, 0, TEST_BRKVER(0,10,1,0)),
#if WITH_SOCKEM
        _TEST(0094_idempotence_msg_timeout, TEST_F_SOCKEM,
              TEST_BRKVER(0,11,0,0)),
#endif
        _TEST(0095_all_brokers_down, TEST_F_LOCAL),
        _TEST(0097_ssl_verify, 0),
        _TEST(0098_consumer_txn, 0, TEST_BRKVER(0,11,0,0)),
        _TEST(0099_commit_metadata, 0),
        _TEST(0100_thread_interceptors, TEST_F_LOCAL),
        _TEST(0101_fetch_from_follower, 0, TEST_BRKVER(2,4,0,0)),
        _TEST(0102_static_group_rebalance, 0,
              TEST_BRKVER(2,3,0,0)),
        _TEST(0103_transactions_local, TEST_F_LOCAL),
        _TEST(0103_transactions, 0, TEST_BRKVER(0, 11, 0, 0),
              .scenario = "default,ak23"),
        _TEST(0104_fetch_from_follower_mock, TEST_F_LOCAL,
              TEST_BRKVER(2,4,0,0)),
        _TEST(0105_transactions_mock, TEST_F_LOCAL, TEST_BRKVER(0,11,0,0)),
        _TEST(0106_cgrp_sess_timeout, TEST_F_LOCAL, TEST_BRKVER(0,11,0,0)),
        _TEST(0107_topic_recreate, 0, TEST_BRKVER_TOPIC_ADMINAPI,
              .scenario = "noautocreate"),
        _TEST(0109_auto_create_topics, 0),
        _TEST(0110_batch_size, 0),
        _TEST(0111_delay_create_topics, 0, TEST_BRKVER_TOPIC_ADMINAPI,
              .scenario = "noautocreate"),
        _TEST(0112_assign_unknown_part, 0),
        _TEST(0113_cooperative_rebalance_local, TEST_F_LOCAL,
              TEST_BRKVER(2,4,0,0)),
        _TEST(0113_cooperative_rebalance, 0, TEST_BRKVER(2,4,0,0)),
        _TEST(0114_sticky_partitioning, 0),
        _TEST(0115_producer_auth, 0, TEST_BRKVER(2,1,0,0)),
        _TEST(0116_kafkaconsumer_close, TEST_F_LOCAL),
        _TEST(0117_mock_errors, TEST_F_LOCAL),
        _TEST(0118_commit_rebalance, 0),
        _TEST(0119_consumer_auth, 0, TEST_BRKVER(2,1,0,0)),
        _TEST(0120_asymmetric_subscription, TEST_F_LOCAL),
        _TEST(0121_clusterid, TEST_F_LOCAL),
        _TEST(0122_buffer_cleaning_after_rebalance, 0, TEST_BRKVER(2,4,0,0)),
        _TEST(0123_connections_max_idle, 0),
        _TEST(0124_openssl_invalid_engine, TEST_F_LOCAL),

        /* Manual tests */
        _TEST(8000_idle, TEST_F_MANUAL),

        { NULL }
};


RD_TLS struct test *test_curr = &tests[0];



#if WITH_SOCKEM
/**
 * Socket network emulation with sockem
 */

static void test_socket_add (struct test *test, sockem_t *skm) {
        TEST_LOCK();
        rd_list_add(&test->sockets, skm);
        TEST_UNLOCK();
}

static void test_socket_del (struct test *test, sockem_t *skm, int do_lock) {
        if (do_lock)
                TEST_LOCK();
        /* Best effort, skm might not have been added if connect_cb failed */
        rd_list_remove(&test->sockets, skm);
        if (do_lock)
                TEST_UNLOCK();
}

int test_socket_sockem_set_all (const char *key, int val) {
        int i;
        sockem_t *skm;
        int cnt = 0;

        TEST_LOCK();

        cnt = rd_list_cnt(&test_curr->sockets);
        TEST_SAY("Setting sockem %s=%d on %s%d socket(s)\n", key, val,
                 cnt > 0 ? "" : _C_RED, cnt);

        RD_LIST_FOREACH(skm, &test_curr->sockets, i) {
                if (sockem_set(skm, key, val, NULL) == -1)
                        TEST_FAIL("sockem_set(%s, %d) failed", key, val);
        }

        TEST_UNLOCK();

        return cnt;
}
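
/* Illustrative usage (the key names are whatever sockem supports,
 * "delay" is used here only as an example):
 *   test_socket_sockem_set_all("delay", 3000);
 * would apply the setting to every sockem socket of the current test. */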

void test_socket_sockem_set (int s, const char *key, int value) {
        sockem_t *skm;

        TEST_LOCK();
        skm = sockem_find(s);
        if (skm)
                sockem_set(skm, key, value, NULL);
        TEST_UNLOCK();
}

void test_socket_close_all (struct test *test, int reinit) {
        TEST_LOCK();
        rd_list_destroy(&test->sockets);
        if (reinit)
                rd_list_init(&test->sockets, 16, (void *)sockem_close);
        TEST_UNLOCK();
}


static int test_connect_cb (int s, const struct sockaddr *addr,
                            int addrlen, const char *id, void *opaque) {
        struct test *test = opaque;
        sockem_t *skm;
        int r;

        skm = sockem_connect(s, addr, addrlen, test_sockem_conf, 0, NULL);
        if (!skm)
                return errno;

        if (test->connect_cb) {
                r = test->connect_cb(test, skm, id);
                if (r)
                        return r;
        }

        test_socket_add(test, skm);

        return 0;
}

static int test_closesocket_cb (int s, void *opaque) {
        struct test *test = opaque;
        sockem_t *skm;

        TEST_LOCK();
        skm = sockem_find(s);
        if (skm) {
                /* Close sockem's sockets */
                sockem_close(skm);
                test_socket_del(test, skm, 0/*nolock*/);
        }
        TEST_UNLOCK();

        /* Close librdkafka's socket */
#ifdef _WIN32
        closesocket(s);
#else
        close(s);
#endif

        return 0;
}


void test_socket_enable (rd_kafka_conf_t *conf) {
        rd_kafka_conf_set_connect_cb(conf, test_connect_cb);
        rd_kafka_conf_set_closesocket_cb(conf, test_closesocket_cb);
        rd_kafka_conf_set_opaque(conf, test_curr);
}
#endif /* WITH_SOCKEM */

/**
 * @brief For use as the is_fatal_cb(), treating no errors as test-fatal.
 */
int test_error_is_not_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                                const char *reason) {
        return 0;
}

static void test_error_cb (rd_kafka_t *rk, int err,
                           const char *reason, void *opaque) {
        if (test_curr->is_fatal_cb && !test_curr->is_fatal_cb(rk, err, reason)) {
                TEST_SAY(_C_YEL "%s rdkafka error (non-testfatal): %s: %s\n",
                         rd_kafka_name(rk), rd_kafka_err2str(err), reason);
        } else {
                if (err == RD_KAFKA_RESP_ERR__FATAL) {
                        char errstr[512];
                        TEST_SAY(_C_RED "%s Fatal error: %s\n",
                                 rd_kafka_name(rk), reason);

                        err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));

                        if (test_curr->is_fatal_cb &&
                            !test_curr->is_fatal_cb(rk, err, reason))
                                TEST_SAY(_C_YEL
                                         "%s rdkafka ignored FATAL error: "
                                         "%s: %s\n",
                                         rd_kafka_name(rk),
                                         rd_kafka_err2str(err), errstr);
                        else
                                TEST_FAIL("%s rdkafka FATAL error: %s: %s",
                                          rd_kafka_name(rk),
                                          rd_kafka_err2str(err), errstr);

                } else {
                        TEST_FAIL("%s rdkafka error: %s: %s",
                                  rd_kafka_name(rk),
                                  rd_kafka_err2str(err), reason);
                }
        }
}

static int test_stats_cb (rd_kafka_t *rk, char *json, size_t json_len,
                          void *opaque) {
        struct test *test = test_curr;
        if (test->stats_fp)
                fprintf(test->stats_fp,
                        "{\"test\": \"%s\", \"instance\":\"%s\", "
                        "\"stats\": %s}\n",
                        test->name, rd_kafka_name(rk), json);
        return 0;
}


/**
 * @brief Limit the test run time (in seconds)
 */
void test_timeout_set (int timeout) {
        TEST_LOCK();
        TEST_SAY("Setting test timeout to %ds * %.1f\n",
                 timeout, test_timeout_multiplier);
        timeout = (int)((double)timeout * test_timeout_multiplier);
        test_curr->timeout = test_clock() + (timeout * 1000000);
        TEST_UNLOCK();
}
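
/* E.g., with test_timeout_multiplier = 2.0, test_timeout_set(30) gives the
 * current test 60 wall-clock seconds before check_test_timeouts() fails it. */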

int tmout_multip (int msecs) {
        int r;
        TEST_LOCK();
        r = (int)(((double)(msecs)) * test_timeout_multiplier);
        TEST_UNLOCK();
        return r;
}
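
/* E.g. tmout_multip(5000) returns 10000 when test_timeout_multiplier is 2.0. */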



#ifdef _WIN32
static void test_init_win32 (void) {
        /* Enable VT emulation to support colored output. */
        HANDLE hOut = GetStdHandle(STD_OUTPUT_HANDLE);
        DWORD dwMode = 0;

        if (hOut == INVALID_HANDLE_VALUE ||
            !GetConsoleMode(hOut, &dwMode))
                return;

#ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING
#define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x4
#endif
        dwMode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING;
        SetConsoleMode(hOut, dwMode);
}
#endif


static void test_init (void) {
        int seed;
        const char *tmp;


        if (test_seed)
                return;

        if ((tmp = test_getenv("TEST_LEVEL", NULL)))
                test_level = atoi(tmp);
        if ((tmp = test_getenv("TEST_MODE", NULL)))
                strncpy(test_mode, tmp, sizeof(test_mode)-1);
        if ((tmp = test_getenv("TEST_SCENARIO", NULL)))
                strncpy(test_scenario, tmp, sizeof(test_scenario)-1);
        if ((tmp = test_getenv("TEST_SOCKEM", NULL)))
                test_sockem_conf = tmp;
        if ((tmp = test_getenv("TEST_SEED", NULL)))
                seed = atoi(tmp);
        else
                seed = test_clock() & 0xffffffff;
        if ((tmp = test_getenv("TEST_CPU_CALIBRATION", NULL))) {
                test_rusage_cpu_calibration = strtod(tmp, NULL);
                if (test_rusage_cpu_calibration < 0.00001) {
                        fprintf(stderr,
                                "%% Invalid CPU calibration "
                                "value (from TEST_CPU_CALIBRATION env): %s\n",
                                tmp);
                        exit(1);
                }
        }

#ifdef _WIN32
        test_init_win32();
        {
                LARGE_INTEGER cycl;
                QueryPerformanceCounter(&cycl);
                seed = (int)cycl.QuadPart;
        }
#endif
        srand(seed);
        test_seed = seed;
}


const char *test_mk_topic_name (const char *suffix, int randomized) {
        static RD_TLS char ret[512];

        /* Strip main_ prefix (caller is using __FUNCTION__) */
        if (!strncmp(suffix, "main_", 5))
                suffix += 5;

        if (test_topic_random || randomized)
                rd_snprintf(ret, sizeof(ret), "%s_rnd%"PRIx64"_%s",
                            test_topic_prefix, test_id_generate(), suffix);
        else
                rd_snprintf(ret, sizeof(ret), "%s_%s", test_topic_prefix, suffix);

        TEST_SAY("Using topic \"%s\"\n", ret);

        return ret;
}
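
/* Illustrative output: called as test_mk_topic_name(__FUNCTION__, 1) from
 * main_0011_produce_batch() this returns something like
 * "rdkafkatest_rnd416966046a2867d5_0011_produce_batch"
 * (the hex part being a randomly generated test id). */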


/**
 * @brief Set special test config property
 * @returns 1 if property was known, else 0.
 */
int test_set_special_conf (const char *name, const char *val, int *timeoutp) {
        if (!strcmp(name, "test.timeout.multiplier")) {
                TEST_LOCK();
                test_timeout_multiplier = strtod(val, NULL);
                TEST_UNLOCK();
                *timeoutp = tmout_multip((*timeoutp)*1000) / 1000;
        } else if (!strcmp(name, "test.topic.prefix")) {
                rd_snprintf(test_topic_prefix, sizeof(test_topic_prefix),
                            "%s", val);
        } else if (!strcmp(name, "test.topic.random")) {
                if (!strcmp(val, "true") ||
                    !strcmp(val, "1"))
                        test_topic_random = 1;
                else
                        test_topic_random = 0;
        } else if (!strcmp(name, "test.concurrent.max")) {
                TEST_LOCK();
                test_concurrent_max = (int)strtod(val, NULL);
                TEST_UNLOCK();
        } else if (!strcmp(name, "test.sql.command")) {
                TEST_LOCK();
                if (test_sql_cmd)
                        rd_free(test_sql_cmd);
                test_sql_cmd = rd_strdup(val);
                TEST_UNLOCK();
        } else
                return 0;

        return 1;
}
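
/* These special properties are set from the test config file just like
 * regular librdkafka properties, e.g. (illustrative test.conf lines):
 *   test.timeout.multiplier=3.5
 *   test.topic.prefix=mycluster_tests
 *   test.concurrent.max=3
 */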

static void test_read_conf_file (const char *conf_path,
                                 rd_kafka_conf_t *conf,
                                 rd_kafka_topic_conf_t *topic_conf,
                                 int *timeoutp) {
        FILE *fp;
        char buf[1024];
        int line = 0;

#ifndef _WIN32
        fp = fopen(conf_path, "r");
#else
        fp = NULL;
        errno = fopen_s(&fp, conf_path, "r");
#endif
        if (!fp) {
                if (errno == ENOENT) {
                        TEST_SAY("Test config file %s not found\n", conf_path);
                        return;
                } else
                        TEST_FAIL("Failed to read %s: %s",
                                  conf_path, strerror(errno));
        }

        while (fgets(buf, sizeof(buf)-1, fp)) {
                char *t;
                char *b = buf;
                rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;
                char *name, *val;
                char errstr[512];

                line++;
                if ((t = strchr(b, '\n')))
                        *t = '\0';

                if (*b == '#' || !*b)
                        continue;

                if (!(t = strchr(b, '=')))
                        TEST_FAIL("%s:%i: expected name=value format\n",
                                  conf_path, line);

                name = b;
                *t = '\0';
                val = t+1;

                if (test_set_special_conf(name, val, timeoutp))
                        continue;

                if (!strncmp(name, "topic.", strlen("topic."))) {
                        name += strlen("topic.");
                        if (topic_conf)
                                res = rd_kafka_topic_conf_set(topic_conf,
                                                              name, val,
                                                              errstr,
                                                              sizeof(errstr));
                        else
                                res = RD_KAFKA_CONF_OK;
                        name -= strlen("topic.");
                }

                if (res == RD_KAFKA_CONF_UNKNOWN) {
                        if (conf)
                                res = rd_kafka_conf_set(conf,
                                                        name, val,
                                                        errstr, sizeof(errstr));
                        else
                                res = RD_KAFKA_CONF_OK;
                }

                if (res != RD_KAFKA_CONF_OK)
                        TEST_FAIL("%s:%i: %s\n",
                                  conf_path, line, errstr);
        }

        fclose(fp);
}

/**
 * @brief Get path to test config file
 */
const char *test_conf_get_path (void) {
        return test_getenv("RDKAFKA_TEST_CONF", "test.conf");
}

const char *test_getenv (const char *env, const char *def) {
        return rd_getenv(env, def);
}

void test_conf_common_init (rd_kafka_conf_t *conf, int timeout) {
        if (conf) {
                const char *tmp = test_getenv("TEST_DEBUG", NULL);
                if (tmp)
                        test_conf_set(conf, "debug", tmp);
        }

        if (timeout)
                test_timeout_set(timeout);
}


/**
 * Creates and sets up kafka configuration objects.
 * Will read "test.conf" file if it exists.
 */
void test_conf_init (rd_kafka_conf_t **conf, rd_kafka_topic_conf_t **topic_conf,
                     int timeout) {
        const char *test_conf = test_conf_get_path();

        if (conf) {
                *conf = rd_kafka_conf_new();
                rd_kafka_conf_set(*conf, "client.id", test_curr->name, NULL, 0);
                if (test_idempotent_producer)
                        test_conf_set(*conf, "enable.idempotence", "true");
                rd_kafka_conf_set_error_cb(*conf, test_error_cb);
                rd_kafka_conf_set_stats_cb(*conf, test_stats_cb);

                /* Allow higher request timeouts on CI */
                if (test_on_ci)
                        test_conf_set(*conf, "request.timeout.ms", "10000");

#ifdef SIGIO
                {
                        char buf[64];

                        /* Quick termination */
                        rd_snprintf(buf, sizeof(buf), "%i", SIGIO);
                        rd_kafka_conf_set(*conf, "internal.termination.signal",
                                          buf, NULL, 0);
                        signal(SIGIO, SIG_IGN);
                }
#endif
        }

#if WITH_SOCKEM
        if (*test_sockem_conf && conf)
                test_socket_enable(*conf);
#endif

        if (topic_conf)
                *topic_conf = rd_kafka_topic_conf_new();

        /* Open and read optional local test configuration file, if any. */
        test_read_conf_file(test_conf,
                            conf ? *conf : NULL,
                            topic_conf ? *topic_conf : NULL, &timeout);

        test_conf_common_init(conf ? *conf : NULL, timeout);
}


static RD_INLINE unsigned int test_rand(void) {
        unsigned int r;
#ifdef _WIN32
        rand_s(&r);
#else
        r = rand();
#endif
        return r;
}
/**
 * Generate a "unique" test id.
 */
uint64_t test_id_generate (void) {
        return (((uint64_t)test_rand()) << 32) | (uint64_t)test_rand();
}


/**
 * Generate a "unique" string id
 */
char *test_str_id_generate (char *dest, size_t dest_size) {
        rd_snprintf(dest, dest_size, "%"PRId64, test_id_generate());
        return dest;
}

/**
 * Same as test_str_id_generate but returns a temporary string.
 */
const char *test_str_id_generate_tmp (void) {
        static RD_TLS char ret[64];
        return test_str_id_generate(ret, sizeof(ret));
}

/**
 * Format a message token.
 * Pads to dest_size.
 */
void test_msg_fmt (char *dest, size_t dest_size,
                   uint64_t testid, int32_t partition, int msgid) {
        size_t of;

        of = rd_snprintf(dest, dest_size,
                         "testid=%"PRIu64", partition=%"PRId32", msg=%i\n",
                         testid, partition, msgid);
        if (of < dest_size - 1) {
                memset(dest+of, '!', dest_size-of);
                dest[dest_size-1] = '\0';
        }
}
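
/* Illustrative result (for a sufficiently large dest):
 *   "testid=14816406665, partition=0, msg=7\n!!!!!!!!..."
 * i.e. the formatted token padded with '!' up to dest_size and
 * NUL-terminated. */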

/**
 * @brief Prepare message value and key for test produce.
 */
void test_prepare_msg (uint64_t testid, int32_t partition, int msg_id,
                       char *val, size_t val_size,
                       char *key, size_t key_size) {
        size_t of = 0;

        test_msg_fmt(key, key_size, testid, partition, msg_id);

        while (of < val_size) {
                /* Copy-repeat key into val until val_size */
                size_t len = RD_MIN(val_size-of, key_size);
                memcpy(val+of, key, len);
                of += len;
        }
}



/**
 * Parse a message token
 */
void test_msg_parse00 (const char *func, int line,
                       uint64_t testid, int32_t exp_partition, int *msgidp,
                       const char *topic, int32_t partition, int64_t offset,
                       const char *key, size_t key_size) {
        char buf[128];
        uint64_t in_testid;
        int in_part;

        if (!key)
                TEST_FAIL("%s:%i: Message (%s [%"PRId32"] @ %"PRId64") "
                          "has empty key\n",
                          func, line, topic, partition, offset);

        rd_snprintf(buf, sizeof(buf), "%.*s", (int)key_size, key);

        if (sscanf(buf, "testid=%"SCNu64", partition=%i, msg=%i\n",
                   &in_testid, &in_part, msgidp) != 3)
                TEST_FAIL("%s:%i: Incorrect key format: %s", func, line, buf);


        if (testid != in_testid ||
            (exp_partition != -1 && exp_partition != in_part))
                TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i did "
                          "not match message: \"%s\"\n",
                          func, line, testid, (int)exp_partition, buf);
}

void test_msg_parse0 (const char *func, int line,
                      uint64_t testid, rd_kafka_message_t *rkmessage,
                      int32_t exp_partition, int *msgidp) {
        test_msg_parse00(func, line, testid, exp_partition, msgidp,
                         rd_kafka_topic_name(rkmessage->rkt),
                         rkmessage->partition, rkmessage->offset,
                         (const char *)rkmessage->key, rkmessage->key_len);
}
struct run_args {
        struct test *test;
        int argc;
        char **argv;
};

static int run_test0 (struct run_args *run_args) {
        struct test *test = run_args->test;
        test_timing_t t_run;
        int r;
        char stats_file[256];

        rd_snprintf(stats_file, sizeof(stats_file), "stats_%s_%"PRIu64".json",
                    test->name, test_id_generate());
        if (!(test->stats_fp = fopen(stats_file, "w+")))
                TEST_SAY("=== Failed to create stats file %s: %s ===\n",
                         stats_file, strerror(errno));

        test_curr = test;

#if WITH_SOCKEM
        rd_list_init(&test->sockets, 16, (void *)sockem_close);
#endif
        /* Don't check message status by default */
        test->exp_dr_status = (rd_kafka_msg_status_t)-1;

        TEST_SAY("================= Running test %s =================\n",
                 test->name);
        if (test->stats_fp)
                TEST_SAY("==== Stats written to file %s ====\n", stats_file);

        test_rusage_start(test_curr);
        TIMING_START(&t_run, "%s", test->name);
        test->start = t_run.ts_start;

        /* Run test main function */
        r = test->mainfunc(run_args->argc, run_args->argv);

        TIMING_STOP(&t_run);
        test_rusage_stop(test_curr,
                         (double)TIMING_DURATION(&t_run) / 1000000.0);

        TEST_LOCK();
        test->duration = TIMING_DURATION(&t_run);

        if (test->state == TEST_SKIPPED) {
                TEST_SAY("================= Test %s SKIPPED "
                         "=================\n",
                         run_args->test->name);
        } else if (r) {
                test->state = TEST_FAILED;
                TEST_SAY("\033[31m"
                         "================= Test %s FAILED ================="
                         "\033[0m\n",
                         run_args->test->name);
        } else {
                test->state = TEST_PASSED;
                TEST_SAY("\033[32m"
                         "================= Test %s PASSED ================="
                         "\033[0m\n",
                         run_args->test->name);
        }
        TEST_UNLOCK();

        cnd_broadcast(&test_cnd);

#if WITH_SOCKEM
        test_socket_close_all(test, 0);
#endif

        if (test->stats_fp) {
                long pos = ftell(test->stats_fp);
                fclose(test->stats_fp);
                test->stats_fp = NULL;
                /* Delete file if nothing was written */
                if (pos == 0) {
#ifndef _WIN32
                        unlink(stats_file);
#else
                        _unlink(stats_file);
#endif
                }
        }

        if (test_delete_topics_between && test_concurrent_max == 1)
                test_delete_all_test_topics(60*1000);

        return r;
}




static int run_test_from_thread (void *arg) {
        struct run_args *run_args = arg;

        thrd_detach(thrd_current());

        run_test0(run_args);

        TEST_LOCK();
        tests_running_cnt--;
        TEST_UNLOCK();

        free(run_args);

        return 0;
}


/**
 * @brief Check running tests for timeouts.
 * @locks TEST_LOCK MUST be held
 */
static void check_test_timeouts (void) {
        int64_t now = test_clock();
        struct test *test;

        for (test = tests ; test->name ; test++) {
                if (test->state != TEST_RUNNING)
                        continue;

                /* Timeout check */
                if (now > test->timeout) {
                        struct test *save_test = test_curr;
                        test_curr = test;
                        test->state = TEST_FAILED;
                        test_summary(0/*no-locks*/);
                        TEST_FAIL0(__FILE__,__LINE__,0/*nolock*/,
                                   0/*fail-later*/,
                                   "Test %s%s%s%s timed out "
                                   "(timeout set to %d seconds)\n",
                                   test->name,
                                   *test->subtest ? " (" : "",
                                   test->subtest,
                                   *test->subtest ? ")" : "",
                                   (int)(test->timeout-
                                         test->start)/
                                   1000000);
                        test_curr = save_test;
                        tests_running_cnt--; /* fail-later misses this */
#ifdef _WIN32
                        TerminateThread(test->thrd, -1);
#else
                        pthread_kill(test->thrd, SIGKILL);
#endif
                }
        }
}


static int run_test (struct test *test, int argc, char **argv) {
        struct run_args *run_args = calloc(1, sizeof(*run_args));
        int wait_cnt = 0;

        run_args->test = test;
        run_args->argc = argc;
        run_args->argv = argv;

        TEST_LOCK();
        while (tests_running_cnt >= test_concurrent_max) {
                if (!(wait_cnt++ % 100))
                        TEST_SAY("Too many tests running (%d >= %d): "
                                 "postponing %s start...\n",
                                 tests_running_cnt, test_concurrent_max,
                                 test->name);
                cnd_timedwait_ms(&test_cnd, &test_mtx, 100);

                check_test_timeouts();
        }
        tests_running_cnt++;
        test->timeout = test_clock() + (int64_t)(30.0 * 1000000.0 *
                                                 test_timeout_multiplier);
        test->state = TEST_RUNNING;
        TEST_UNLOCK();

        if (thrd_create(&test->thrd, run_test_from_thread, run_args) !=
            thrd_success) {
                TEST_LOCK();
                tests_running_cnt--;
                test->state = TEST_FAILED;
                TEST_UNLOCK();

                TEST_FAIL("Failed to start thread for test %s\n",
                          test->name);
        }

        return 0;
}

static void run_tests (int argc, char **argv) {
        struct test *test;

        for (test = tests ; test->name ; test++) {
                char testnum[128];
                char *t;
                const char *skip_reason = NULL;
                rd_bool_t skip_silent = rd_false;
                char tmp[128];
                const char *scenario =
                        test->scenario ? test->scenario : "default";

                if (!test->mainfunc)
                        continue;

                /* Extract test number, as string */
                strncpy(testnum, test->name, sizeof(testnum)-1);
                testnum[sizeof(testnum)-1] = '\0';
                if ((t = strchr(testnum, '_')))
                        *t = '\0';

                if ((test_flags && (test_flags & test->flags) != test_flags)) {
                        skip_reason = "filtered due to test flags";
                        skip_silent = rd_true;
                } if ((test_neg_flags & ~test_flags) & test->flags)
                        skip_reason = "Filtered due to negative test flags";
                if (test_broker_version &&
                    (test->minver > test_broker_version ||
                     (test->maxver && test->maxver < test_broker_version))) {
                        rd_snprintf(tmp, sizeof(tmp),
                                    "not applicable for broker "
                                    "version %d.%d.%d.%d",
                                    TEST_BRKVER_X(test_broker_version, 0),
                                    TEST_BRKVER_X(test_broker_version, 1),
                                    TEST_BRKVER_X(test_broker_version, 2),
                                    TEST_BRKVER_X(test_broker_version, 3));
                        skip_reason = tmp;
                }

                if (!strstr(scenario, test_scenario)) {
                        rd_snprintf(tmp, sizeof(tmp),
                                    "requires test scenario %s", scenario);
                        skip_silent = rd_true;
                        skip_reason = tmp;
                }

                if (tests_to_run && !strstr(tests_to_run, testnum)) {
                        skip_reason = "not included in TESTS list";
                        skip_silent = rd_true;
                } else if (!tests_to_run && (test->flags & TEST_F_MANUAL)) {
                        skip_reason = "manual test";
                        skip_silent = rd_true;
                } else if (tests_to_skip && strstr(tests_to_skip, testnum))
                        skip_reason = "included in TESTS_SKIP list";

                if (!skip_reason) {
                        run_test(test, argc, argv);
                } else {
                        if (skip_silent) {
                                TEST_SAYL(3,
                                          "================= Skipping test %s "
                                          "(%s) ================\n",
                                          test->name, skip_reason);
                                TEST_LOCK();
                                test->state = TEST_SKIPPED;
                                TEST_UNLOCK();
                        } else {
                                test_curr = test;
                                TEST_SKIP("%s\n", skip_reason);
                                test_curr = &tests[0];
                        }

                }
        }


}

/**
 * @brief Print summary for all tests.
 *
 * @returns the number of failed tests.
 */
static int test_summary (int do_lock) {
        struct test *test;
        FILE *report_fp = NULL;
        char report_path[128];
        time_t t;
        struct tm *tm;
        char datestr[64];
        int64_t total_duration = 0;
        int tests_run = 0;
        int tests_failed = 0;
        int tests_failed_known = 0;
        int tests_passed = 0;
        FILE *sql_fp = NULL;
        const char *tmp;

        t = time(NULL);
        tm = localtime(&t);
        strftime(datestr, sizeof(datestr), "%Y%m%d%H%M%S", tm);

        if ((tmp = test_getenv("TEST_REPORT", NULL)))
                rd_snprintf(report_path, sizeof(report_path), "%s", tmp);
        else if (test_write_report)
                rd_snprintf(report_path, sizeof(report_path),
                            "test_report_%s.json", datestr);
        else
                report_path[0] = '\0';

        if (*report_path) {
                report_fp = fopen(report_path, "w+");
                if (!report_fp)
                        TEST_WARN("Failed to create report file %s: %s\n",
                                  report_path, strerror(errno));
                else
                        fprintf(report_fp,
                                "{ \"id\": \"%s_%s\", \"mode\": \"%s\", "
                                "\"scenario\": \"%s\", "
                                "\"date\": \"%s\", "
                                "\"git_version\": \"%s\", "
                                "\"broker_version\": \"%s\", "
                                "\"tests\": {",
                                datestr, test_mode, test_mode,
                                test_scenario, datestr,
                                test_git_version,
                                test_broker_version_str);
        }

        if (do_lock)
                TEST_LOCK();

        if (test_sql_cmd) {
#ifdef _WIN32
                sql_fp = _popen(test_sql_cmd, "w");
#else
                sql_fp = popen(test_sql_cmd, "w");
#endif

                fprintf(sql_fp,
                        "CREATE TABLE IF NOT EXISTS "
                        "runs(runid text PRIMARY KEY, mode text, "
                        "date datetime, cnt int, passed int, failed int, "
                        "duration numeric);\n"
                        "CREATE TABLE IF NOT EXISTS "
                        "tests(runid text, mode text, name text, state text, "
                        "extra text, duration numeric);\n");
        }

        if (show_summary)
                printf("TEST %s (%s, scenario %s) SUMMARY\n"
                       "#==================================================================#\n",
                       datestr, test_mode, test_scenario);

        for (test = tests ; test->name ; test++) {
                const char *color;
                int64_t duration;
                char extra[128] = "";
                int do_count = 1;

                if (!(duration = test->duration) && test->start > 0)
                        duration = test_clock() - test->start;

                if (test == tests) {
                        /* <MAIN> test:
                         * test accounts for total runtime.
                         * don't include in passed/run/failed counts. */
                        total_duration = duration;
                        do_count = 0;
                }

                switch (test->state)
                {
                case TEST_PASSED:
                        color = _C_GRN;
                        if (do_count) {
                                tests_passed++;
                                tests_run++;
                        }
                        break;
                case TEST_FAILED:
                        if (test->flags & TEST_F_KNOWN_ISSUE) {
                                rd_snprintf(extra, sizeof(extra),
                                            " <-- known issue%s%s",
                                            test->extra ? ": " : "",
                                            test->extra ? test->extra : "");
                                if (do_count)
                                        tests_failed_known++;
                        }
                        color = _C_RED;
                        if (do_count) {
                                tests_failed++;
                                tests_run++;
                        }
                        break;
                case TEST_RUNNING:
                        color = _C_MAG;
                        if (do_count) {
                                tests_failed++; /* All tests should be finished */
                                tests_run++;
                        }
                        break;
                case TEST_NOT_STARTED:
                        color = _C_YEL;
                        if (test->extra)
                                rd_snprintf(extra, sizeof(extra), " %s",
                                            test->extra);
                        break;
                default:
                        color = _C_CYA;
                        break;
                }

                if (show_summary &&
                    (test->state != TEST_SKIPPED || *test->failstr ||
                     (tests_to_run &&
                      !strncmp(tests_to_run, test->name,
                               strlen(tests_to_run))))) {
                        printf("|%s %-40s | %10s | %7.3fs %s|",
                               color,
                               test->name, test_states[test->state],
                               (double)duration/1000000.0, _C_CLR);
                        if (test->state == TEST_FAILED)
                                printf(_C_RED " %s" _C_CLR, test->failstr);
                        else if (test->state == TEST_SKIPPED)
                                printf(_C_CYA " %s" _C_CLR, test->failstr);
                        printf("%s\n", extra);
                }

                if (report_fp) {
                        int i;
                        fprintf(report_fp,
                                "%s\"%s\": {"
                                "\"name\": \"%s\", "
                                "\"state\": \"%s\", "
                                "\"known_issue\": %s, "
                                "\"extra\": \"%s\", "
                                "\"duration\": %.3f, "
                                "\"report\": [ ",
                                test == tests ? "": ", ",
                                test->name,
                                test->name, test_states[test->state],
                                test->flags & TEST_F_KNOWN_ISSUE ? "true":"false",
                                test->extra ? test->extra : "",
                                (double)duration/1000000.0);

                        for (i = 0 ; i < test->report_cnt ; i++) {
                                fprintf(report_fp, "%s%s ",
                                        i == 0 ? "":",",
                                        test->report_arr[i]);
                        }

                        fprintf(report_fp, "] }");
                }

                if (sql_fp)
                        fprintf(sql_fp,
                                "INSERT INTO tests VALUES("
                                "'%s_%s', '%s', '%s', '%s', '%s', %f);\n",
                                datestr, test_mode, test_mode,
                                test->name, test_states[test->state],
                                test->extra ? test->extra : "",
                                (double)duration/1000000.0);
        }
1471         if (do_lock)
1472                 TEST_UNLOCK();
1473 
1474 	if (show_summary)
1475 		printf("#==================================================================#\n");
1476 
1477         if (report_fp) {
1478                 fprintf(report_fp,
1479                         "}, "
1480                         "\"tests_run\": %d, "
1481                         "\"tests_passed\": %d, "
1482                         "\"tests_failed\": %d, "
1483                         "\"duration\": %.3f"
1484                         "}\n",
1485                         tests_run, tests_passed, tests_failed,
1486                         (double)total_duration/1000000.0);
1487 
1488                 fclose(report_fp);
1489                 TEST_SAY("# Test report written to %s\n", report_path);
1490         }
1491 
1492 	if (sql_fp) {
1493 		fprintf(sql_fp,
1494 			"INSERT INTO runs VALUES('%s_%s', '%s', datetime(), "
1495 			"%d, %d, %d, %f);\n",
1496 			datestr, test_mode, test_mode,
1497 			tests_run, tests_passed, tests_failed,
1498 			(double)total_duration/1000000.0);
1499 		fclose(sql_fp);
1500 	}
1501 
1502         return tests_failed - tests_failed_known;
1503 }
1504 
1505 #ifndef _WIN32
1506 static void test_sig_term (int sig) {
1507 	if (test_exit)
1508 		exit(1);
1509 	fprintf(stderr, "Exiting tests, waiting for running tests to finish.\n");
1510 	test_exit = 1;
1511 }
1512 #endif
1513 
1514 /**
1515  * Wait 'timeout' seconds for rdkafka to kill all its threads and clean up.
1516  */
1517 static void test_wait_exit (int timeout) {
1518 	int r;
1519         time_t start = time(NULL);
1520 
1521 	while ((r = rd_kafka_thread_cnt()) && timeout-- >= 0) {
1522 		TEST_SAY("%i thread(s) in use by librdkafka, waiting...\n", r);
1523 		rd_sleep(1);
1524 	}
1525 
1526 	TEST_SAY("%i thread(s) in use by librdkafka\n", r);
1527 
1528         if (r > 0)
1529                 TEST_FAIL("%i thread(s) still active in librdkafka", r);
1530 
1531         timeout -= (int)(time(NULL) - start);
1532         if (timeout > 0) {
1533 		TEST_SAY("Waiting %d seconds for all librdkafka memory "
1534 			 "to be released\n", timeout);
1535                 if (rd_kafka_wait_destroyed(timeout * 1000) == -1)
1536                         TEST_FAIL("Not all internal librdkafka "
1537                                   "objects destroyed\n");
1538 	}
1539 }
1540 
1541 
1542 
1543 
1544 /**
1545  * @brief Test framework cleanup before termination.
1546  */
1547 static void test_cleanup (void) {
1548 	struct test *test;
1549 
1550 	/* Free report arrays */
1551 	for (test = tests ; test->name ; test++) {
1552 		int i;
1553 		if (!test->report_arr)
1554 			continue;
1555 		for (i = 0 ; i < test->report_cnt ; i++)
1556 			rd_free(test->report_arr[i]);
1557 		rd_free(test->report_arr);
1558 		test->report_arr = NULL;
1559 	}
1560 
1561 	if (test_sql_cmd)
1562 		rd_free(test_sql_cmd);
1563 }
1564 
1565 
1566 int main(int argc, char **argv) {
1567         int i, r;
1568 	test_timing_t t_all;
1569 	int a,b,c,d;
1570         const char *tmpver;
1571 
1572 	mtx_init(&test_mtx, mtx_plain);
1573         cnd_init(&test_cnd);
1574 
1575         test_init();
1576 
1577 #ifndef _WIN32
1578         signal(SIGINT, test_sig_term);
1579 #endif
1580         tests_to_run = test_getenv("TESTS", NULL);
1581         subtests_to_run = test_getenv("SUBTESTS", NULL);
1582         tests_to_skip = test_getenv("TESTS_SKIP", NULL);
1583         tmpver = test_getenv("TEST_KAFKA_VERSION", NULL);
1584         if (!tmpver)
1585                 tmpver = test_getenv("KAFKA_VERSION", test_broker_version_str);
1586         test_broker_version_str = tmpver;
1587 
1588         test_git_version = test_getenv("RDKAFKA_GITVER", "HEAD");
1589 
1590         /* Are we running on CI? */
1591         if (test_getenv("CI", NULL)) {
1592                 test_on_ci = 1;
1593                 test_concurrent_max = 3;
1594         }
1595 
1596 	test_conf_init(NULL, NULL, 10);
1597 
1598         for (i = 1 ; i < argc ; i++) {
1599                 if (!strncmp(argv[i], "-p", 2) && strlen(argv[i]) > 2) {
1600                         if (test_rusage) {
1601                                 fprintf(stderr,
1602                                         "%% %s ignored: -R takes precedence\n",
1603                                         argv[i]);
1604                                 continue;
1605                         }
1606                         test_concurrent_max = (int)strtod(argv[i]+2, NULL);
1607                 } else if (!strcmp(argv[i], "-l"))
1608                         test_flags |= TEST_F_LOCAL;
1609 		else if (!strcmp(argv[i], "-L"))
1610                         test_neg_flags |= TEST_F_LOCAL;
1611                 else if (!strcmp(argv[i], "-a"))
1612                         test_assert_on_fail = 1;
1613 		else if (!strcmp(argv[i], "-k"))
1614 			test_flags |= TEST_F_KNOWN_ISSUE;
1615 		else if (!strcmp(argv[i], "-K"))
1616 			test_neg_flags |= TEST_F_KNOWN_ISSUE;
1617                 else if (!strcmp(argv[i], "-E"))
1618                         test_neg_flags |= TEST_F_SOCKEM;
1619 		else if (!strcmp(argv[i], "-V") && i+1 < argc)
1620  			test_broker_version_str = argv[++i];
1621                 else if (!strcmp(argv[i], "-s") && i+1 < argc)
1622                         strncpy(test_scenario, argv[++i],
1623                                 sizeof(test_scenario)-1);
1624 		else if (!strcmp(argv[i], "-S"))
1625 			show_summary = 0;
1626                 else if (!strcmp(argv[i], "-D"))
1627                         test_delete_topics_between = 1;
1628                 else if (!strcmp(argv[i], "-P"))
1629                         test_idempotent_producer = 1;
1630                 else if (!strcmp(argv[i], "-Q"))
1631                         test_quick = 1;
1632                 else if (!strcmp(argv[i], "-r"))
1633                         test_write_report = 1;
1634                 else if (!strncmp(argv[i], "-R", 2)) {
1635                         test_rusage = 1;
1636                         test_concurrent_max = 1;
1637                         if (strlen(argv[i]) > strlen("-R")) {
1638                                 test_rusage_cpu_calibration =
1639                                         strtod(argv[i]+2, NULL);
1640                                 if (test_rusage_cpu_calibration < 0.00001) {
1641                                         fprintf(stderr,
1642                                                 "%% Invalid CPU calibration "
1643                                                 "value: %s\n", argv[i]+2);
1644                                         exit(1);
1645                                 }
1646                         }
1647                 } else if (*argv[i] != '-')
1648                         tests_to_run = argv[i];
1649                 else {
1650                         printf("Unknown option: %s\n"
1651                                "\n"
1652                                "Usage: %s [options] [<test-match-substr>]\n"
1653                                "Options:\n"
1654                                "  -p<N>  Run N tests in parallel\n"
1655                                "  -l/-L  Only/don't run local tests (no broker needed)\n"
1656 			       "  -k/-K  Only/don't run tests with known issues\n"
1657                                "  -E     Don't run sockem tests\n"
1658                                "  -a     Assert on failures\n"
1659                                "  -r     Write test_report_...json file.\n"
1660 			       "  -S     Don't show test summary\n"
1661                                "  -s <scenario> Test scenario.\n"
1662 			       "  -V <N.N.N.N> Broker version.\n"
1663                                "  -D     Delete all test topics between each test (-p1) or after all tests\n"
1664                                "  -P     Run all tests with `enable.idempotency=true`\n"
1665                                "  -Q     Run tests in quick mode: faster tests, fewer iterations, less data.\n"
1666                                "  -R     Check resource usage thresholds.\n"
1667                                "  -R<C>  Check resource usage thresholds but adjust CPU thresholds by C (float):\n"
1668                                "            C < 1.0: CPU is faster than base line system.\n"
1669                                "            C > 1.0: CPU is slower than base line system.\n"
1670                                "            E.g. -R2.5 = CPU is 2.5x slower than base line system.\n"
1671 			       "\n"
1672 			       "Environment variables:\n"
1673 			       "  TESTS - substring matched test to run (e.g., 0033)\n"
1674                                "  SUBTESTS - substring matched subtest to run "
1675                                "(e.g., n_wildcard)\n"
1676 			       "  TEST_KAFKA_VERSION - broker version (e.g., 0.9.0.1)\n"
1677                                "  TEST_SCENARIO - Test scenario\n"
1678 			       "  TEST_LEVEL - Test verbosity level\n"
1679 			       "  TEST_MODE - bare, helgrind, valgrind\n"
1680 			       "  TEST_SEED - random seed\n"
1681 			       "  RDKAFKA_TEST_CONF - test config file (test.conf)\n"
1682 			       "  KAFKA_PATH - Path to kafka source dir\n"
1683 			       "  ZK_ADDRESS - Zookeeper address\n"
1684                                "\n",
1685                                argv[i], argv[0]);
1686                         exit(1);
1687                 }
1688         }
1689 
1690 	TEST_SAY("Git version: %s\n", test_git_version);
1691 
1692         if (!strcmp(test_broker_version_str, "trunk"))
1693                 test_broker_version_str = "9.9.9.9"; /* for now */
1694 
1695         d = 0;
1696         if (sscanf(test_broker_version_str, "%d.%d.%d.%d",
1697 		   &a, &b, &c, &d) < 3) {
1698 		printf("%% Expected broker version to be in format "
1699 		       "N.N.N (N=int), not %s\n",
1700 		       test_broker_version_str);
1701 		exit(1);
1702 	}
1703 	test_broker_version = TEST_BRKVER(a, b, c, d);
1704 	TEST_SAY("Broker version: %s (%d.%d.%d.%d)\n",
1705 		 test_broker_version_str,
1706 		 TEST_BRKVER_X(test_broker_version, 0),
1707 		 TEST_BRKVER_X(test_broker_version, 1),
1708 		 TEST_BRKVER_X(test_broker_version, 2),
1709 		 TEST_BRKVER_X(test_broker_version, 3));
1710 
1711 	/* Set up fake "<MAIN>" test for all operations performed in
1712 	 * the main thread rather than the per-test threads.
1713 	 * A nice side effect is that we get timing and status for main as well. */
1714         test_curr = &tests[0];
1715         test_curr->state = TEST_PASSED;
1716         test_curr->start = test_clock();
1717 
1718         if (test_on_ci) {
1719                 TEST_LOCK();
1720                 test_timeout_multiplier += 2;
1721                 TEST_UNLOCK();
1722         }
1723 
1724 	if (!strcmp(test_mode, "helgrind") ||
1725 	    !strcmp(test_mode, "drd")) {
1726 		TEST_LOCK();
1727 		test_timeout_multiplier += 5;
1728 		TEST_UNLOCK();
1729 	} else if (!strcmp(test_mode, "valgrind")) {
1730 		TEST_LOCK();
1731 		test_timeout_multiplier += 3;
1732 		TEST_UNLOCK();
1733 	}
1734 
1735         /* Broker version 0.9 and api.version.request=true (which is default)
1736          * will cause a 10s stall per connection. Rather than fixing
1737          * that for each affected API in every test, we increase the timeout
1738          * multiplier accordingly. The typical consume timeout is 5
1739          * seconds, so a multiplier of 3 should be good. */
1740         if ((test_broker_version & 0xffff0000) == 0x00090000)
1741                 test_timeout_multiplier += 3;
1742 
1743         if (test_concurrent_max > 1)
1744                 test_timeout_multiplier += (double)test_concurrent_max / 3;
1745 
1746 	TEST_SAY("Tests to run : %s\n", tests_to_run ? tests_to_run : "all");
1747         if (subtests_to_run)
1748                 TEST_SAY("Sub tests    : %s\n", subtests_to_run);
1749         if (tests_to_skip)
1750                 TEST_SAY("Skip tests   : %s\n", tests_to_skip);
1751         TEST_SAY("Test mode    : %s%s%s\n",
1752                  test_quick ? "quick, ":"",
1753                  test_mode,
1754                  test_on_ci ? ", CI":"");
1755         TEST_SAY("Test scenario: %s\n", test_scenario);
1756         TEST_SAY("Test filter  : %s\n",
1757                  (test_flags & TEST_F_LOCAL) ? "local tests only" : "no filter");
1758         TEST_SAY("Test timeout multiplier: %.1f\n", test_timeout_multiplier);
1759         TEST_SAY("Action on test failure: %s\n",
1760                  test_assert_on_fail ? "assert crash" : "continue other tests");
1761         if (test_rusage)
1762                 TEST_SAY("Test rusage : yes (%.2fx CPU calibration)\n",
1763                          test_rusage_cpu_calibration);
1764         if (test_idempotent_producer)
1765                 TEST_SAY("Test Idempotent Producer: enabled\n");
1766 
1767         {
1768                 char cwd[512], *pcwd;
1769 #ifdef _WIN32
1770                 pcwd = _getcwd(cwd, sizeof(cwd) - 1);
1771 #else
1772                 pcwd = getcwd(cwd, sizeof(cwd) - 1);
1773 #endif
1774                 if (pcwd)
1775                         TEST_SAY("Current directory: %s\n", cwd);
1776         }
1777 
1778         test_timeout_set(30);
1779 
1780         TIMING_START(&t_all, "ALL-TESTS");
1781 
1782         /* Run tests */
1783         run_tests(argc, argv);
1784 
1785         TEST_LOCK();
1786         while (tests_running_cnt > 0 && !test_exit) {
1787                 struct test *test;
1788 
1789                 if (!test_quick && test_level >= 2) {
1790                         TEST_SAY("%d test(s) running:", tests_running_cnt);
1791 
1792                         for (test = tests ; test->name ; test++) {
1793                                 if (test->state != TEST_RUNNING)
1794                                         continue;
1795 
1796                                 TEST_SAY0(" %s", test->name);
1797                         }
1798 
1799                         TEST_SAY0("\n");
1800                 }
1801 
1802                 check_test_timeouts();
1803 
1804                 TEST_UNLOCK();
1805 
1806                 if (test_quick)
1807                         rd_usleep(200*1000, NULL);
1808                 else
1809                         rd_sleep(1);
1810                 TEST_LOCK();
1811         }
1812 
1813 	TIMING_STOP(&t_all);
1814 
1815         test_curr = &tests[0];
1816         test_curr->duration = test_clock() - test_curr->start;
1817 
1818         TEST_UNLOCK();
1819 
1820         if (test_delete_topics_between)
1821                 test_delete_all_test_topics(60*1000);
1822 
1823         r = test_summary(1/*lock*/) ? 1 : 0;
1824 
1825         /* Wait for everything to be cleaned up since broker destroys are
1826 	 * handled in their own threads. */
1827 	test_wait_exit(0);
1828 
1829 	/* If we haven't failed at this point then
1830 	 * no threads were leaked. */
1831         if (r == 0)
1832                 TEST_SAY("\n============== ALL TESTS PASSED ==============\n");
1833 
1834 	test_cleanup();
1835 
1836 	if (r > 0)
1837 		TEST_FAIL("%d test(s) failed, see previous errors", r);
1838 
1839 	return r;
1840 }
1841 
1842 
1843 
1844 
1845 
1846 /******************************************************************************
1847  *
1848  * Helpers
1849  *
1850  ******************************************************************************/
1851 
1852 void test_dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
1853                      void *opaque) {
1854         int *remainsp = rkmessage->_private;
1855         static const char *status_names[] = {
1856                 [RD_KAFKA_MSG_STATUS_NOT_PERSISTED] = "NotPersisted",
1857                 [RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED] = "PossiblyPersisted",
1858                 [RD_KAFKA_MSG_STATUS_PERSISTED] = "Persisted"
1859         };
1860 
1861         TEST_SAYL(4, "Delivery report: %s (%s) to %s [%"PRId32"] "
1862                   "at offset %"PRId64" latency %.2fms\n",
1863                   rd_kafka_err2str(rkmessage->err),
1864                   status_names[rd_kafka_message_status(rkmessage)],
1865                   rd_kafka_topic_name(rkmessage->rkt),
1866                   rkmessage->partition,
1867                   rkmessage->offset,
1868                   (float)rd_kafka_message_latency(rkmessage) / 1000.0);
1869 
1870         if (!test_curr->produce_sync) {
1871                 if (!test_curr->ignore_dr_err &&
1872                     rkmessage->err != test_curr->exp_dr_err)
1873                         TEST_FAIL("Message delivery (to %s [%"PRId32"]) "
1874                                   "failed: expected %s, got %s",
1875                                   rd_kafka_topic_name(rkmessage->rkt),
1876                                   rkmessage->partition,
1877                                   rd_kafka_err2str(test_curr->exp_dr_err),
1878                                   rd_kafka_err2str(rkmessage->err));
1879 
1880                 if ((int)test_curr->exp_dr_status != -1) {
1881                         rd_kafka_msg_status_t status =
1882                                 rd_kafka_message_status(rkmessage);
1883 
1884                         TEST_ASSERT(status == test_curr->exp_dr_status,
1885                                     "Expected message status %s, not %s",
1886                                     status_names[test_curr->exp_dr_status],
1887                                     status_names[status]);
1888                 }
1889 
1890                 /* Add message to msgver */
1891                 if (!rkmessage->err && test_curr->dr_mv)
1892                         test_msgver_add_msg(rk, test_curr->dr_mv, rkmessage);
1893         }
1894 
1895 	if (remainsp) {
1896                 TEST_ASSERT(*remainsp > 0,
1897                             "Too many messages delivered (remains %i)",
1898                             *remainsp);
1899 
1900                 (*remainsp)--;
1901         }
1902 
1903         if (test_curr->produce_sync)
1904                 test_curr->produce_sync_err = rkmessage->err;
1905 }
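/* Illustrative sketch only (disabled from the build): how a test would
 * typically wire test_dr_msg_cb() to a per-message "remains" counter by
 * passing its address as the produce() msg_opaque. The topic name
 * "mytopic" is a placeholder assumption, not part of the framework. */
#if 0
static void example_dr_msg_cb_usage (void) {
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        char buf[] = "hello";
        int remains = 1;

        test_conf_init(&conf, NULL, 0);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
        rkt = test_create_topic_object(rk, "mytopic", NULL);

        /* &remains is passed as msg_opaque and decremented by
         * test_dr_msg_cb() when the delivery report arrives. */
        if (rd_kafka_produce(rkt, 0, RD_KAFKA_MSG_F_COPY,
                             buf, sizeof(buf)-1, NULL, 0, &remains) == -1)
                TEST_FAIL("produce failed: %s",
                          rd_kafka_err2str(rd_kafka_last_error()));

        test_wait_delivery(rk, &remains);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
}
#endif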
1906 
1907 
1908 rd_kafka_t *test_create_handle (int mode, rd_kafka_conf_t *conf) {
1909 	rd_kafka_t *rk;
1910 	char errstr[512];
1911 
1912         if (!conf) {
1913                 test_conf_init(&conf, NULL, 0);
1914 #if WITH_SOCKEM
1915                 if (*test_sockem_conf)
1916                         test_socket_enable(conf);
1917 #endif
1918         } else {
1919                 if (!strcmp(test_conf_get(conf, "client.id"), "rdkafka"))
1920                         test_conf_set(conf, "client.id", test_curr->name);
1921         }
1922 
1923 
1924 
1925 	/* Create kafka instance */
1926 	rk = rd_kafka_new(mode, conf, errstr, sizeof(errstr));
1927 	if (!rk)
1928 		TEST_FAIL("Failed to create rdkafka instance: %s\n", errstr);
1929 
1930 	TEST_SAY("Created    kafka instance %s\n", rd_kafka_name(rk));
1931 
1932 	return rk;
1933 }
1934 
1935 
1936 rd_kafka_t *test_create_producer (void) {
1937 	rd_kafka_conf_t *conf;
1938 
1939 	test_conf_init(&conf, NULL, 0);
1940         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
1941 
1942 	return test_create_handle(RD_KAFKA_PRODUCER, conf);
1943 }
1944 
1945 
1946 /**
1947  * Create topic_t object with va-arg list as key-value config pairs
1948  * terminated by NULL.
1949  */
1950 rd_kafka_topic_t *test_create_topic_object (rd_kafka_t *rk,
1951 					    const char *topic, ...) {
1952 	rd_kafka_topic_t *rkt;
1953 	rd_kafka_topic_conf_t *topic_conf;
1954 	va_list ap;
1955 	const char *name, *val;
1956 
1957 	test_conf_init(NULL, &topic_conf, 0);
1958 
1959 	va_start(ap, topic);
1960 	while ((name = va_arg(ap, const char *)) &&
1961 	       (val = va_arg(ap, const char *))) {
1962                 test_topic_conf_set(topic_conf, name, val);
1963 	}
1964 	va_end(ap);
1965 
1966 	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
1967 	if (!rkt)
1968 		TEST_FAIL("Failed to create topic: %s\n",
1969                           rd_kafka_err2str(rd_kafka_last_error()));
1970 
1971 	return rkt;
1972 
1973 }
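/* Illustrative sketch only (disabled): the trailing arguments are key/value
 * topic-config pairs and the list must be terminated by NULL. The topic
 * name and config values are placeholder assumptions. */
#if 0
rd_kafka_t *rk = test_create_producer();
rd_kafka_topic_t *rkt = test_create_topic_object(
        rk, "mytopic",
        "message.timeout.ms", "10000",
        NULL/*terminator*/);
#endif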
1974 
1975 
1976 rd_kafka_topic_t *test_create_producer_topic (rd_kafka_t *rk,
1977 	const char *topic, ...) {
1978 	rd_kafka_topic_t *rkt;
1979 	rd_kafka_topic_conf_t *topic_conf;
1980 	char errstr[512];
1981 	va_list ap;
1982 	const char *name, *val;
1983 
1984 	test_conf_init(NULL, &topic_conf, 0);
1985 
1986 	va_start(ap, topic);
1987 	while ((name = va_arg(ap, const char *)) &&
1988 	       (val = va_arg(ap, const char *))) {
1989 		if (rd_kafka_topic_conf_set(topic_conf, name, val,
1990 			errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
1991 			TEST_FAIL("Conf failed: %s\n", errstr);
1992 	}
1993 	va_end(ap);
1994 
1995 	/* Make sure all replicas are in-sync after producing
1996 	 * so that the consume test won't fail. */
1997         rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1",
1998                                 errstr, sizeof(errstr));
1999 
2000 
2001 	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
2002 	if (!rkt)
2003 		TEST_FAIL("Failed to create topic: %s\n",
2004                           rd_kafka_err2str(rd_kafka_last_error()));
2005 
2006 	return rkt;
2007 
2008 }
2009 
2010 
2011 
2012 /**
2013  * Produces \p cnt messages and returns immediately.
2014  * Does not wait for delivery.
2015  * \p msgcounterp is incremented for each produced message and passed
2016  * as \p msg_opaque, which is later used in test_dr_msg_cb to decrement
2017  * the counter on delivery.
2018  *
2019  * If \p payload is NULL the message key and payload will be formatted
2020  * according to the standard test format, otherwise the key will be NULL
2021  * and \p payload will be sent as the message payload.
2022  *
2023  * The default message size is 128 bytes; if \p size is non-zero and
2024  * \p payload is NULL, \p size will be used as the message size.
2025  */
2026 void test_produce_msgs_nowait (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2027                                uint64_t testid, int32_t partition,
2028                                int msg_base, int cnt,
2029                                const char *payload, size_t size, int msgrate,
2030                                int *msgcounterp) {
2031 	int msg_id;
2032 	test_timing_t t_all, t_poll;
2033 	char key[128];
2034 	void *buf;
2035 	int64_t tot_bytes = 0;
2036         int64_t tot_time_poll = 0;
2037         int64_t per_msg_wait = 0;
2038 
2039         if (msgrate > 0)
2040                 per_msg_wait = 1000000 / (int64_t)msgrate;
2041 
2042 
2043 	if (payload)
2044 		buf = (void *)payload;
2045 	else {
2046 		if (size == 0)
2047 			size = 128;
2048 		buf = calloc(1, size);
2049 	}
2050 
2051 	TEST_SAY("Produce to %s [%"PRId32"]: messages #%d..%d\n",
2052 		 rd_kafka_topic_name(rkt), partition, msg_base, msg_base+cnt);
2053 
2054 	TIMING_START(&t_all, "PRODUCE");
2055         TIMING_START(&t_poll, "SUM(POLL)");
2056 
2057 	for (msg_id = msg_base ; msg_id < msg_base + cnt ; msg_id++) {
2058                 int wait_time = 0;
2059 
2060                 if (!payload)
2061                         test_prepare_msg(testid, partition, msg_id,
2062                                          buf, size, key, sizeof(key));
2063 
2064 
2065 		if (rd_kafka_produce(rkt, partition,
2066 				     RD_KAFKA_MSG_F_COPY,
2067 				     buf, size,
2068 				     !payload ? key : NULL,
2069 				     !payload ? strlen(key) : 0,
2070 				     msgcounterp) == -1)
2071 			TEST_FAIL("Failed to produce message %i "
2072 				  "to partition %i: %s",
2073 				  msg_id, (int)partition,
2074                                   rd_kafka_err2str(rd_kafka_last_error()));
2075 
2076                 (*msgcounterp)++;
2077 		tot_bytes += size;
2078 
2079                 TIMING_RESTART(&t_poll);
2080                 do {
2081                         if (per_msg_wait) {
2082                                 wait_time = (int)(per_msg_wait -
2083                                                   TIMING_DURATION(&t_poll)) /
2084                                         1000;
2085                                 if (wait_time < 0)
2086                                         wait_time = 0;
2087                         }
2088                         rd_kafka_poll(rk, wait_time);
2089                 } while (wait_time > 0);
2090 
2091                 tot_time_poll = TIMING_DURATION(&t_poll);
2092 
2093 		if (TIMING_EVERY(&t_all, 3*1000000))
2094 			TEST_SAY("produced %3d%%: %d/%d messages "
2095 				 "(%d msgs/s, %d bytes/s)\n",
2096 				 ((msg_id - msg_base) * 100) / cnt,
2097 				 msg_id - msg_base, cnt,
2098 				 (int)((msg_id - msg_base) /
2099 				       (TIMING_DURATION(&t_all) / 1000000)),
2100 				 (int)((tot_bytes) /
2101 				       (TIMING_DURATION(&t_all) / 1000000)));
2102         }
2103 
2104 	if (!payload)
2105 		free(buf);
2106 
2107         t_poll.duration = tot_time_poll;
2108         TIMING_STOP(&t_poll);
2109 	TIMING_STOP(&t_all);
2110 }
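/* Illustrative sketch only (disabled): pairing test_produce_msgs_nowait()
 * with test_wait_delivery(). The topic name and message count are
 * placeholder assumptions. */
#if 0
static void example_async_produce (void) {
        int remains = 0;
        uint64_t testid = test_id_generate();
        rd_kafka_t *rk = test_create_producer();
        rd_kafka_topic_t *rkt = test_create_producer_topic(rk, "mytopic",
                                                           NULL);

        /* Enqueue 1000 messages without blocking on delivery. */
        test_produce_msgs_nowait(rk, rkt, testid, 0/*partition*/,
                                 0/*msg_base*/, 1000/*cnt*/,
                                 NULL/*payload*/, 0/*default size*/,
                                 0/*no rate limit*/, &remains);

        /* Block until the delivery report callback has counted all of
         * them down. */
        test_wait_delivery(rk, &remains);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
}
#endif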
2111 
2112 /**
2113  * Waits for the messages tracked by counter \p msgcounterp to be delivered.
2114  */
2115 void test_wait_delivery (rd_kafka_t *rk, int *msgcounterp) {
2116 	test_timing_t t_all;
2117         int start_cnt = *msgcounterp;
2118 
2119         TIMING_START(&t_all, "PRODUCE.DELIVERY.WAIT");
2120 
2121 	/* Wait for messages to be delivered */
2122 	while (*msgcounterp > 0 && rd_kafka_outq_len(rk) > 0) {
2123 		rd_kafka_poll(rk, 10);
2124                 if (TIMING_EVERY(&t_all, 3*1000000)) {
2125                         int delivered = start_cnt - *msgcounterp;
2126                         TEST_SAY("wait_delivery: "
2127                                  "%d/%d messages delivered: %d msgs/s\n",
2128                                  delivered, start_cnt,
2129                                  (int)(delivered /
2130                                        (TIMING_DURATION(&t_all) / 1000000)));
2131                 }
2132         }
2133 
2134 	TIMING_STOP(&t_all);
2135 
2136         TEST_ASSERT(*msgcounterp == 0,
2137                     "Not all messages delivered: msgcounter still at %d, "
2138                     "outq_len %d",
2139                     *msgcounterp, rd_kafka_outq_len(rk));
2140 }
2141 
2142 /**
2143  * Produces \p cnt messages and waits for successful delivery
2144  */
2145 void test_produce_msgs (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2146                         uint64_t testid, int32_t partition,
2147                         int msg_base, int cnt,
2148 			const char *payload, size_t size) {
2149 	int remains = 0;
2150 
2151         test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2152                                  payload, size, 0, &remains);
2153 
2154         test_wait_delivery(rk, &remains);
2155 }
2156 
2157 
2158 /**
2159  * @brief Produces \p cnt messages and waits for successful delivery
2160  */
2161 void test_produce_msgs2 (rd_kafka_t *rk, const char *topic,
2162                          uint64_t testid, int32_t partition,
2163                          int msg_base, int cnt,
2164                          const char *payload, size_t size) {
2165         int remains = 0;
2166         rd_kafka_topic_t *rkt = test_create_topic_object(rk, topic, NULL);
2167 
2168         test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2169                                  payload, size, 0, &remains);
2170 
2171         test_wait_delivery(rk, &remains);
2172 
2173         rd_kafka_topic_destroy(rkt);
2174 }
2175 
2176 /**
2177  * @brief Produces \p cnt messages without waiting for delivery.
2178  */
2179 void test_produce_msgs2_nowait (rd_kafka_t *rk, const char *topic,
2180                                 uint64_t testid, int32_t partition,
2181                                 int msg_base, int cnt,
2182                                 const char *payload, size_t size,
2183                                 int *remainsp) {
2184         rd_kafka_topic_t *rkt = test_create_topic_object(rk, topic, NULL);
2185 
2186         test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2187                                  payload, size, 0, remainsp);
2188 
2189         rd_kafka_topic_destroy(rkt);
2190 }
2191 
2192 
2193 /**
2194  * Produces \p cnt messages at \p msgrate msgs/s, and waits for successful delivery
2195  */
2196 void test_produce_msgs_rate (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2197                              uint64_t testid, int32_t partition,
2198                              int msg_base, int cnt,
2199                              const char *payload, size_t size, int msgrate) {
2200 	int remains = 0;
2201 
2202         test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2203                                  payload, size, msgrate, &remains);
2204 
2205         test_wait_delivery(rk, &remains);
2206 }
2207 
2208 
2209 
2210 /**
2211  * Create a producer, produce \p msgcnt messages to \p topic \p partition,
2212  * destroy the producer, and return the testid used.
2213  */
2214 uint64_t
2215 test_produce_msgs_easy_size (const char *topic, uint64_t testid,
2216                              int32_t partition, int msgcnt, size_t size) {
2217         rd_kafka_t *rk;
2218         rd_kafka_topic_t *rkt;
2219         test_timing_t t_produce;
2220 
2221         if (!testid)
2222                 testid = test_id_generate();
2223         rk = test_create_producer();
2224         rkt = test_create_producer_topic(rk, topic, NULL);
2225 
2226         TIMING_START(&t_produce, "PRODUCE");
2227         test_produce_msgs(rk, rkt, testid, partition, 0, msgcnt, NULL, size);
2228         TIMING_STOP(&t_produce);
2229         rd_kafka_topic_destroy(rkt);
2230         rd_kafka_destroy(rk);
2231 
2232         return testid;
2233 }
2234 
2235 rd_kafka_resp_err_t test_produce_sync (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2236                                        uint64_t testid, int32_t partition) {
2237         test_curr->produce_sync = 1;
2238         test_produce_msgs(rk, rkt, testid, partition, 0, 1, NULL, 0);
2239         test_curr->produce_sync = 0;
2240         return test_curr->produce_sync_err;
2241 }
2242 
2243 
2244 /**
2245  * @brief Easy produce function.
2246  *
2247  * @param ... is a NULL-terminated list of key, value config property pairs.
2248  */
2249 void test_produce_msgs_easy_v (const char *topic, uint64_t testid,
2250                                int32_t partition,
2251                                int msg_base, int cnt, size_t size, ...) {
2252         rd_kafka_conf_t *conf;
2253         rd_kafka_t *p;
2254         rd_kafka_topic_t *rkt;
2255         va_list ap;
2256         const char *key, *val;
2257 
2258         test_conf_init(&conf, NULL, 0);
2259 
2260         va_start(ap, size);
2261         while ((key = va_arg(ap, const char *)) &&
2262                (val = va_arg(ap, const char *)))
2263                 test_conf_set(conf, key, val);
2264         va_end(ap);
2265 
2266         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
2267 
2268         p = test_create_handle(RD_KAFKA_PRODUCER, conf);
2269 
2270         rkt = test_create_producer_topic(p, topic, NULL);
2271 
2272         test_produce_msgs(p, rkt, testid, partition, msg_base, cnt, NULL, size);
2273 
2274         rd_kafka_topic_destroy(rkt);
2275         rd_kafka_destroy(p);
2276 }
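/* Illustrative sketch only (disabled): the trailing arguments are global
 * config key/value pairs terminated by NULL. Topic name, count and config
 * values are placeholder assumptions. */
#if 0
uint64_t testid = test_id_generate();
test_produce_msgs_easy_v("mytopic", testid, 0/*partition*/,
                         0/*msg_base*/, 100/*cnt*/, 0/*default size*/,
                         "linger.ms", "100",
                         "batch.num.messages", "1000",
                         NULL/*terminator*/);
#endif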
2277 
2278 
2279 /**
2280  * @brief Produce messages to multiple topic-partitions.
2281  *
2282  * @param ... va-arg list of tuples of:
2283  *           const char *topic
2284  *           int32_t partition (or UA)
2285  *           int msg_base
2286  *           int msg_cnt
2287  *
2288  * End with a NULL topic
2289  */
2290 void test_produce_msgs_easy_multi (uint64_t testid, ...) {
2291         rd_kafka_conf_t *conf;
2292         rd_kafka_t *p;
2293         va_list ap;
2294         const char *topic;
2295         int msgcounter = 0;
2296 
2297         test_conf_init(&conf, NULL, 0);
2298 
2299         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
2300 
2301         p = test_create_handle(RD_KAFKA_PRODUCER, conf);
2302 
2303         va_start(ap, testid);
2304         while ((topic = va_arg(ap, const char *))) {
2305                 int32_t partition = va_arg(ap, int32_t);
2306                 int msg_base = va_arg(ap, int);
2307                 int msg_cnt = va_arg(ap, int);
2308                 rd_kafka_topic_t *rkt;
2309 
2310                 rkt = test_create_producer_topic(p, topic, NULL);
2311 
2312                 test_produce_msgs_nowait(p, rkt, testid, partition,
2313                                          msg_base, msg_cnt,
2314                                          NULL, 0, 0, &msgcounter);
2315 
2316                 rd_kafka_topic_destroy(rkt);
2317         }
2318         va_end(ap);
2319 
2320         test_flush(p, tmout_multip(10*1000));
2321 
2322         rd_kafka_destroy(p);
2323 }
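/* Illustrative sketch only (disabled): each vararg tuple is
 * (topic, partition, msg_base, msg_cnt) and the list ends with a NULL
 * topic. Topic names and counts are placeholder assumptions. */
#if 0
uint64_t testid = test_id_generate();
test_produce_msgs_easy_multi(testid,
                             "topic_a", 0, 0, 100,
                             "topic_b", RD_KAFKA_PARTITION_UA, 100, 100,
                             NULL/*end of tuples*/);
#endif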
2324 
2325 
2326 /**
2327  * @brief A standard rebalance callback.
2328  */
2329 void test_rebalance_cb (rd_kafka_t *rk,
2330                         rd_kafka_resp_err_t err,
2331                         rd_kafka_topic_partition_list_t *parts,
2332                         void *opaque) {
2333 
2334         TEST_SAY("%s: Rebalance: %s: %d partition(s)\n",
2335                  rd_kafka_name(rk), rd_kafka_err2name(err), parts->cnt);
2336 
2337         switch (err)
2338         {
2339         case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
2340                 test_consumer_assign("assign", rk, parts);
2341                 break;
2342         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
2343                 test_consumer_unassign("unassign", rk);
2344                 break;
2345         default:
2346                 TEST_FAIL("Unknown rebalance event: %s",
2347                           rd_kafka_err2name(err));
2348                 break;
2349         }
2350 }
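/* Illustrative sketch only (disabled): using test_rebalance_cb as the
 * standard rebalance callback for a subscribing consumer. The group and
 * topic names are placeholder assumptions. */
#if 0
rd_kafka_t *c = test_create_consumer("mygroup", test_rebalance_cb,
                                     NULL/*default conf*/,
                                     NULL/*default topic conf*/);
test_consumer_subscribe(c, "mytopic");
#endif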
2351 
2352 
2353 
2354 rd_kafka_t *test_create_consumer (const char *group_id,
2355 				  void (*rebalance_cb) (
2356 					  rd_kafka_t *rk,
2357 					  rd_kafka_resp_err_t err,
2358 					  rd_kafka_topic_partition_list_t
2359 					  *partitions,
2360 					  void *opaque),
2361 				  rd_kafka_conf_t *conf,
2362                                   rd_kafka_topic_conf_t *default_topic_conf) {
2363 	rd_kafka_t *rk;
2364 	char tmp[64];
2365 
2366 	if (!conf)
2367 		test_conf_init(&conf, NULL, 0);
2368 
2369         if (group_id) {
2370 		test_conf_set(conf, "group.id", group_id);
2371 
2372 		rd_snprintf(tmp, sizeof(tmp), "%d", test_session_timeout_ms);
2373 		test_conf_set(conf, "session.timeout.ms", tmp);
2374 
2375 		if (rebalance_cb)
2376 			rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
2377 	} else {
2378 		TEST_ASSERT(!rebalance_cb);
2379 	}
2380 
2381         if (default_topic_conf)
2382                 rd_kafka_conf_set_default_topic_conf(conf, default_topic_conf);
2383 
2384 	/* Create kafka instance */
2385 	rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
2386 
2387 	if (group_id)
2388 		rd_kafka_poll_set_consumer(rk);
2389 
2390 	return rk;
2391 }
2392 
2393 rd_kafka_topic_t *test_create_consumer_topic (rd_kafka_t *rk,
2394                                               const char *topic) {
2395 	rd_kafka_topic_t *rkt;
2396 	rd_kafka_topic_conf_t *topic_conf;
2397 
2398 	test_conf_init(NULL, &topic_conf, 0);
2399 
2400 	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
2401 	if (!rkt)
2402 		TEST_FAIL("Failed to create topic: %s\n",
2403                           rd_kafka_err2str(rd_kafka_last_error()));
2404 
2405 	return rkt;
2406 }
2407 
2408 
2409 void test_consumer_start (const char *what,
2410                           rd_kafka_topic_t *rkt, int32_t partition,
2411                           int64_t start_offset) {
2412 
2413 	TEST_SAY("%s: consumer_start: %s [%"PRId32"] at offset %"PRId64"\n",
2414 		 what, rd_kafka_topic_name(rkt), partition, start_offset);
2415 
2416 	if (rd_kafka_consume_start(rkt, partition, start_offset) == -1)
2417 		TEST_FAIL("%s: consume_start failed: %s\n",
2418 			  what, rd_kafka_err2str(rd_kafka_last_error()));
2419 }
2420 
2421 void test_consumer_stop (const char *what,
2422                          rd_kafka_topic_t *rkt, int32_t partition) {
2423 
2424 	TEST_SAY("%s: consumer_stop: %s [%"PRId32"]\n",
2425 		 what, rd_kafka_topic_name(rkt), partition);
2426 
2427 	if (rd_kafka_consume_stop(rkt, partition) == -1)
2428 		TEST_FAIL("%s: consume_stop failed: %s\n",
2429 			  what, rd_kafka_err2str(rd_kafka_last_error()));
2430 }
2431 
2432 void test_consumer_seek (const char *what, rd_kafka_topic_t *rkt,
2433                          int32_t partition, int64_t offset) {
2434 	int err;
2435 
2436 	TEST_SAY("%s: consumer_seek: %s [%"PRId32"] to offset %"PRId64"\n",
2437 		 what, rd_kafka_topic_name(rkt), partition, offset);
2438 
2439 	if ((err = rd_kafka_seek(rkt, partition, offset, 2000)))
2440 		TEST_FAIL("%s: consume_seek(%s, %"PRId32", %"PRId64") "
2441 			  "failed: %s\n",
2442 			  what,
2443 			  rd_kafka_topic_name(rkt), partition, offset,
2444 			  rd_kafka_err2str(err));
2445 }
2446 
2447 
2448 
2449 /**
2450  * Returns the offset of the last message consumed.
2451  */
2452 int64_t test_consume_msgs (const char *what, rd_kafka_topic_t *rkt,
2453                            uint64_t testid, int32_t partition, int64_t offset,
2454                            int exp_msg_base, int exp_cnt, int parse_fmt) {
2455 	int cnt = 0;
2456 	int msg_next = exp_msg_base;
2457 	int fails = 0;
2458 	int64_t offset_last = -1;
2459 	int64_t tot_bytes = 0;
2460 	test_timing_t t_first, t_all;
2461 
2462 	TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: expect msg #%d..%d "
2463 		 "at offset %"PRId64"\n",
2464 		 what, rd_kafka_topic_name(rkt), partition,
2465 		 exp_msg_base, exp_msg_base+exp_cnt, offset);
2466 
2467 	if (offset != TEST_NO_SEEK) {
2468 		rd_kafka_resp_err_t err;
2469 		test_timing_t t_seek;
2470 
2471 		TIMING_START(&t_seek, "SEEK");
2472 		if ((err = rd_kafka_seek(rkt, partition, offset, 5000)))
2473 			TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: "
2474 				  "seek to %"PRId64" failed: %s\n",
2475 				  what, rd_kafka_topic_name(rkt), partition,
2476 				  offset, rd_kafka_err2str(err));
2477 		TIMING_STOP(&t_seek);
2478 		TEST_SAY("%s: seeked to offset %"PRId64"\n", what, offset);
2479 	}
2480 
2481 	TIMING_START(&t_first, "FIRST MSG");
2482 	TIMING_START(&t_all, "ALL MSGS");
2483 
2484 	while (cnt < exp_cnt) {
2485 		rd_kafka_message_t *rkmessage;
2486 		int msg_id;
2487 
2488                 rkmessage = rd_kafka_consume(rkt, partition,
2489                                              tmout_multip(5000));
2490 
2491 		if (TIMING_EVERY(&t_all, 3*1000000))
2492 			TEST_SAY("%s: "
2493 				 "consumed %3d%%: %d/%d messages "
2494 				 "(%d msgs/s, %d bytes/s)\n",
2495 				 what, cnt * 100 / exp_cnt, cnt, exp_cnt,
2496 				 (int)(cnt /
2497 				       (TIMING_DURATION(&t_all) / 1000000)),
2498 				 (int)(tot_bytes /
2499 				       (TIMING_DURATION(&t_all) / 1000000)));
2500 
2501 		if (!rkmessage)
2502 			TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: "
2503 				  "expected msg #%d (%d/%d): timed out\n",
2504 				  what, rd_kafka_topic_name(rkt), partition,
2505 				  msg_next, cnt, exp_cnt);
2506 
2507 		if (rkmessage->err)
2508 			TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: "
2509 				  "expected msg #%d (%d/%d): got error: %s\n",
2510 				  what, rd_kafka_topic_name(rkt), partition,
2511 				  msg_next, cnt, exp_cnt,
2512 				  rd_kafka_err2str(rkmessage->err));
2513 
2514 		if (cnt == 0)
2515 			TIMING_STOP(&t_first);
2516 
2517 		if (parse_fmt)
2518 			test_msg_parse(testid, rkmessage, partition, &msg_id);
2519 		else
2520 			msg_id = 0;
2521 
2522 		if (test_level >= 3)
2523 			TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: "
2524 				 "got msg #%d at offset %"PRId64
2525 				 " (expect #%d at offset %"PRId64")\n",
2526 				 what, rd_kafka_topic_name(rkt), partition,
2527 				 msg_id, rkmessage->offset,
2528 				 msg_next,
2529 				 offset >= 0 ? offset + cnt : -1);
2530 
2531 		if (parse_fmt && msg_id != msg_next) {
2532 			TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: "
2533 				 "expected msg #%d (%d/%d): got msg #%d\n",
2534 				 what, rd_kafka_topic_name(rkt), partition,
2535 				 msg_next, cnt, exp_cnt, msg_id);
2536 			fails++;
2537 		}
2538 
2539 		cnt++;
2540 		tot_bytes += rkmessage->len;
2541 		msg_next++;
2542 		offset_last = rkmessage->offset;
2543 
2544 		rd_kafka_message_destroy(rkmessage);
2545 	}
2546 
2547 	TIMING_STOP(&t_all);
2548 
2549 	if (fails)
2550 		TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: %d failures\n",
2551 			  what, rd_kafka_topic_name(rkt), partition, fails);
2552 
2553 	TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: "
2554 		 "%d/%d messages consumed successfully\n",
2555 		 what, rd_kafka_topic_name(rkt), partition,
2556 		 cnt, exp_cnt);
2557 	return offset_last;
2558 }
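/* Illustrative sketch only (disabled): the simple (legacy) consumer flow
 * these helpers support: start, verify a message range, stop. The topic
 * object, testid and counts are placeholder assumptions. */
#if 0
test_consumer_start("simple", rkt, 0/*partition*/,
                    RD_KAFKA_OFFSET_BEGINNING);
test_consume_msgs("simple", rkt, testid, 0/*partition*/, TEST_NO_SEEK,
                  0/*exp_msg_base*/, 100/*exp_cnt*/, 1/*parse_fmt*/);
test_consumer_stop("simple", rkt, 0/*partition*/);
#endif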
2559 
2560 
2561 /**
2562  * Create high-level consumer subscribing to \p topic from BEGINNING
2563  * and expects \p exp_msgcnt messages with matching \p testid.
2564  * Destroys consumer when done.
2565  *
2566  * @param txn If true, isolation.level is set to read_committed.
2567  * @param partition If -1 the topic will be subscribed to, otherwise the
2568  *                  single partition will be assigned immediately.
2569  *
2570  * If \p group_id is NULL a new unique group is generated
2571  */
2572 void
2573 test_consume_msgs_easy_mv0 (const char *group_id, const char *topic,
2574                             rd_bool_t txn,
2575                             int32_t partition,
2576                             uint64_t testid, int exp_eofcnt, int exp_msgcnt,
2577                             rd_kafka_topic_conf_t *tconf,
2578                             test_msgver_t *mv) {
2579         rd_kafka_t *rk;
2580         char grpid0[64];
2581         rd_kafka_conf_t *conf;
2582 
2583         test_conf_init(&conf, tconf ? NULL : &tconf, 0);
2584 
2585         if (!group_id)
2586                 group_id = test_str_id_generate(grpid0, sizeof(grpid0));
2587 
2588         if (txn)
2589                 test_conf_set(conf, "isolation.level", "read_committed");
2590 
2591         test_topic_conf_set(tconf, "auto.offset.reset", "smallest");
2592         if (exp_eofcnt != -1)
2593                 test_conf_set(conf, "enable.partition.eof", "true");
2594         rk = test_create_consumer(group_id, NULL, conf, tconf);
2595 
2596         rd_kafka_poll_set_consumer(rk);
2597 
2598         if (partition == -1) {
2599                 TEST_SAY("Subscribing to topic %s in group %s "
2600                          "(expecting %d msgs with testid %"PRIu64")\n",
2601                          topic, group_id, exp_msgcnt, testid);
2602 
2603                 test_consumer_subscribe(rk, topic);
2604         } else {
2605                 rd_kafka_topic_partition_list_t *plist;
2606 
2607                 TEST_SAY("Assign topic %s [%"PRId32"] in group %s "
2608                          "(expecting %d msgs with testid %"PRIu64")\n",
2609                          topic, partition, group_id, exp_msgcnt, testid);
2610 
2611                 plist = rd_kafka_topic_partition_list_new(1);
2612                 rd_kafka_topic_partition_list_add(plist, topic, partition);
2613                 test_consumer_assign("consume_easy_mv", rk, plist);
2614                 rd_kafka_topic_partition_list_destroy(plist);
2615         }
2616 
2617         /* Consume messages */
2618         test_consumer_poll("consume.easy", rk, testid, exp_eofcnt,
2619                            -1, exp_msgcnt, mv);
2620 
2621         test_consumer_close(rk);
2622 
2623         rd_kafka_destroy(rk);
2624 }
2625 
2626 void
2627 test_consume_msgs_easy (const char *group_id, const char *topic,
2628                         uint64_t testid, int exp_eofcnt, int exp_msgcnt,
2629                         rd_kafka_topic_conf_t *tconf) {
2630         test_msgver_t mv;
2631 
2632         test_msgver_init(&mv, testid);
2633 
2634         test_consume_msgs_easy_mv(group_id, topic, -1, testid, exp_eofcnt,
2635                                   exp_msgcnt, tconf, &mv);
2636 
2637         test_msgver_clear(&mv);
2638 }
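/* Illustrative sketch only (disabled): consuming messages previously
 * produced with a known testid. The topic name and count are placeholder
 * assumptions; a NULL group_id makes the helper generate a unique group. */
#if 0
uint64_t testid = test_produce_msgs_easy_size("mytopic", 0/*new testid*/,
                                              0/*partition*/,
                                              100/*msgcnt*/,
                                              0/*default size*/);
test_consume_msgs_easy(NULL/*unique group*/, "mytopic", testid,
                       -1/*no EOF count expected*/, 100/*exp_msgcnt*/,
                       NULL/*default topic conf*/);
#endif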
2639 
2640 
2641 void
2642 test_consume_txn_msgs_easy (const char *group_id, const char *topic,
2643                             uint64_t testid, int exp_eofcnt, int exp_msgcnt,
2644                             rd_kafka_topic_conf_t *tconf) {
2645         test_msgver_t mv;
2646 
2647         test_msgver_init(&mv, testid);
2648 
2649         test_consume_msgs_easy_mv0(group_id, topic, rd_true/*txn*/,
2650                                    -1, testid, exp_eofcnt,
2651                                    exp_msgcnt, tconf, &mv);
2652 
2653         test_msgver_clear(&mv);
2654 }
2655 
2656 
2657 /**
2658  * @brief Waits for up to \p timeout_ms for consumer to receive assignment.
2659  *        If no assignment is received within the timeout the test fails.
2660  *
2661  * @warning This method will poll the consumer and might thus read messages.
2662  *          Set \p do_poll to false to use a sleep rather than poll.
2663  */
2664 void test_consumer_wait_assignment (rd_kafka_t *rk, rd_bool_t do_poll) {
2665         rd_kafka_topic_partition_list_t *assignment = NULL;
2666         int i;
2667 
2668         while (1) {
2669                 rd_kafka_resp_err_t err;
2670 
2671                 err = rd_kafka_assignment(rk, &assignment);
2672                 TEST_ASSERT(!err, "rd_kafka_assignment() failed: %s",
2673                             rd_kafka_err2str(err));
2674 
2675                 if (assignment->cnt > 0)
2676                         break;
2677 
2678                 rd_kafka_topic_partition_list_destroy(assignment);
2679 
2680                 if (do_poll)
2681                         test_consumer_poll_once(rk, NULL, 1000);
2682                 else
2683                         rd_usleep(1000*1000, NULL);
2684         }
2685 
2686         TEST_SAY("%s: Assignment (%d partition(s)): ",
2687                  rd_kafka_name(rk), assignment->cnt);
2688         for (i = 0 ; i < assignment->cnt ; i++)
2689                 TEST_SAY0("%s%s[%"PRId32"]",
2690                           i == 0 ? "" : ", ",
2691                           assignment->elems[i].topic,
2692                           assignment->elems[i].partition);
2693         TEST_SAY0("\n");
2694 
2695         rd_kafka_topic_partition_list_destroy(assignment);
2696 }
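/* Illustrative sketch only (disabled): waiting for a subscription to be
 * assigned before generating test traffic. The group and topic names are
 * placeholder assumptions. */
#if 0
rd_kafka_t *c = test_create_consumer("mygroup", NULL, NULL, NULL);
test_consumer_subscribe(c, "mytopic");
test_consumer_wait_assignment(c, rd_false/*sleep instead of polling*/);
#endif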
2697 
2698 
2699 /**
2700  * @brief Verify that the consumer's assignment matches the expected assignment.
2701  *
2702  * The va-list is a NULL-terminated list of (const char *topic, int partition)
2703  * tuples.
2704  *
2705  * Fails the test on mismatch, unless \p fail_immediately is false.
2706  */
2707 void test_consumer_verify_assignment0 (const char *func, int line,
2708                                        rd_kafka_t *rk,
2709                                        int fail_immediately, ...) {
2710         va_list ap;
2711         int cnt = 0;
2712         const char *topic;
2713         rd_kafka_topic_partition_list_t *assignment;
2714         rd_kafka_resp_err_t err;
2715         int i;
2716 
2717         if ((err = rd_kafka_assignment(rk, &assignment)))
2718                 TEST_FAIL("%s:%d: Failed to get assignment for %s: %s",
2719                           func, line, rd_kafka_name(rk), rd_kafka_err2str(err));
2720 
2721         TEST_SAY("%s assignment (%d partition(s)):\n", rd_kafka_name(rk),
2722                  assignment->cnt);
2723         for (i = 0 ; i < assignment->cnt ; i++)
2724                 TEST_SAY(" %s [%"PRId32"]\n",
2725                          assignment->elems[i].topic,
2726                          assignment->elems[i].partition);
2727 
2728         va_start(ap, fail_immediately);
2729         while ((topic = va_arg(ap, const char *))) {
2730                 int partition = va_arg(ap, int);
2731                 cnt++;
2732 
2733                 if (!rd_kafka_topic_partition_list_find(assignment,
2734                                                         topic, partition))
2735                         TEST_FAIL_LATER(
2736                                 "%s:%d: Expected %s [%d] not found in %s's "
2737                                 "assignment (%d partition(s))",
2738                                 func, line,
2739                                 topic, partition, rd_kafka_name(rk),
2740                                 assignment->cnt);
2741         }
2742         va_end(ap);
2743 
2744         if (cnt != assignment->cnt)
2745                 TEST_FAIL_LATER(
2746                         "%s:%d: "
2747                         "Expected %d assigned partition(s) for %s, not %d",
2748                         func, line, cnt, rd_kafka_name(rk), assignment->cnt);
2749 
2750         if (fail_immediately)
2751                 TEST_LATER_CHECK();
2752 
2753         rd_kafka_topic_partition_list_destroy(assignment);
2754 }
2755 
2756 
2757 
2758 
2759 
2760 /**
2761  * @brief Start subscribing for 'topic'
2762  */
2763 void test_consumer_subscribe (rd_kafka_t *rk, const char *topic) {
2764         rd_kafka_topic_partition_list_t *topics;
2765 	rd_kafka_resp_err_t err;
2766 
2767 	topics = rd_kafka_topic_partition_list_new(1);
2768         rd_kafka_topic_partition_list_add(topics, topic,
2769 					  RD_KAFKA_PARTITION_UA);
2770 
2771         err = rd_kafka_subscribe(rk, topics);
2772         if (err)
2773                 TEST_FAIL("%s: Failed to subscribe to %s: %s\n",
2774                           rd_kafka_name(rk), topic, rd_kafka_err2str(err));
2775 
2776         rd_kafka_topic_partition_list_destroy(topics);
2777 }
2778 
2779 
2780 void test_consumer_assign (const char *what, rd_kafka_t *rk,
2781 			   rd_kafka_topic_partition_list_t *partitions) {
2782         rd_kafka_resp_err_t err;
2783         test_timing_t timing;
2784 
2785         TIMING_START(&timing, "ASSIGN.PARTITIONS");
2786         err = rd_kafka_assign(rk, partitions);
2787         TIMING_STOP(&timing);
2788         if (err)
2789                 TEST_FAIL("%s: failed to assign %d partition(s): %s\n",
2790 			  what, partitions->cnt, rd_kafka_err2str(err));
2791         else
2792                 TEST_SAY("%s: assigned %d partition(s)\n",
2793 			 what, partitions->cnt);
2794 }
2795 
2796 
2797 void test_consumer_incremental_assign (const char *what, rd_kafka_t *rk,
2798                                        rd_kafka_topic_partition_list_t
2799                                        *partitions) {
2800         rd_kafka_error_t *error;
2801         test_timing_t timing;
2802 
2803         TIMING_START(&timing, "INCREMENTAL.ASSIGN.PARTITIONS");
2804         error = rd_kafka_incremental_assign(rk, partitions);
2805         TIMING_STOP(&timing);
2806         if (error) {
2807                 TEST_FAIL("%s: incremental assign of %d partition(s) failed: "
2808                           "%s", what, partitions->cnt,
2809                           rd_kafka_error_string(error));
2810                 rd_kafka_error_destroy(error);
2811         } else
2812                 TEST_SAY("%s: incremental assign of %d partition(s) done\n",
2813                          what, partitions->cnt);
2814 }
2815 
2816 
2817 void test_consumer_unassign (const char *what, rd_kafka_t *rk) {
2818         rd_kafka_resp_err_t err;
2819         test_timing_t timing;
2820 
2821         TIMING_START(&timing, "UNASSIGN.PARTITIONS");
2822         err = rd_kafka_assign(rk, NULL);
2823         TIMING_STOP(&timing);
2824         if (err)
2825                 TEST_FAIL("%s: failed to unassign current partitions: %s\n",
2826                           what, rd_kafka_err2str(err));
2827         else
2828                 TEST_SAY("%s: unassigned current partitions\n", what);
2829 }
2830 
2831 
2832 void test_consumer_incremental_unassign (const char *what, rd_kafka_t *rk,
2833                                          rd_kafka_topic_partition_list_t
2834                                          *partitions) {
2835         rd_kafka_error_t *error;
2836         test_timing_t timing;
2837 
2838         TIMING_START(&timing, "INCREMENTAL.UNASSIGN.PARTITIONS");
2839         error = rd_kafka_incremental_unassign(rk, partitions);
2840         TIMING_STOP(&timing);
2841         if (error) {
2842                 TEST_FAIL("%s: incremental unassign of %d partition(s) "
2843                           "failed: %s", what, partitions->cnt,
2844                           rd_kafka_error_string(error));
2845                 rd_kafka_error_destroy(error);
2846         } else
2847                 TEST_SAY("%s: incremental unassign of %d partition(s) done\n",
2848                          what, partitions->cnt);
2849 }
2850 
2851 
2852 /**
2853  * @brief Assign a single partition with an optional starting offset
2854  */
2855 void test_consumer_assign_partition (const char *what, rd_kafka_t *rk,
2856                                      const char *topic, int32_t partition,
2857                                      int64_t offset) {
2858         rd_kafka_topic_partition_list_t *part;
2859 
2860         part = rd_kafka_topic_partition_list_new(1);
2861         rd_kafka_topic_partition_list_add(part, topic, partition)->offset =
2862                 offset;
2863 
2864         test_consumer_assign(what, rk, part);
2865 
2866         rd_kafka_topic_partition_list_destroy(part);
2867 }
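
/**
 * Illustrative usage sketch for the assignment helpers above (not a test
 * itself; \c rk and \c topic are assumed to be provided by the calling test):
 *
 * @code
 *   rd_kafka_topic_partition_list_t *parts;
 *
 *   // Explicitly assign partitions 0 and 1, starting from the beginning.
 *   parts = rd_kafka_topic_partition_list_new(2);
 *   rd_kafka_topic_partition_list_add(parts, topic, 0)->offset =
 *           RD_KAFKA_OFFSET_BEGINNING;
 *   rd_kafka_topic_partition_list_add(parts, topic, 1)->offset =
 *           RD_KAFKA_OFFSET_BEGINNING;
 *   test_consumer_assign("example", rk, parts);
 *   rd_kafka_topic_partition_list_destroy(parts);
 *
 *   // ... consume ...
 *
 *   test_consumer_unassign("example", rk);
 * @endcode
 */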
2868 
2869 
2870 void test_consumer_pause_resume_partition (rd_kafka_t *rk,
2871                                            const char *topic, int32_t partition,
2872                                            rd_bool_t pause) {
2873         rd_kafka_topic_partition_list_t *part;
2874         rd_kafka_resp_err_t err;
2875 
2876         part = rd_kafka_topic_partition_list_new(1);
2877         rd_kafka_topic_partition_list_add(part, topic, partition);
2878 
2879         if (pause)
2880                 err = rd_kafka_pause_partitions(rk, part);
2881         else
2882                 err = rd_kafka_resume_partitions(rk, part);
2883 
2884         TEST_ASSERT(!err, "Failed to %s %s [%"PRId32"]: %s",
2885                     pause ? "pause":"resume",
2886                     topic, partition,
2887                     rd_kafka_err2str(err));
2888 
2889         rd_kafka_topic_partition_list_destroy(part);
2890 }
2891 
2892 
2893 /**
2894  * Message verification services
2895  *
2896  */
2897 
2898 void test_msgver_init (test_msgver_t *mv, uint64_t testid) {
2899 	memset(mv, 0, sizeof(*mv));
2900 	mv->testid = testid;
2901 	/* Max warning logs before suppressing. */
2902 	mv->log_max = (test_level + 1) * 100;
2903 }
2904 
2905 void test_msgver_ignore_eof (test_msgver_t *mv) {
2906         mv->ignore_eof = rd_true;
2907 }
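
/**
 * Illustrative sketch of the typical msgver life-cycle (not a test itself;
 * \c rk, \c testid and \c exp_msgcnt are assumed to come from the calling
 * test):
 *
 * @code
 *   test_msgver_t mv;
 *
 *   test_msgver_init(&mv, testid);
 *   test_msgver_ignore_eof(&mv);  // Optional: don't record partition EOFs
 *
 *   // Feed consumed messages into the msgver, e.g. with the poll helpers
 *   // defined further down in this file:
 *   test_consumer_poll("CONSUME", rk, testid, -1, 0, exp_msgcnt, &mv);
 *
 *   // Verify ordering, duplicates and msgid range in one call:
 *   test_msgver_verify("CONSUME", &mv, TEST_MSGVER_ALL, 0, exp_msgcnt);
 *
 *   test_msgver_clear(&mv);
 * @endcode
 */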
2908 
2909 #define TEST_MV_WARN(mv,...) do {			\
2910 		if ((mv)->log_cnt++ > (mv)->log_max)	\
2911 			(mv)->log_suppr_cnt++;		\
2912 		else					\
2913 			TEST_WARN(__VA_ARGS__);		\
2914 	} while (0)
2915 
2916 
2917 
2918 static void test_mv_mvec_grow (struct test_mv_mvec *mvec, int tot_size) {
2919 	if (tot_size <= mvec->size)
2920 		return;
2921 	mvec->size = tot_size;
2922 	mvec->m = realloc(mvec->m, sizeof(*mvec->m) * mvec->size);
2923 }
2924 
2925 /**
2926  * Make sure there is room for at least \p cnt messages, else grow mvec.
2927  */
2928 static void test_mv_mvec_reserve (struct test_mv_mvec *mvec, int cnt) {
2929 	test_mv_mvec_grow(mvec, mvec->cnt + cnt);
2930 }
2931 
2932 void test_mv_mvec_init (struct test_mv_mvec *mvec, int exp_cnt) {
2933 	TEST_ASSERT(mvec->m == NULL, "mvec not cleared");
2934 
2935 	if (!exp_cnt)
2936 		return;
2937 
2938 	test_mv_mvec_grow(mvec, exp_cnt);
2939 }
2940 
2941 
2942 void test_mv_mvec_clear (struct test_mv_mvec *mvec) {
2943 	if (mvec->m)
2944 		free(mvec->m);
2945 }
2946 
2947 void test_msgver_clear (test_msgver_t *mv) {
2948 	int i;
2949 	for (i = 0 ; i < mv->p_cnt ; i++) {
2950 		struct test_mv_p *p = mv->p[i];
2951 		free(p->topic);
2952 		test_mv_mvec_clear(&p->mvec);
2953 		free(p);
2954 	}
2955 
2956 	free(mv->p);
2957 
2958 	test_msgver_init(mv, mv->testid);
2959 }
2960 
2961 struct test_mv_p *test_msgver_p_get (test_msgver_t *mv, const char *topic,
2962 				     int32_t partition, int do_create) {
2963 	int i;
2964 	struct test_mv_p *p;
2965 
2966 	for (i = 0 ; i < mv->p_cnt ; i++) {
2967 		p = mv->p[i];
2968 		if (p->partition == partition && !strcmp(p->topic, topic))
2969 			return p;
2970 	}
2971 
2972 	if (!do_create)
2973 		TEST_FAIL("Topic %s [%d] not found in msgver", topic, partition);
2974 
2975 	if (mv->p_cnt == mv->p_size) {
2976 		mv->p_size = (mv->p_size + 4) * 2;
2977 		mv->p = realloc(mv->p, sizeof(*mv->p) * mv->p_size);
2978 	}
2979 
2980 	mv->p[mv->p_cnt++] = p = calloc(1, sizeof(*p));
2981 
2982 	p->topic = rd_strdup(topic);
2983 	p->partition = partition;
2984 	p->eof_offset = RD_KAFKA_OFFSET_INVALID;
2985 
2986 	return p;
2987 }
2988 
2989 
2990 /**
2991  * Add (room for) message to message vector.
2992  * Resizes the vector as needed.
2993  */
2994 static struct test_mv_m *test_mv_mvec_add (struct test_mv_mvec *mvec) {
2995 	if (mvec->cnt == mvec->size) {
2996 		test_mv_mvec_grow(mvec, (mvec->size ? mvec->size * 2 : 10000));
2997 	}
2998 
2999 	mvec->cnt++;
3000 
3001 	return &mvec->m[mvec->cnt-1];
3002 }
3003 
3004 /**
3005  * @returns the message at index \p mi, or NULL if out of range.
3006  */
3007 static RD_INLINE struct test_mv_m *test_mv_mvec_get (struct test_mv_mvec *mvec,
3008 						    int mi) {
3009         if (mi >= mvec->cnt)
3010                 return NULL;
3011 	return &mvec->m[mi];
3012 }
3013 
3014 /**
3015  * @returns the message with msgid \p msgid, or NULL.
3016  */
3017 static struct test_mv_m *test_mv_mvec_find_by_msgid (struct test_mv_mvec *mvec,
3018                                                      int msgid) {
3019         int mi;
3020 
3021         for (mi = 0 ; mi < mvec->cnt ; mi++)
3022                 if (mvec->m[mi].msgid == msgid)
3023                         return &mvec->m[mi];
3024 
3025         return NULL;
3026 }
3027 
3028 
3029 /**
3030  * Print message list to \p fp
3031  */
3032 static RD_UNUSED
3033 void test_mv_mvec_dump (FILE *fp, const struct test_mv_mvec *mvec) {
3034 	int mi;
3035 
3036 	fprintf(fp, "*** Dump mvec with %d messages (capacity %d): ***\n",
3037 		mvec->cnt, mvec->size);
3038 	for (mi = 0 ; mi < mvec->cnt ; mi++)
3039 		fprintf(fp, "  msgid %d, offset %"PRId64"\n",
3040 			mvec->m[mi].msgid, mvec->m[mi].offset);
3041 	fprintf(fp, "*** Done ***\n");
3042 
3043 }
3044 
3045 static void test_mv_mvec_sort (struct test_mv_mvec *mvec,
3046 			       int (*cmp) (const void *, const void *)) {
3047 	qsort(mvec->m, mvec->cnt, sizeof(*mvec->m), cmp);
3048 }
3049 
3050 
3051 /**
3052  * @brief Adds a message to the msgver service.
3053  *
3054  * @returns 1 if message is from the expected testid, else 0 (not added)
3055  */
3056 int test_msgver_add_msg00 (const char *func, int line, const char *clientname,
3057                            test_msgver_t *mv,
3058                            uint64_t testid,
3059                            const char *topic, int32_t partition,
3060                            int64_t offset, int64_t timestamp, int32_t broker_id,
3061                            rd_kafka_resp_err_t err, int msgnum) {
3062         struct test_mv_p *p;
3063         struct test_mv_m *m;
3064 
3065         if (testid != mv->testid) {
3066                 TEST_SAYL(3, "%s:%d: %s: mismatching testid %"PRIu64
3067                           " != %"PRIu64"\n",
3068                           func, line, clientname, testid, mv->testid);
3069                 return 0; /* Ignore message */
3070         }
3071 
3072         if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF && mv->ignore_eof) {
3073                 TEST_SAYL(3, "%s:%d: %s: ignoring EOF for %s [%"PRId32"]\n",
3074                           func, line, clientname, topic, partition);
3075                 return 0; /* Ignore message */
3076         }
3077 
3078         p = test_msgver_p_get(mv, topic, partition, 1);
3079 
3080         if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
3081                 p->eof_offset = offset;
3082                 return 1;
3083         }
3084 
3085         m = test_mv_mvec_add(&p->mvec);
3086 
3087         m->offset = offset;
3088         m->msgid  = msgnum;
3089         m->timestamp = timestamp;
3090         m->broker_id = broker_id;
3091 
3092         if (test_level > 2) {
3093                 TEST_SAY("%s:%d: %s: "
3094                          "Recv msg %s [%"PRId32"] offset %"PRId64" msgid %d "
3095                          "timestamp %"PRId64" broker %"PRId32"\n",
3096                          func, line, clientname,
3097                          p->topic, p->partition, m->offset, m->msgid,
3098                          m->timestamp, m->broker_id);
3099         }
3100 
3101         mv->msgcnt++;
3102 
3103         return 1;
3104 }
3105 
3106 /**
3107  * Adds a message to the msgver service.
3108  *
3109  * Message must be a proper message or PARTITION_EOF.
3110  *
3111  * @param override_topic if non-NULL, overrides the rkmessage's topic
3112  *                       with this one.
3113  *
3114  * @returns 1 if message is from the expected testid, else 0 (not added).
3115  */
3116 int test_msgver_add_msg0 (const char *func, int line, const char *clientname,
3117                           test_msgver_t *mv,
3118                           const rd_kafka_message_t *rkmessage,
3119                           const char *override_topic) {
3120 	uint64_t in_testid;
3121 	int in_part;
3122 	int in_msgnum = -1;
3123 	char buf[128];
3124         const void *val;
3125         size_t valsize;
3126 
3127         if (mv->fwd)
3128                 test_msgver_add_msg0(func, line, clientname,
3129                                      mv->fwd, rkmessage, override_topic);
3130 
3131         if (rd_kafka_message_status(rkmessage) ==
3132             RD_KAFKA_MSG_STATUS_NOT_PERSISTED && rkmessage->err) {
3133 		if (rkmessage->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
3134 			return 0; /* Ignore error */
3135 
3136 		in_testid = mv->testid;
3137 
3138 	} else {
3139 
3140                 if (!mv->msgid_hdr) {
3141                         rd_snprintf(buf, sizeof(buf), "%.*s",
3142                                     (int)rkmessage->len,
3143                                     (char *)rkmessage->payload);
3144                         val = buf;
3145                 } else {
3146                         /* msgid is in message header */
3147                         rd_kafka_headers_t *hdrs;
3148 
3149                         if (rd_kafka_message_headers(rkmessage, &hdrs) ||
3150                             rd_kafka_header_get_last(hdrs, mv->msgid_hdr,
3151                                                      &val, &valsize)) {
3152                                 TEST_SAYL(3,
3153                                           "%s:%d: msgid expected in "
3154                                           "header %s but message at "
3155                                           "offset %"PRId64" has %s\n",
3156                                           func, line, mv->msgid_hdr,
3157                                           rkmessage->offset,
3158                                           hdrs ? "no such header" :
3159                                           "no headers");
3160 
3161                                 return 0;
3162                         }
3163                 }
3164 
3165                 if (sscanf(val, "testid=%"SCNu64", partition=%i, msg=%i\n",
3166                            &in_testid, &in_part, &in_msgnum) != 3)
3167                         TEST_FAIL("%s:%d: Incorrect format at offset %"PRId64
3168                                   ": %s",
3169                                   func, line, rkmessage->offset,
3170                                   (const char *)val);
3171         }
3172 
3173         return test_msgver_add_msg00(func, line, clientname, mv, in_testid,
3174                                      override_topic ?
3175                                      override_topic :
3176                                      rd_kafka_topic_name(rkmessage->rkt),
3177                                      rkmessage->partition,
3178                                      rkmessage->offset,
3179                                      rd_kafka_message_timestamp(rkmessage, NULL),
3180                                      rd_kafka_message_broker_id(rkmessage),
3181                                      rkmessage->err,
3182                                      in_msgnum);
3184 }
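
/*
 * For reference, the payload format expected by the parser above is a single
 * line of text, e.g.:
 *
 *   "testid=16045690984833335023, partition=0, msg=42\n"
 *
 * i.e. "testid=<u64>, partition=<int>, msg=<int>\n", matching the sscanf()
 * format used both here and in test_verify_rkmessage0() below.
 */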
3185 
3186 
3187 
3188 /**
3189  * Verify that all messages were received in order.
3190  *
3191  * - Offsets need to occur without gaps
3192  * - msgids need to be increasing, but may have gaps (e.g., when using a partitioner)
3193  */
3194 static int test_mv_mvec_verify_order (test_msgver_t *mv, int flags,
3195 				      struct test_mv_p *p,
3196 				      struct test_mv_mvec *mvec,
3197 				      struct test_mv_vs *vs) {
3198 	int mi;
3199 	int fails = 0;
3200 
3201 	for (mi = 1/*skip first*/ ; mi < mvec->cnt ; mi++) {
3202 		struct test_mv_m *prev = test_mv_mvec_get(mvec, mi-1);
3203 		struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
3204 
3205 		if (((flags & TEST_MSGVER_BY_OFFSET) &&
3206 		     prev->offset + 1 != this->offset) ||
3207 		    ((flags & TEST_MSGVER_BY_MSGID) &&
3208 		     prev->msgid > this->msgid)) {
3209 			TEST_MV_WARN(
3210 				mv,
3211 				" %s [%"PRId32"] msg rcvidx #%d/%d: "
3212 				"out of order (prev vs this): "
3213 				"offset %"PRId64" vs %"PRId64", "
3214 				"msgid %d vs %d\n",
3215 				p ? p->topic : "*",
3216 				p ? p->partition : -1,
3217 				mi, mvec->cnt,
3218 				prev->offset, this->offset,
3219 				prev->msgid, this->msgid);
3220 			fails++;
3221                 } else if ((flags & TEST_MSGVER_BY_BROKER_ID) &&
3222                            this->broker_id != vs->broker_id) {
3223                         TEST_MV_WARN(
3224                                 mv,
3225                                 " %s [%"PRId32"] msg rcvidx #%d/%d: "
3226                                 "broker id mismatch: expected %"PRId32
3227                                 ", not %"PRId32"\n",
3228                                 p ? p->topic : "*",
3229                                 p ? p->partition : -1,
3230                                 mi, mvec->cnt,
3231                                 vs->broker_id, this->broker_id);
3232                         fails++;
3233                 }
3234         }
3235 
3236 	return fails;
3237 }
3238 
3239 
3240 /**
3241  * @brief Verify that messages correspond to 'correct' msgver.
3242  */
3243 static int test_mv_mvec_verify_corr (test_msgver_t *mv, int flags,
3244                                       struct test_mv_p *p,
3245                                       struct test_mv_mvec *mvec,
3246                                       struct test_mv_vs *vs) {
3247         int mi;
3248         int fails = 0;
3249         struct test_mv_p *corr_p = NULL;
3250         struct test_mv_mvec *corr_mvec;
3251         int verifycnt = 0;
3252 
3253         TEST_ASSERT(vs->corr);
3254 
3255         /* Get correct mvec for comparison. */
3256         if (p)
3257                 corr_p = test_msgver_p_get(vs->corr, p->topic, p->partition, 0);
3258         if (!corr_p) {
3259                 TEST_MV_WARN(mv,
3260                              " %s [%"PRId32"]: "
3261                              "no corresponding correct partition found\n",
3262                              p ? p->topic : "*",
3263                              p ? p->partition : -1);
3264                 return 1;
3265         }
3266 
3267         corr_mvec = &corr_p->mvec;
3268 
3269         for (mi = 0 ; mi < mvec->cnt ; mi++) {
3270                 struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
3271                 const struct test_mv_m *corr;
3272 
3273 
3274                 if (flags & TEST_MSGVER_SUBSET)
3275                         corr = test_mv_mvec_find_by_msgid(corr_mvec,
3276                                                           this->msgid);
3277                 else
3278                         corr = test_mv_mvec_get(corr_mvec, mi);
3279 
3280                 if (0)
3281                         TEST_MV_WARN(mv,
3282                                      "msg #%d: msgid %d, offset %"PRId64"\n",
3283                                      mi, this->msgid, this->offset);
3284                 if (!corr) {
3285                         if (!(flags & TEST_MSGVER_SUBSET)) {
3286                                 TEST_MV_WARN(
3287                                         mv,
3288                                         " %s [%"PRId32"] msg rcvidx #%d/%d: "
3289                                         "out of range: correct mvec has "
3290                                         "%d messages: "
3291                                         "message offset %"PRId64", msgid %d\n",
3292                                         p ? p->topic : "*",
3293                                         p ? p->partition : -1,
3294                                         mi, mvec->cnt, corr_mvec->cnt,
3295                                         this->offset, this->msgid);
3296                                 fails++;
3297                         }
3298                         continue;
3299                 }
3300 
3301                 if (((flags & TEST_MSGVER_BY_OFFSET) &&
3302                      this->offset != corr->offset) ||
3303                     ((flags & TEST_MSGVER_BY_MSGID) &&
3304                      this->msgid != corr->msgid) ||
3305                     ((flags & TEST_MSGVER_BY_TIMESTAMP) &&
3306                      this->timestamp != corr->timestamp) ||
3307                     ((flags & TEST_MSGVER_BY_BROKER_ID) &&
3308                      this->broker_id != corr->broker_id)) {
3309                         TEST_MV_WARN(
3310                                 mv,
3311                                 " %s [%"PRId32"] msg rcvidx #%d/%d: "
3312                                 "did not match correct msg: "
3313                                 "offset %"PRId64" vs %"PRId64", "
3314                                 "msgid %d vs %d, "
3315                                 "timestamp %"PRId64" vs %"PRId64", "
3316                                 "broker %"PRId32" vs %"PRId32" (fl 0x%x)\n",
3317                                 p ? p->topic : "*",
3318                                 p ? p->partition : -1,
3319                                 mi, mvec->cnt,
3320                                 this->offset, corr->offset,
3321                                 this->msgid, corr->msgid,
3322                                 this->timestamp, corr->timestamp,
3323                                 this->broker_id, corr->broker_id,
3324                                 flags);
3325                         fails++;
3326                 } else {
3327                         verifycnt++;
3328                 }
3329         }
3330 
3331         if (verifycnt != corr_mvec->cnt &&
3332             !(flags & TEST_MSGVER_SUBSET)) {
3333                 TEST_MV_WARN(
3334                         mv,
3335                         " %s [%"PRId32"]: of %d input messages, "
3336                         "only %d/%d matched correct messages\n",
3337                         p ? p->topic : "*",
3338                         p ? p->partition : -1,
3339                         mvec->cnt, verifycnt, corr_mvec->cnt);
3340                 fails++;
3341         }
3342 
3343         return fails;
3344 }
3345 
3346 
3347 
3348 static int test_mv_m_cmp_offset (const void *_a, const void *_b) {
3349 	const struct test_mv_m *a = _a, *b = _b;
3350 
3351         return RD_CMP(a->offset, b->offset);
3352 }
3353 
3354 static int test_mv_m_cmp_msgid (const void *_a, const void *_b) {
3355 	const struct test_mv_m *a = _a, *b = _b;
3356 
3357         return RD_CMP(a->msgid, b->msgid);
3358 }
3359 
3360 
3361 /**
3362  * Verify that there are no duplicate messages.
3363  *
3364  * - Offsets are checked
3365  * - msgids are checked
3366  *
3367  * * NOTE: This sorts the message (.m) array, first by offset, then by msgid
3368  *         and leaves the message array sorted (by msgid)
3369  */
3370 static int test_mv_mvec_verify_dup (test_msgver_t *mv, int flags,
3371 				    struct test_mv_p *p,
3372 				    struct test_mv_mvec *mvec,
3373 				    struct test_mv_vs *vs) {
3374 	int mi;
3375 	int fails = 0;
3376 	enum {
3377 		_P_OFFSET,
3378 		_P_MSGID
3379 	} pass;
3380 
3381 	for (pass = _P_OFFSET ; pass <= _P_MSGID ; pass++) {
3382 
3383 		if (pass == _P_OFFSET) {
3384 			if (!(flags & TEST_MSGVER_BY_OFFSET))
3385 				continue;
3386 			test_mv_mvec_sort(mvec, test_mv_m_cmp_offset);
3387 		} else if (pass == _P_MSGID) {
3388 			if (!(flags & TEST_MSGVER_BY_MSGID))
3389 				continue;
3390 			test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid);
3391 		}
3392 
3393 		for (mi = 1/*skip first*/ ; mi < mvec->cnt ; mi++) {
3394 			struct test_mv_m *prev = test_mv_mvec_get(mvec, mi-1);
3395 			struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
3396 			int is_dup = 0;
3397 
3398 			if (pass == _P_OFFSET)
3399 				is_dup = prev->offset == this->offset;
3400 			else if (pass == _P_MSGID)
3401 				is_dup = prev->msgid == this->msgid;
3402 
3403 			if (!is_dup)
3404 				continue;
3405 
3406 			TEST_MV_WARN(mv,
3407 				     " %s [%"PRId32"] "
3408 				     "duplicate msg (prev vs this): "
3409 				     "offset %"PRId64" vs %"PRId64", "
3410 				     "msgid %d vs %d\n",
3411 				     p ? p->topic : "*",
3412 				     p ? p->partition : -1,
3413 				     prev->offset, this->offset,
3414 				     prev->msgid,  this->msgid);
3415 			fails++;
3416 		}
3417 	}
3418 
3419 	return fails;
3420 }
3421 
3422 
3423 
3424 /**
3425  * Verify that \p mvec contains the expected range:
3426  *  - TEST_MSGVER_BY_MSGID: msgid within \p vs->msgid_min .. \p vs->msgid_max
3427  *  - TEST_MSGVER_BY_TIMESTAMP: timestamp within \p vs->timestamp_min .. _max
3428  *
3429  * * NOTE: TEST_MSGVER_BY_MSGID is required
3430  *
3431  * * NOTE: This sorts the message (.m) array by msgid
3432  *         and leaves the message array sorted (by msgid)
3433  */
3434 static int test_mv_mvec_verify_range (test_msgver_t *mv, int flags,
3435                                       struct test_mv_p *p,
3436                                       struct test_mv_mvec *mvec,
3437                                       struct test_mv_vs *vs) {
3438         int mi;
3439         int fails = 0;
3440         int cnt = 0;
3441         int exp_cnt = vs->msgid_max - vs->msgid_min + 1;
3442         int skip_cnt = 0;
3443 
3444         if (!(flags & TEST_MSGVER_BY_MSGID))
3445                 return 0;
3446 
3447         test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid);
3448 
3449         //test_mv_mvec_dump(stdout, mvec);
3450 
3451         for (mi = 0 ; mi < mvec->cnt ; mi++) {
3452                 struct test_mv_m *prev = mi ? test_mv_mvec_get(mvec, mi-1):NULL;
3453                 struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
3454 
3455                 if (this->msgid < vs->msgid_min) {
3456                         skip_cnt++;
3457                         continue;
3458                 } else if (this->msgid > vs->msgid_max)
3459                         break;
3460 
3461                 if (flags & TEST_MSGVER_BY_TIMESTAMP) {
3462                         if (this->timestamp < vs->timestamp_min ||
3463                             this->timestamp > vs->timestamp_max) {
3464                                 TEST_MV_WARN(
3465                                         mv,
3466                                         " %s [%"PRId32"] range check: "
3467                                         "msgid #%d (at mi %d): "
3468                                         "timestamp %"PRId64" outside "
3469                                         "expected range %"PRId64"..%"PRId64"\n",
3470                                         p ? p->topic : "*",
3471                                         p ? p->partition : -1,
3472                                         this->msgid, mi,
3473                                         this->timestamp,
3474                                         vs->timestamp_min, vs->timestamp_max);
3475                                 fails++;
3476                         }
3477                 }
3478 
3479                 if ((flags & TEST_MSGVER_BY_BROKER_ID) &&
3480                     this->broker_id != vs->broker_id) {
3481                         TEST_MV_WARN(
3482                                 mv,
3483                                 " %s [%"PRId32"] range check: "
3484                                 "msgid #%d (at mi %d): "
3485                                 "expected broker id %"PRId32", not %"PRId32"\n",
3486                                 p ? p->topic : "*",
3487                                 p ? p->partition : -1,
3488                                 this->msgid, mi,
3489                                 vs->broker_id, this->broker_id);
3490                         fails++;
3491                 }
3492 
3493                 if (cnt++ == 0) {
3494                         if (this->msgid != vs->msgid_min) {
3495                                 TEST_MV_WARN(mv,
3496                                              " %s [%"PRId32"] range check: "
3497                                              "first message #%d (at mi %d) "
3498                                              "is not first in "
3499                                              "expected range %d..%d\n",
3500                                              p ? p->topic : "*",
3501                                              p ? p->partition : -1,
3502                                              this->msgid, mi,
3503                                              vs->msgid_min, vs->msgid_max);
3504                                 fails++;
3505                         }
3506                 } else if (cnt > exp_cnt) {
3507                         TEST_MV_WARN(mv,
3508                                      " %s [%"PRId32"] range check: "
3509                                      "too many messages received (%d/%d) at "
3510                                      "msgid %d for expected range %d..%d\n",
3511                                      p ? p->topic : "*",
3512                                      p ? p->partition : -1,
3513                                      cnt, exp_cnt, this->msgid,
3514                                      vs->msgid_min, vs->msgid_max);
3515                         fails++;
3516                 }
3517 
3518                 if (!prev) {
3519                         skip_cnt++;
3520                         continue;
3521                 }
3522 
3523                 if (prev->msgid + 1 != this->msgid) {
3524                         TEST_MV_WARN(mv, " %s [%"PRId32"] range check: "
3525                                      " %d message(s) missing between "
3526                                      "msgid %d..%d in expected range %d..%d\n",
3527                                      p ? p->topic : "*",
3528                                      p ? p->partition : -1,
3529                                      this->msgid - prev->msgid - 1,
3530                                      prev->msgid+1, this->msgid-1,
3531                                      vs->msgid_min, vs->msgid_max);
3532                         fails++;
3533                 }
3534         }
3535 
3536         if (cnt != exp_cnt) {
3537                 TEST_MV_WARN(mv,
3538                              " %s [%"PRId32"] range check: "
3539                              " wrong number of messages seen, wanted %d got %d "
3540                              "in expected range %d..%d (%d messages skipped)\n",
3541                              p ? p->topic : "*",
3542                              p ? p->partition : -1,
3543                              exp_cnt, cnt, vs->msgid_min, vs->msgid_max,
3544                              skip_cnt);
3545                 fails++;
3546         }
3547 
3548         return fails;
3549 }
3550 
3551 
3552 
3553 /**
3554  * Run verifier \p f for all partitions.
3555  */
3556 #define test_mv_p_verify_f(mv,flags,f,vs)	\
3557 	test_mv_p_verify_f0(mv,flags,f, # f, vs)
3558 static int test_mv_p_verify_f0 (test_msgver_t *mv, int flags,
3559 				int (*f) (test_msgver_t *mv,
3560 					  int flags,
3561 					  struct test_mv_p *p,
3562 					  struct test_mv_mvec *mvec,
3563 					  struct test_mv_vs *vs),
3564 				const char *f_name,
3565 				struct test_mv_vs *vs) {
3566 	int i;
3567 	int fails = 0;
3568 
3569 	for (i = 0 ; i < mv->p_cnt ; i++) {
3570 		TEST_SAY("Verifying %s [%"PRId32"] %d msgs with %s\n",
3571 			 mv->p[i]->topic, mv->p[i]->partition,
3572 			 mv->p[i]->mvec.cnt, f_name);
3573 		fails += f(mv, flags, mv->p[i], &mv->p[i]->mvec, vs);
3574 	}
3575 
3576 	return fails;
3577 }
3578 
3579 
3580 /**
3581  * Collect all messages from all topics and partitions into vs->mvec
3582  */
3583 static void test_mv_collect_all_msgs (test_msgver_t *mv,
3584 				      struct test_mv_vs *vs) {
3585 	int i;
3586 
3587 	for (i = 0 ; i < mv->p_cnt ; i++) {
3588 		struct test_mv_p *p = mv->p[i];
3589 		int mi;
3590 
3591 		test_mv_mvec_reserve(&vs->mvec, p->mvec.cnt);
3592 		for (mi = 0 ; mi < p->mvec.cnt ; mi++) {
3593 			struct test_mv_m *m = test_mv_mvec_get(&p->mvec, mi);
3594 			struct test_mv_m *m_new = test_mv_mvec_add(&vs->mvec);
3595 			*m_new = *m;
3596 		}
3597 	}
3598 }
3599 
3600 
3601 /**
3602  * Verify that all messages (by msgid) in range msg_base..msg_base+exp_cnt were received
3603  * and received only once.
3604  * This works across all partitions.
3605  */
3606 static int test_msgver_verify_range (test_msgver_t *mv, int flags,
3607 				     struct test_mv_vs *vs) {
3608 	int fails = 0;
3609 
3610 	/**
3611 	 * Create temporary array to hold expected message set,
3612 	 * then traverse all topics and partitions and move matching messages
3613 	 * to that set. Then verify the message set.
3614 	 */
3615 
3616 	test_mv_mvec_init(&vs->mvec, vs->exp_cnt);
3617 
3618 	/* Collect all msgs into vs mvec */
3619 	test_mv_collect_all_msgs(mv, vs);
3620 
3621 	fails += test_mv_mvec_verify_range(mv, TEST_MSGVER_BY_MSGID|flags,
3622 					   NULL, &vs->mvec, vs);
3623 	fails += test_mv_mvec_verify_dup(mv, TEST_MSGVER_BY_MSGID|flags,
3624 					 NULL, &vs->mvec, vs);
3625 
3626 	test_mv_mvec_clear(&vs->mvec);
3627 
3628 	return fails;
3629 }
3630 
3631 
3632 /**
3633  * Verify that \p exp_cnt messages were received for \p topic and \p partition
3634  * starting at msgid base \p msg_base.
3635  */
3636 int test_msgver_verify_part0 (const char *func, int line, const char *what,
3637 			      test_msgver_t *mv, int flags,
3638 			      const char *topic, int partition,
3639 			      int msg_base, int exp_cnt) {
3640 	int fails = 0;
3641 	struct test_mv_vs vs = { .msg_base = msg_base, .exp_cnt = exp_cnt };
3642 	struct test_mv_p *p;
3643 
3644 	TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x) "
3645 		 "in %s [%d]: expecting msgids %d..%d (%d)\n",
3646 		 func, line, what, mv->msgcnt, flags, topic, partition,
3647 		 msg_base, msg_base+exp_cnt, exp_cnt);
3648 
3649 	p = test_msgver_p_get(mv, topic, partition, 0);
3650 
3651 	/* Per-partition checks */
3652 	if (flags & TEST_MSGVER_ORDER)
3653 		fails += test_mv_mvec_verify_order(mv, flags, p, &p->mvec, &vs);
3654 	if (flags & TEST_MSGVER_DUP)
3655 		fails += test_mv_mvec_verify_dup(mv, flags, p, &p->mvec, &vs);
3656 
3657 	if (mv->msgcnt < vs.exp_cnt) {
3658 		TEST_MV_WARN(mv,
3659 			     "%s:%d: "
3660 			     "%s [%"PRId32"] expected %d messages but only "
3661 			     "%d received\n",
3662 			     func, line,
3663 			     p ? p->topic : "*",
3664 			     p ? p->partition : -1,
3665 			     vs.exp_cnt, mv->msgcnt);
3666 		fails++;
3667 	}
3668 
3669 
3670 	if (mv->log_suppr_cnt > 0)
3671 		TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
3672 			  func, line, what, mv->log_suppr_cnt);
3673 
3674 	if (fails)
3675 		TEST_FAIL("%s:%d: %s: Verification of %d received messages "
3676 			  "failed: "
3677 			  "expected msgids %d..%d (%d): see previous errors\n",
3678 			  func, line, what,
3679 			  mv->msgcnt, msg_base, msg_base+exp_cnt, exp_cnt);
3680 	else
3681 		TEST_SAY("%s:%d: %s: Verification of %d received messages "
3682 			 "succeeded: "
3683 			 "expected msgids %d..%d (%d)\n",
3684 			 func, line, what,
3685 			 mv->msgcnt, msg_base, msg_base+exp_cnt, exp_cnt);
3686 
3687 	return fails;
3688 
3689 }
3690 
3691 /**
3692  * Verify that \p exp_cnt messages were received starting at
3693  * msgid base \p msg_base.
3694  */
3695 int test_msgver_verify0 (const char *func, int line, const char *what,
3696 			 test_msgver_t *mv,
3697 			 int flags, struct test_mv_vs vs) {
3698 	int fails = 0;
3699 
3700 	TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x): "
3701 		 "expecting msgids %d..%d (%d)\n",
3702 		 func, line, what, mv->msgcnt, flags,
3703 		 vs.msg_base, vs.msg_base+vs.exp_cnt, vs.exp_cnt);
3704         if (flags & TEST_MSGVER_BY_TIMESTAMP) {
3705                 assert((flags & TEST_MSGVER_BY_MSGID)); /* Required */
3706                 TEST_SAY("%s:%d: %s: "
3707                          " and expecting timestamps %"PRId64"..%"PRId64"\n",
3708                          func, line, what,
3709                          vs.timestamp_min, vs.timestamp_max);
3710         }
3711 
3712 	/* Per-partition checks */
3713 	if (flags & TEST_MSGVER_ORDER)
3714 		fails += test_mv_p_verify_f(mv, flags,
3715 					    test_mv_mvec_verify_order, &vs);
3716 	if (flags & TEST_MSGVER_DUP)
3717 		fails += test_mv_p_verify_f(mv, flags,
3718 					    test_mv_mvec_verify_dup, &vs);
3719 
3720 	/* Checks across all partitions */
3721 	if ((flags & TEST_MSGVER_RANGE) && vs.exp_cnt > 0) {
3722 		vs.msgid_min = vs.msg_base;
3723 		vs.msgid_max = vs.msgid_min + vs.exp_cnt - 1;
3724 		fails += test_msgver_verify_range(mv, flags, &vs);
3725 	}
3726 
3727 	if (mv->log_suppr_cnt > 0)
3728 		TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
3729 			  func, line, what, mv->log_suppr_cnt);
3730 
3731 	if (vs.exp_cnt != mv->msgcnt) {
3732                 if (!(flags & TEST_MSGVER_SUBSET)) {
3733                         TEST_WARN("%s:%d: %s: expected %d messages, got %d\n",
3734                                   func, line, what, vs.exp_cnt, mv->msgcnt);
3735                         fails++;
3736                 }
3737 	}
3738 
3739 	if (fails)
3740 		TEST_FAIL("%s:%d: %s: Verification of %d received messages "
3741 			  "failed: "
3742 			  "expected msgids %d..%d (%d): see previous errors\n",
3743 			  func, line, what,
3744 			  mv->msgcnt, vs.msg_base, vs.msg_base+vs.exp_cnt,
3745                           vs.exp_cnt);
3746 	else
3747 		TEST_SAY("%s:%d: %s: Verification of %d received messages "
3748 			 "succeeded: "
3749 			 "expected msgids %d..%d (%d)\n",
3750 			 func, line, what,
3751 			 mv->msgcnt, vs.msg_base, vs.msg_base+vs.exp_cnt,
3752                          vs.exp_cnt);
3753 
3754 	return fails;
3755 }
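
/**
 * Illustrative sketch of combining verification flags (assumes \c mv was
 * populated during consumption and that msgids 0..999 were produced for this
 * testid; counts are examples only):
 *
 * @code
 *   // Per-partition msgid ordering and duplicate checks, plus total count:
 *   test_msgver_verify("order+dup", &mv,
 *                      TEST_MSGVER_ORDER|TEST_MSGVER_DUP|TEST_MSGVER_BY_MSGID,
 *                      0, 1000);
 *
 *   // Or simply request the full set of checks:
 *   test_msgver_verify("all", &mv, TEST_MSGVER_ALL, 0, 1000);
 * @endcode
 */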
3756 
3757 
3758 
3759 
3760 void test_verify_rkmessage0 (const char *func, int line,
3761                              rd_kafka_message_t *rkmessage, uint64_t testid,
3762                              int32_t partition, int msgnum) {
3763 	uint64_t in_testid;
3764 	int in_part;
3765 	int in_msgnum;
3766 	char buf[128];
3767 
3768 	rd_snprintf(buf, sizeof(buf), "%.*s",
3769 		 (int)rkmessage->len, (char *)rkmessage->payload);
3770 
3771 	if (sscanf(buf, "testid=%"SCNu64", partition=%i, msg=%i\n",
3772 		   &in_testid, &in_part, &in_msgnum) != 3)
3773 		TEST_FAIL("Incorrect format: %s", buf);
3774 
3775 	if (testid != in_testid ||
3776 	    (partition != -1 && partition != in_part) ||
3777 	    (msgnum != -1 && msgnum != in_msgnum) ||
3778 	    in_msgnum < 0)
3779 		goto fail_match;
3780 
3781 	if (test_level > 2) {
3782 		TEST_SAY("%s:%i: Our testid %"PRIu64", part %i (%i), msg %i\n",
3783 			 func, line,
3784 			 testid, (int)partition, (int)rkmessage->partition,
3785 			 msgnum);
3786 	}
3787 
3788 
3789         return;
3790 
3791 fail_match:
3792 	TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i, msg %i did "
3793 		  "not match message: \"%s\"\n",
3794 		  func, line,
3795 		  testid, (int)partition, msgnum, buf);
3796 }
3797 
3798 
3799 /**
3800  * @brief Verify that \p mv is identical to \p corr according to flags.
3801  */
3802 void test_msgver_verify_compare0 (const char *func, int line,
3803                                   const char *what, test_msgver_t *mv,
3804                                   test_msgver_t *corr, int flags) {
3805         struct test_mv_vs vs;
3806         int fails = 0;
3807 
3808         memset(&vs, 0, sizeof(vs));
3809 
3810         TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x) by "
3811                  "comparison to correct msgver (%d messages)\n",
3812                  func, line, what, mv->msgcnt, flags, corr->msgcnt);
3813 
3814         vs.corr = corr;
3815 
3816         /* Per-partition checks */
3817         fails += test_mv_p_verify_f(mv, flags,
3818                                     test_mv_mvec_verify_corr, &vs);
3819 
3820         if (mv->log_suppr_cnt > 0)
3821                 TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
3822                           func, line, what, mv->log_suppr_cnt);
3823 
3824         if (corr->msgcnt != mv->msgcnt) {
3825                 if (!(flags & TEST_MSGVER_SUBSET)) {
3826                         TEST_WARN("%s:%d: %s: expected %d messages, got %d\n",
3827                                   func, line, what, corr->msgcnt, mv->msgcnt);
3828                         fails++;
3829                 }
3830         }
3831 
3832         if (fails)
3833                 TEST_FAIL("%s:%d: %s: Verification of %d received messages "
3834                           "failed: expected %d messages: see previous errors\n",
3835                           func, line, what,
3836                           mv->msgcnt, corr->msgcnt);
3837         else
3838                 TEST_SAY("%s:%d: %s: Verification of %d received messages "
3839                          "succeeded: matching %d messages from correct msgver\n",
3840                          func, line, what,
3841                          mv->msgcnt, corr->msgcnt);
3842 
3843 }
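
/**
 * Illustrative sketch: comparing a consumer's msgver against the msgver
 * populated on the producing side. The test_msgver_verify_compare()
 * convenience wrapper is assumed to be provided by test.h (analogous to
 * test_msgver_verify() above):
 *
 * @code
 *   test_msgver_t prod_mv, cons_mv;
 *
 *   test_msgver_init(&prod_mv, testid);
 *   test_msgver_init(&cons_mv, testid);
 *
 *   // ... add delivered messages to prod_mv while producing,
 *   //     then consume into cons_mv with test_consumer_poll() ...
 *
 *   test_msgver_verify_compare("cmp", &cons_mv, &prod_mv,
 *                              TEST_MSGVER_BY_MSGID|TEST_MSGVER_BY_OFFSET);
 *
 *   test_msgver_clear(&prod_mv);
 *   test_msgver_clear(&cons_mv);
 * @endcode
 */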
3844 
3845 
3846 /**
3847  * Consumer poll but don't expect any proper messages for \p timeout_ms.
3848  */
3849 void test_consumer_poll_no_msgs (const char *what, rd_kafka_t *rk,
3850 				 uint64_t testid, int timeout_ms) {
3851 	int64_t tmout = test_clock() + timeout_ms * 1000;
3852         int cnt = 0;
3853         test_timing_t t_cons;
3854 	test_msgver_t mv;
3855 
3856 	test_msgver_init(&mv, testid);
3857 
3858         TEST_SAY("%s: not expecting any messages for %dms\n",
3859 		 what, timeout_ms);
3860 
3861         TIMING_START(&t_cons, "CONSUME");
3862 
3863 	do {
3864                 rd_kafka_message_t *rkmessage;
3865 
3866                 rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
3867                 if (!rkmessage)
3868 			continue;
3869 
3870                 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
3871                         TEST_SAY("%s [%"PRId32"] reached EOF at "
3872                                  "offset %"PRId64"\n",
3873                                  rd_kafka_topic_name(rkmessage->rkt),
3874                                  rkmessage->partition,
3875                                  rkmessage->offset);
3876                         test_msgver_add_msg(rk, &mv, rkmessage);
3877 
3878                 } else if (rkmessage->err) {
3879                         TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64"): %s",
3880                                  rkmessage->rkt ?
3881                                  rd_kafka_topic_name(rkmessage->rkt) :
3882                                  "(no-topic)",
3883                                  rkmessage->partition,
3884                                  rkmessage->offset,
3885                                  rd_kafka_message_errstr(rkmessage));
3886 
3887                 } else {
3888                         if (test_msgver_add_msg(rk, &mv, rkmessage)) {
3889 				TEST_MV_WARN(&mv,
3890 					     "Received unexpected message on "
3891 					     "%s [%"PRId32"] at offset "
3892 					     "%"PRId64"\n",
3893 					     rd_kafka_topic_name(rkmessage->
3894 								 rkt),
3895 					     rkmessage->partition,
3896 					     rkmessage->offset);
3897 				cnt++;
3898 			}
3899                 }
3900 
3901                 rd_kafka_message_destroy(rkmessage);
3902         } while (test_clock() <= tmout);
3903 
3904         TIMING_STOP(&t_cons);
3905 
3906 	test_msgver_verify(what, &mv, TEST_MSGVER_ALL, 0, 0);
3907 	test_msgver_clear(&mv);
3908 
3909 	TEST_ASSERT(cnt == 0, "Expected 0 messages, got %d", cnt);
3910 }
3911 
3912 /**
3913  * @brief Consumer poll with the expectation that error \p err will be
3914  *        seen within \p timeout_ms.
3915  */
3916 void test_consumer_poll_expect_err (rd_kafka_t *rk, uint64_t testid,
3917                                     int timeout_ms, rd_kafka_resp_err_t err) {
3918         int64_t tmout = test_clock() + timeout_ms * 1000;
3919 
3920         TEST_SAY("%s: expecting error %s within %dms\n",
3921                  rd_kafka_name(rk), rd_kafka_err2name(err), timeout_ms);
3922 
3923         do {
3924                 rd_kafka_message_t *rkmessage;
3925                 rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
3926                 if (!rkmessage)
3927                         continue;
3928 
3929                 if (rkmessage->err == err) {
3930                         TEST_SAY("Got expected error: %s: %s\n",
3931                                  rd_kafka_err2name(rkmessage->err),
3932                                  rd_kafka_message_errstr(rkmessage));
3933                         rd_kafka_message_destroy(rkmessage);
3934 
3935                         return;
3936                 } else if (rkmessage->err) {
3937                         TEST_FAIL("%s [%"PRId32"] unexpected error "
3938                                  "(offset %"PRId64"): %s",
3939                                  rkmessage->rkt ?
3940                                  rd_kafka_topic_name(rkmessage->rkt) :
3941                                  "(no-topic)",
3942                                  rkmessage->partition,
3943                                  rkmessage->offset,
3944                                  rd_kafka_err2name(rkmessage->err));
3945                 }
3946 
3947                 rd_kafka_message_destroy(rkmessage);
3948         } while (test_clock() <= tmout);
3949         TEST_FAIL("Expected error %s not seen in %dms",
3950                   rd_kafka_err2name(err), timeout_ms);
3951 }
3952 
3953 /**
3954  * Call consumer poll once and then return.
3955  * Messages are handled.
3956  *
3957  * \p mv is optional
3958  *
3959  * @returns 0 on timeout, 1 if a message was received or .._PARTITION_EOF
3960  *          if EOF was reached.
3961  *          TEST_FAIL()s on all errors.
3962  */
3963 int test_consumer_poll_once (rd_kafka_t *rk, test_msgver_t *mv, int timeout_ms){
3964 	rd_kafka_message_t *rkmessage;
3965 
3966 	rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
3967 	if (!rkmessage)
3968 		return 0;
3969 
3970 	if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
3971 		TEST_SAY("%s [%"PRId32"] reached EOF at "
3972 			 "offset %"PRId64"\n",
3973 			 rd_kafka_topic_name(rkmessage->rkt),
3974 			 rkmessage->partition,
3975 			 rkmessage->offset);
3976 		if (mv)
3977 			test_msgver_add_msg(rk, mv, rkmessage);
3978 		rd_kafka_message_destroy(rkmessage);
3979 		return RD_KAFKA_RESP_ERR__PARTITION_EOF;
3980 
3981 	} else if (rkmessage->err) {
3982 		TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64"): %s",
3983 			  rkmessage->rkt ?
3984 			  rd_kafka_topic_name(rkmessage->rkt) :
3985 			  "(no-topic)",
3986 			  rkmessage->partition,
3987 			  rkmessage->offset,
3988 			  rd_kafka_message_errstr(rkmessage));
3989 
3990 	} else {
3991 		if (mv)
3992 			test_msgver_add_msg(rk, mv, rkmessage);
3993 	}
3994 
3995 	rd_kafka_message_destroy(rkmessage);
3996 	return 1;
3997 }
3998 
3999 
4000 /**
4001  * @param exact Require exact exp_eof_cnt (unless -1) and exp_cnt (unless -1).
4002  *              If false: poll until either one is reached.
4003  */
4004 int test_consumer_poll_exact (const char *what, rd_kafka_t *rk, uint64_t testid,
4005                               int exp_eof_cnt, int exp_msg_base, int exp_cnt,
4006                               rd_bool_t exact, test_msgver_t *mv) {
4007         int eof_cnt = 0;
4008         int cnt = 0;
4009         test_timing_t t_cons;
4010 
4011         TEST_SAY("%s: consume %s%d messages\n", what,
4012                  exact ? "exactly ": "", exp_cnt);
4013 
4014         TIMING_START(&t_cons, "CONSUME");
4015 
4016         while ((!exact &&
4017                 ((exp_eof_cnt <= 0 || eof_cnt < exp_eof_cnt) &&
4018                  (exp_cnt <= 0 || cnt < exp_cnt))) ||
4019                (exact &&
4020                 (eof_cnt < exp_eof_cnt ||
4021                  cnt < exp_cnt))) {
4022                 rd_kafka_message_t *rkmessage;
4023 
4024                 rkmessage = rd_kafka_consumer_poll(rk, tmout_multip(10*1000));
4025                 if (!rkmessage) /* Shouldn't take this long to get a msg */
4026                         TEST_FAIL("%s: consumer_poll() timeout "
4027 				  "(%d/%d eof, %d/%d msgs)\n", what,
4028 				  eof_cnt, exp_eof_cnt, cnt, exp_cnt);
4029 
4030 
4031                 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
4032                         TEST_SAY("%s [%"PRId32"] reached EOF at "
4033                                  "offset %"PRId64"\n",
4034                                  rd_kafka_topic_name(rkmessage->rkt),
4035                                  rkmessage->partition,
4036                                  rkmessage->offset);
4037                         TEST_ASSERT(exp_eof_cnt != 0, "expected no EOFs");
4038 			if (mv)
4039 				test_msgver_add_msg(rk, mv, rkmessage);
4040                         eof_cnt++;
4041 
4042                 } else if (rkmessage->err) {
4043                         TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64
4044 				  "): %s",
4045                                  rkmessage->rkt ?
4046                                  rd_kafka_topic_name(rkmessage->rkt) :
4047                                  "(no-topic)",
4048                                  rkmessage->partition,
4049                                  rkmessage->offset,
4050                                  rd_kafka_message_errstr(rkmessage));
4051 
4052                 } else {
4053                         TEST_SAYL(4, "%s: consumed message on %s [%"PRId32"] "
4054                                   "at offset %"PRId64"\n",
4055                                   what,
4056                                   rd_kafka_topic_name(rkmessage->rkt),
4057                                   rkmessage->partition,
4058                                   rkmessage->offset);
4059 
4060 			if (!mv || test_msgver_add_msg(rk, mv, rkmessage))
4061 				cnt++;
4062                 }
4063 
4064                 rd_kafka_message_destroy(rkmessage);
4065         }
4066 
4067         TIMING_STOP(&t_cons);
4068 
4069         TEST_SAY("%s: consumed %d/%d messages (%d/%d EOFs)\n",
4070                  what, cnt, exp_cnt, eof_cnt, exp_eof_cnt);
4071 
4072         TEST_ASSERT(!exact ||
4073                     ((exp_cnt == -1 || exp_cnt == cnt) &&
4074                      (exp_eof_cnt == -1 || exp_eof_cnt == eof_cnt)),
4075                     "%s: mismatch between exact expected counts and actual: "
4076                     "%d/%d EOFs, %d/%d msgs",
4077                     what, eof_cnt, exp_eof_cnt, cnt, exp_cnt);
4078 
4079         if (exp_cnt == 0)
4080                 TEST_ASSERT(cnt == 0 && eof_cnt == exp_eof_cnt,
4081                             "%s: expected no messages and %d EOFs: "
4082                             "got %d messages and %d EOFs",
4083                             what, exp_eof_cnt, cnt, eof_cnt);
4084         return cnt;
4085 }
4086 
4087 
4088 int test_consumer_poll (const char *what, rd_kafka_t *rk, uint64_t testid,
4089                         int exp_eof_cnt, int exp_msg_base, int exp_cnt,
4090                         test_msgver_t *mv) {
4091         return test_consumer_poll_exact(what, rk, testid,
4092                                         exp_eof_cnt, exp_msg_base, exp_cnt,
4093                                         rd_false/*not exact */, mv);
4094 }
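
/**
 * Illustrative sketch of the poll helper variants above (not a test itself;
 * \c rk and \c testid assumed to exist, counts are examples only, and the
 * EOF counting requires enable.partition.eof=true on the consumer):
 *
 * @code
 *   test_msgver_t mv;
 *
 *   test_msgver_init(&mv, testid);
 *
 *   // Expect exactly 100 messages and one EOF per assigned partition (2):
 *   test_consumer_poll_exact("strict", rk, testid, 2, 0, 100, rd_true, &mv);
 *
 *   // There should then be nothing more to consume for 3 seconds:
 *   test_consumer_poll_no_msgs("quiet", rk, testid, 3000);
 *
 *   test_msgver_verify("strict", &mv, TEST_MSGVER_ALL, 0, 100);
 *   test_msgver_clear(&mv);
 * @endcode
 */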
4095 
4096 void test_consumer_close (rd_kafka_t *rk) {
4097         rd_kafka_resp_err_t err;
4098         test_timing_t timing;
4099 
4100         TEST_SAY("Closing consumer %s\n", rd_kafka_name(rk));
4101 
4102         TIMING_START(&timing, "CONSUMER.CLOSE");
4103         err = rd_kafka_consumer_close(rk);
4104         TIMING_STOP(&timing);
4105         if (err)
4106                 TEST_FAIL("Failed to close consumer: %s\n",
4107                           rd_kafka_err2str(err));
4108 }
4109 
4110 
4111 void test_flush (rd_kafka_t *rk, int timeout_ms) {
4112 	test_timing_t timing;
4113 	rd_kafka_resp_err_t err;
4114 
4115 	TEST_SAY("%s: Flushing %d messages\n",
4116 		 rd_kafka_name(rk), rd_kafka_outq_len(rk));
4117 	TIMING_START(&timing, "FLUSH");
4118 	err = rd_kafka_flush(rk, timeout_ms);
4119 	TIMING_STOP(&timing);
4120 	if (err)
4121 		TEST_FAIL("Failed to flush(%s, %d): %s: len() = %d\n",
4122 			  rd_kafka_name(rk), timeout_ms,
4123 			  rd_kafka_err2str(err),
4124                           rd_kafka_outq_len(rk));
4125 }
4126 
4127 
4128 void test_conf_set (rd_kafka_conf_t *conf, const char *name, const char *val) {
4129         char errstr[512];
4130         if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) !=
4131             RD_KAFKA_CONF_OK)
4132                 TEST_FAIL("Failed to set config \"%s\"=\"%s\": %s\n",
4133                           name, val, errstr);
4134 }
4135 
4136 char *test_conf_get (const rd_kafka_conf_t *conf, const char *name) {
4137 	static RD_TLS char ret[256];
4138 	size_t ret_sz = sizeof(ret);
4139 	if (rd_kafka_conf_get(conf, name, ret, &ret_sz) != RD_KAFKA_CONF_OK)
4140 		TEST_FAIL("Failed to get config \"%s\": %s\n", name,
4141 			  "unknown property");
4142 	return ret;
4143 }
4144 
4145 
4146 char *test_topic_conf_get (const rd_kafka_topic_conf_t *tconf,
4147                            const char *name) {
4148         static RD_TLS char ret[256];
4149         size_t ret_sz = sizeof(ret);
4150         if (rd_kafka_topic_conf_get(tconf, name, ret, &ret_sz) !=
4151             RD_KAFKA_CONF_OK)
4152                 TEST_FAIL("Failed to get topic config \"%s\": %s\n", name,
4153                           "unknown property");
4154         return ret;
4155 }
4156 
4157 
4158 /**
4159  * @brief Check if property \p name matches \p val in \p conf.
4160  *        If \p conf is NULL the test config will be used. */
4161 int test_conf_match (rd_kafka_conf_t *conf, const char *name, const char *val) {
4162         char *real;
4163         int free_conf = 0;
4164 
4165         if (!conf) {
4166                 test_conf_init(&conf, NULL, 0);
4167                 free_conf = 1;
4168         }
4169 
4170         real = test_conf_get(conf, name);
4171 
4172         if (free_conf)
4173                 rd_kafka_conf_destroy(conf);
4174 
4175         return !strcmp(real, val);
4176 }
4177 
4178 
4179 void test_topic_conf_set (rd_kafka_topic_conf_t *tconf,
4180                           const char *name, const char *val) {
4181         char errstr[512];
4182         if (rd_kafka_topic_conf_set(tconf, name, val, errstr, sizeof(errstr)) !=
4183             RD_KAFKA_CONF_OK)
4184                 TEST_FAIL("Failed to set topic config \"%s\"=\"%s\": %s\n",
4185                           name, val, errstr);
4186 }
4187 
4188 /**
4189  * @brief First attempt to set topic level property, then global.
4190  */
4191 void test_any_conf_set (rd_kafka_conf_t *conf,
4192                         rd_kafka_topic_conf_t *tconf,
4193                         const char *name, const char *val) {
4194         rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;
4195         char errstr[512] = {"Missing conf_t"};
4196 
4197         if (tconf)
4198                 res = rd_kafka_topic_conf_set(tconf, name, val,
4199                                               errstr, sizeof(errstr));
4200         if (res == RD_KAFKA_CONF_UNKNOWN && conf)
4201                 res = rd_kafka_conf_set(conf, name, val,
4202                                         errstr, sizeof(errstr));
4203 
4204         if (res != RD_KAFKA_CONF_OK)
4205                 TEST_FAIL("Failed to set any config \"%s\"=\"%s\": %s\n",
4206                           name, val, errstr);
4207 }
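
/**
 * Illustrative sketch of the configuration helpers (property values are
 * examples only; test_conf_init() is provided by the test framework):
 *
 * @code
 *   rd_kafka_conf_t *conf;
 *   rd_kafka_topic_conf_t *tconf;
 *
 *   test_conf_init(&conf, &tconf, 60);
 *
 *   test_conf_set(conf, "fetch.wait.max.ms", "100");
 *   test_topic_conf_set(tconf, "acks", "all");
 *
 *   // When it doesn't matter whether the property is topic-level or global:
 *   test_any_conf_set(conf, tconf, "message.timeout.ms", "10000");
 *
 *   if (test_conf_match(NULL, "security.protocol", "plaintext"))
 *           TEST_SAY("Test cluster is plaintext\n");
 * @endcode
 */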
4208 
4209 
4210 /**
4211  * @returns true if test clients need to be configured for authentication
4212  *          or other security measures (SSL), else false for unauthed plaintext.
4213  */
4214 int test_needs_auth (void) {
4215         rd_kafka_conf_t *conf;
4216         const char *sec;
4217 
4218         test_conf_init(&conf, NULL, 0);
4219 
4220         sec = test_conf_get(conf, "security.protocol");
4221 
4222         rd_kafka_conf_destroy(conf);
4223 
4224         return strcmp(sec, "plaintext");
4225 }
4226 
4227 
4228 void test_print_partition_list (const rd_kafka_topic_partition_list_t
4229 				*partitions) {
4230         int i;
4231         for (i = 0 ; i < partitions->cnt ; i++) {
4232 		TEST_SAY(" %s [%"PRId32"] offset %"PRId64"%s%s\n",
4233 			 partitions->elems[i].topic,
4234 			 partitions->elems[i].partition,
4235 			 partitions->elems[i].offset,
4236 			 partitions->elems[i].err ? ": " : "",
4237 			 partitions->elems[i].err ?
4238 			 rd_kafka_err2str(partitions->elems[i].err) : "");
4239         }
4240 }
4241 
4242 /**
4243  * @brief Compare two lists, returning 0 if equal.
4244  *
4245  * @remark The lists may be sorted by this function.
4246  */
4247 int test_partition_list_cmp (rd_kafka_topic_partition_list_t *al,
4248                              rd_kafka_topic_partition_list_t *bl) {
4249         int i;
4250 
4251         if (al->cnt < bl->cnt)
4252                 return -1;
4253         else if (al->cnt > bl->cnt)
4254                 return 1;
4255         else if (al->cnt == 0)
4256                 return 0;
4257 
4258         rd_kafka_topic_partition_list_sort(al, NULL, NULL);
4259         rd_kafka_topic_partition_list_sort(bl, NULL, NULL);
4260 
4261         for (i = 0 ; i < al->cnt ; i++) {
4262                 const rd_kafka_topic_partition_t *a = &al->elems[i];
4263                 const rd_kafka_topic_partition_t *b = &bl->elems[i];
4264                 if (a->partition != b->partition ||
4265                     strcmp(a->topic, b->topic))
4266                         return -1;
4267         }
4268 
4269         return 0;
4270 }
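
/**
 * Illustrative usage sketch (not from the original file): comparing an
 * expected partition list against a copy. Topic name "mytopic" is a
 * placeholder.
 *
 *   rd_kafka_topic_partition_list_t *a, *b;
 *
 *   a = rd_kafka_topic_partition_list_new(2);
 *   rd_kafka_topic_partition_list_add(a, "mytopic", 0);
 *   rd_kafka_topic_partition_list_add(a, "mytopic", 1);
 *   b = rd_kafka_topic_partition_list_copy(a);
 *
 *   TEST_ASSERT(!test_partition_list_cmp(a, b), "lists should be equal");
 *
 *   rd_kafka_topic_partition_list_destroy(b);
 *   rd_kafka_topic_partition_list_destroy(a);
 */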
4271 
4272 
4273 /**
4274  * @brief Execute script from the Kafka distribution bin/ path.
4275  */
4276 void test_kafka_cmd (const char *fmt, ...) {
4277 #ifdef _WIN32
4278 	TEST_FAIL("%s not supported on Windows, yet", __FUNCTION__);
4279 #else
4280 	char cmd[1024];
4281 	int r;
4282 	va_list ap;
4283 	test_timing_t t_cmd;
4284 	const char *kpath;
4285 
4286 	kpath = test_getenv("KAFKA_PATH", NULL);
4287 
4288 	if (!kpath)
4289 		TEST_FAIL("%s: KAFKA_PATH must be set",
4290 			  __FUNCTION__);
4291 
4292 	r = rd_snprintf(cmd, sizeof(cmd),
4293 			"%s/bin/", kpath);
4294 	TEST_ASSERT(r < (int)sizeof(cmd));
4295 
4296 	va_start(ap, fmt);
4297 	rd_vsnprintf(cmd+r, sizeof(cmd)-r, fmt, ap);
4298 	va_end(ap);
4299 
4300 	TEST_SAY("Executing: %s\n", cmd);
4301 	TIMING_START(&t_cmd, "exec");
4302 	r = system(cmd);
4303 	TIMING_STOP(&t_cmd);
4304 
4305 	if (r == -1)
4306 		TEST_FAIL("system(\"%s\") failed: %s", cmd, strerror(errno));
4307 	else if (WIFSIGNALED(r))
4308 		TEST_FAIL("system(\"%s\") terminated by signal %d\n", cmd,
4309 			  WTERMSIG(r));
4310 	else if (WEXITSTATUS(r))
4311 		TEST_FAIL("system(\"%s\") failed with exit status %d\n",
4312 			  cmd, WEXITSTATUS(r));
4313 #endif
4314 }
4315 
4316 /**
4317  * @brief Execute kafka-topics.sh from the Kafka distribution.
4318  */
4319 void test_kafka_topics (const char *fmt, ...) {
4320 #ifdef _WIN32
4321 	TEST_FAIL("%s not supported on Windows, yet", __FUNCTION__);
4322 #else
4323 	char cmd[512];
4324 	int r;
4325 	va_list ap;
4326 	test_timing_t t_cmd;
4327 	const char *kpath, *zk;
4328 
4329 	kpath = test_getenv("KAFKA_PATH", NULL);
4330 	zk = test_getenv("ZK_ADDRESS", NULL);
4331 
4332 	if (!kpath || !zk)
4333 		TEST_FAIL("%s: KAFKA_PATH and ZK_ADDRESS must be set",
4334 			  __FUNCTION__);
4335 
4336 	r = rd_snprintf(cmd, sizeof(cmd),
4337 			"%s/bin/kafka-topics.sh --zookeeper %s ", kpath, zk);
4338 	TEST_ASSERT(r < (int)sizeof(cmd));
4339 
4340 	va_start(ap, fmt);
4341 	rd_vsnprintf(cmd+r, sizeof(cmd)-r, fmt, ap);
4342 	va_end(ap);
4343 
4344 	TEST_SAY("Executing: %s\n", cmd);
4345 	TIMING_START(&t_cmd, "exec");
4346 	r = system(cmd);
4347 	TIMING_STOP(&t_cmd);
4348 
4349 	if (r == -1)
4350 		TEST_FAIL("system(\"%s\") failed: %s", cmd, strerror(errno));
4351 	else if (WIFSIGNALED(r))
4352 		TEST_FAIL("system(\"%s\") terminated by signal %d\n", cmd,
4353 			  WTERMSIG(r));
4354 	else if (WEXITSTATUS(r))
4355 		TEST_FAIL("system(\"%s\") failed with exit status %d\n",
4356 			  cmd, WEXITSTATUS(r));
4357 #endif
4358 }
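
/**
 * Illustrative usage sketch (not part of the original suite): both helpers
 * require KAFKA_PATH (and ZK_ADDRESS for test_kafka_topics()) to be set in
 * the environment; \c topic and \c brokers below are placeholder variables.
 *
 *   test_kafka_topics("--describe --topic \"%s\"", topic);
 *   test_kafka_cmd("kafka-consumer-groups.sh --bootstrap-server %s --list",
 *                  brokers);
 */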
4359 
4360 
4361 
4362 /**
4363  * @brief Create topic using Topic Admin API
4364  */
4365 static void test_admin_create_topic (rd_kafka_t *use_rk,
4366                                      const char *topicname, int partition_cnt,
4367                                      int replication_factor) {
4368         rd_kafka_t *rk;
4369         rd_kafka_NewTopic_t *newt[1];
4370         const size_t newt_cnt = 1;
4371         rd_kafka_AdminOptions_t *options;
4372         rd_kafka_queue_t *rkqu;
4373         rd_kafka_event_t *rkev;
4374         const rd_kafka_CreateTopics_result_t *res;
4375         const rd_kafka_topic_result_t **terr;
4376         int timeout_ms = tmout_multip(10000);
4377         size_t res_cnt;
4378         rd_kafka_resp_err_t err;
4379         char errstr[512];
4380         test_timing_t t_create;
4381 
4382         if (!(rk = use_rk))
4383                 rk = test_create_producer();
4384 
4385         rkqu = rd_kafka_queue_new(rk);
4386 
4387         newt[0] = rd_kafka_NewTopic_new(topicname, partition_cnt,
4388                                         replication_factor,
4389                                         errstr, sizeof(errstr));
4390         TEST_ASSERT(newt[0] != NULL, "%s", errstr);
4391 
4392         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
4393         err = rd_kafka_AdminOptions_set_operation_timeout(options, timeout_ms,
4394                                                           errstr,
4395                                                           sizeof(errstr));
4396         TEST_ASSERT(!err, "%s", errstr);
4397 
4398         TEST_SAY("Creating topic \"%s\" "
4399                  "(partitions=%d, replication_factor=%d, timeout=%d)\n",
4400                  topicname, partition_cnt, replication_factor, timeout_ms);
4401 
4402         TIMING_START(&t_create, "CreateTopics");
4403         rd_kafka_CreateTopics(rk, newt, newt_cnt, options, rkqu);
4404 
4405         /* Wait for result */
4406         rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
4407         TEST_ASSERT(rkev, "Timed out waiting for CreateTopics result");
4408 
4409         TIMING_STOP(&t_create);
4410 
4411         TEST_ASSERT(!rd_kafka_event_error(rkev),
4412                     "CreateTopics failed: %s",
4413                     rd_kafka_event_error_string(rkev));
4414 
4415         res = rd_kafka_event_CreateTopics_result(rkev);
4416         TEST_ASSERT(res, "Expected CreateTopics_result, not %s",
4417                     rd_kafka_event_name(rkev));
4418 
4419         terr = rd_kafka_CreateTopics_result_topics(res, &res_cnt);
4420         TEST_ASSERT(terr, "CreateTopics_result_topics returned NULL");
4421         TEST_ASSERT(res_cnt == newt_cnt,
4422                     "CreateTopics_result_topics returned %"PRIusz" topics, "
4423                     "not the expected %"PRIusz,
4424                     res_cnt, newt_cnt);
4425 
4426         TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]) ||
4427                     rd_kafka_topic_result_error(terr[0]) ==
4428                     RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
4429                     "Topic %s result error: %s",
4430                     rd_kafka_topic_result_name(terr[0]),
4431                     rd_kafka_topic_result_error_string(terr[0]));
4432 
4433         rd_kafka_event_destroy(rkev);
4434 
4435         rd_kafka_queue_destroy(rkqu);
4436 
4437         rd_kafka_AdminOptions_destroy(options);
4438 
4439         rd_kafka_NewTopic_destroy(newt[0]);
4440 
4441         if (!use_rk)
4442                 rd_kafka_destroy(rk);
4443 }
4444 
4445 
4446 
4447 
4448 /**
4449  * @brief Create topic using kafka-topics.sh --create
4450  */
4451 static void test_create_topic_sh (const char *topicname, int partition_cnt,
4452                                   int replication_factor) {
4453 	test_kafka_topics("--create --topic \"%s\" "
4454 			  "--replication-factor %d --partitions %d",
4455 			  topicname, replication_factor, partition_cnt);
4456 }
4457 
4458 
4459 /**
4460  * @brief Create topic
4461  */
4462 void test_create_topic (rd_kafka_t *use_rk,
4463                         const char *topicname, int partition_cnt,
4464                         int replication_factor) {
4465         if (test_broker_version < TEST_BRKVER(0,10,2,0))
4466                 test_create_topic_sh(topicname, partition_cnt,
4467                                      replication_factor);
4468         else
4469                 test_admin_create_topic(use_rk, topicname, partition_cnt,
4470                                         replication_factor);
4471 }
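
/**
 * Illustrative usage sketch (assumption, not original code): creating a test
 * topic and waiting for it to show up in metadata before producing to it.
 *
 *   const char *topic = test_mk_topic_name("mytest", 1);
 *
 *   test_create_topic(NULL, topic, 3 /-partitions-/, 1 /-replicas-/);
 *   test_wait_topic_exists(NULL, topic, tmout_multip(10000));
 */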
4472 
4473 
4474 /**
4475  * @brief Delete topic using kafka-topics.sh --delete
4476  */
4477 static void test_delete_topic_sh (const char *topicname) {
4478 	test_kafka_topics("--delete --topic \"%s\" ", topicname);
4479 }
4480 
4481 
4482 /**
4483  * @brief Delete topic using Topic Admin API
4484  */
4485 static void test_admin_delete_topic (rd_kafka_t *use_rk,
4486                                      const char *topicname) {
4487         rd_kafka_t *rk;
4488         rd_kafka_DeleteTopic_t *delt[1];
4489         const size_t delt_cnt = 1;
4490         rd_kafka_AdminOptions_t *options;
4491         rd_kafka_queue_t *rkqu;
4492         rd_kafka_event_t *rkev;
4493         const rd_kafka_DeleteTopics_result_t *res;
4494         const rd_kafka_topic_result_t **terr;
4495         int timeout_ms = tmout_multip(10000);
4496         size_t res_cnt;
4497         rd_kafka_resp_err_t err;
4498         char errstr[512];
4499         test_timing_t t_create;
4500 
4501         if (!(rk = use_rk))
4502                 rk = test_create_producer();
4503 
4504         rkqu = rd_kafka_queue_new(rk);
4505 
4506         delt[0] = rd_kafka_DeleteTopic_new(topicname);
4507 
4508         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
4509         err = rd_kafka_AdminOptions_set_operation_timeout(options, timeout_ms,
4510                                                           errstr,
4511                                                           sizeof(errstr));
4512         TEST_ASSERT(!err, "%s", errstr);
4513 
4514         TEST_SAY("Deleting topic \"%s\" "
4515                  "(timeout=%d)\n",
4516                  topicname, timeout_ms);
4517 
4518         TIMING_START(&t_create, "DeleteTopics");
4519         rd_kafka_DeleteTopics(rk, delt, delt_cnt, options, rkqu);
4520 
4521         /* Wait for result */
4522         rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
4523         TEST_ASSERT(rkev, "Timed out waiting for DeleteTopics result");
4524 
4525         TIMING_STOP(&t_create);
4526 
4527         res = rd_kafka_event_DeleteTopics_result(rkev);
4528         TEST_ASSERT(res, "Expected DeleteTopics_result, not %s",
4529                     rd_kafka_event_name(rkev));
4530 
4531         terr = rd_kafka_DeleteTopics_result_topics(res, &res_cnt);
4532         TEST_ASSERT(terr, "DeleteTopics_result_topics returned NULL");
4533         TEST_ASSERT(res_cnt == delt_cnt,
4534                     "DeleteTopics_result_topics returned %"PRIusz" topics, "
4535                     "not the expected %"PRIusz,
4536                     res_cnt, delt_cnt);
4537 
4538         TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]),
4539                     "Topic %s result error: %s",
4540                     rd_kafka_topic_result_name(terr[0]),
4541                     rd_kafka_topic_result_error_string(terr[0]));
4542 
4543         rd_kafka_event_destroy(rkev);
4544 
4545         rd_kafka_queue_destroy(rkqu);
4546 
4547         rd_kafka_AdminOptions_destroy(options);
4548 
4549         rd_kafka_DeleteTopic_destroy(delt[0]);
4550 
4551         if (!use_rk)
4552                 rd_kafka_destroy(rk);
4553 }
4554 
4555 
4556 /**
4557  * @brief Delete a topic
4558  */
4559 void test_delete_topic (rd_kafka_t *use_rk, const char *topicname) {
4560         if (test_broker_version < TEST_BRKVER(0,10,2,0))
4561                 test_delete_topic_sh(topicname);
4562         else
4563                 test_admin_delete_topic(use_rk, topicname);
4564 }
4565 
4566 
4567 /**
4568  * @brief Create additional partitions for a topic using Admin API
4569  */
4570 static void test_admin_create_partitions (rd_kafka_t *use_rk,
4571                                           const char *topicname,
4572                                           int new_partition_cnt) {
4573         rd_kafka_t *rk;
4574         rd_kafka_NewPartitions_t *newp[1];
4575         const size_t newp_cnt = 1;
4576         rd_kafka_AdminOptions_t *options;
4577         rd_kafka_queue_t *rkqu;
4578         rd_kafka_event_t *rkev;
4579         const rd_kafka_CreatePartitions_result_t *res;
4580         const rd_kafka_topic_result_t **terr;
4581         int timeout_ms = tmout_multip(10000);
4582         size_t res_cnt;
4583         rd_kafka_resp_err_t err;
4584         char errstr[512];
4585         test_timing_t t_create;
4586 
4587         if (!(rk = use_rk))
4588                 rk = test_create_producer();
4589 
4590         rkqu = rd_kafka_queue_new(rk);
4591 
4592         newp[0] = rd_kafka_NewPartitions_new(topicname, new_partition_cnt,
4593                                              errstr, sizeof(errstr));
4594         TEST_ASSERT(newp[0] != NULL, "%s", errstr);
4595 
4596         options = rd_kafka_AdminOptions_new(rk,
4597                                             RD_KAFKA_ADMIN_OP_CREATEPARTITIONS);
4598         err = rd_kafka_AdminOptions_set_operation_timeout(options, timeout_ms,
4599                                                           errstr,
4600                                                           sizeof(errstr));
4601         TEST_ASSERT(!err, "%s", errstr);
4602 
4603         TEST_SAY("Creating %d (total) partitions for topic \"%s\"\n",
4604                  new_partition_cnt, topicname);
4605 
4606         TIMING_START(&t_create, "CreatePartitions");
4607         rd_kafka_CreatePartitions(rk, newp, newp_cnt, options, rkqu);
4608 
4609         /* Wait for result */
4610         rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
4611         TEST_ASSERT(rkev, "Timed out waiting for CreatePartitions result");
4612 
4613         TIMING_STOP(&t_create);
4614 
4615         res = rd_kafka_event_CreatePartitions_result(rkev);
4616         TEST_ASSERT(res, "Expected CreatePartitions_result, not %s",
4617                     rd_kafka_event_name(rkev));
4618 
4619         terr = rd_kafka_CreatePartitions_result_topics(res, &res_cnt);
4620         TEST_ASSERT(terr, "CreatePartitions_result_topics returned NULL");
4621         TEST_ASSERT(res_cnt == newp_cnt,
4622                     "CreatePartitions_result_topics returned %"PRIusz
4623                     " topics, not the expected %"PRIusz,
4624                     res_cnt, newp_cnt);
4625 
4626         TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]),
4627                     "Topic %s result error: %s",
4628                     rd_kafka_topic_result_name(terr[0]),
4629                     rd_kafka_topic_result_error_string(terr[0]));
4630 
4631         rd_kafka_event_destroy(rkev);
4632 
4633         rd_kafka_queue_destroy(rkqu);
4634 
4635         rd_kafka_AdminOptions_destroy(options);
4636 
4637         rd_kafka_NewPartitions_destroy(newp[0]);
4638 
4639         if (!use_rk)
4640                 rd_kafka_destroy(rk);
4641 }
4642 
4643 
4644 /**
4645  * @brief Create partitions for topic
4646  */
4647 void test_create_partitions (rd_kafka_t *use_rk,
4648                              const char *topicname, int new_partition_cnt) {
4649         if (test_broker_version < TEST_BRKVER(0,10,2,0))
4650                 test_kafka_topics("--alter --topic %s --partitions %d",
4651                                   topicname, new_partition_cnt);
4652         else
4653                 test_admin_create_partitions(use_rk, topicname,
4654                                              new_partition_cnt);
4655 }
4656 
4657 
4658 int test_get_partition_count (rd_kafka_t *rk, const char *topicname,
4659                               int timeout_ms) {
4660         rd_kafka_t *use_rk;
4661         rd_kafka_resp_err_t err;
4662         rd_kafka_topic_t *rkt;
4663         int64_t abs_timeout = test_clock() + (timeout_ms * 1000);
4664         int ret = -1;
4665 
4666         if (!rk)
4667                 use_rk = test_create_producer();
4668         else
4669                 use_rk = rk;
4670 
4671         rkt = rd_kafka_topic_new(use_rk, topicname, NULL);
4672 
4673         do {
4674                 const struct rd_kafka_metadata *metadata;
4675 
4676                 err = rd_kafka_metadata(use_rk, 0, rkt, &metadata,
4677                                         tmout_multip(15000));
4678                 if (err)
4679                         TEST_WARN("metadata() for %s failed: %s\n",
4680                                   rkt ? rd_kafka_topic_name(rkt) :
4681                                   "(all-local)",
4682                                   rd_kafka_err2str(err));
4683                 else {
4684                         if (metadata->topic_cnt == 1) {
4685                                 if (metadata->topics[0].err == 0 ||
4686                                     metadata->topics[0].partition_cnt > 0) {
4687                                         int32_t cnt;
4688                                         cnt = metadata->topics[0].partition_cnt;
4689                                         rd_kafka_metadata_destroy(metadata);
4690                                         ret = (int)cnt;
4691                                         break;
4692                                 }
4693                                 TEST_SAY("metadata(%s) returned %s: retrying\n",
4694                                          rd_kafka_topic_name(rkt),
4695                                          rd_kafka_err2str(metadata->
4696                                                           topics[0].err));
4697                         }
4698                         rd_kafka_metadata_destroy(metadata);
4699                         rd_sleep(1);
4700                 }
4701         } while (test_clock() < abs_timeout);
4702 
4703         rd_kafka_topic_destroy(rkt);
4704 
4705         if (!rk)
4706                 rd_kafka_destroy(use_rk);
4707 
4708         return ret;
4709 }
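
/**
 * Illustrative usage sketch (not original code): verifying the partition
 * count after a test_create_partitions() call; the expected count of 3 is
 * just an example value.
 *
 *   int cnt = test_get_partition_count(NULL, topic, tmout_multip(10000));
 *   TEST_ASSERT(cnt == 3, "expected 3 partitions, got %d", cnt);
 */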
4710 
4711 /**
4712  * @brief Let the broker auto-create the topic for us.
4713  */
4714 rd_kafka_resp_err_t test_auto_create_topic_rkt (rd_kafka_t *rk,
4715                                                 rd_kafka_topic_t *rkt,
4716                                                 int timeout_ms) {
4717 	const struct rd_kafka_metadata *metadata;
4718 	rd_kafka_resp_err_t err;
4719 	test_timing_t t;
4720         int64_t abs_timeout = test_clock() + (timeout_ms * 1000);
4721 
4722         do {
4723                 TIMING_START(&t, "auto_create_topic");
4724                 err = rd_kafka_metadata(rk, 0, rkt, &metadata,
4725                                         tmout_multip(15000));
4726                 TIMING_STOP(&t);
4727                 if (err)
4728                         TEST_WARN("metadata() for %s failed: %s\n",
4729                                   rkt ? rd_kafka_topic_name(rkt) :
4730                                   "(all-local)",
4731                                   rd_kafka_err2str(err));
4732                 else {
4733                         if (metadata->topic_cnt == 1) {
4734                                 if (metadata->topics[0].err == 0 ||
4735                                     metadata->topics[0].partition_cnt > 0) {
4736                                         rd_kafka_metadata_destroy(metadata);
4737                                         return 0;
4738                                 }
4739                                 TEST_SAY("metadata(%s) returned %s: retrying\n",
4740                                          rd_kafka_topic_name(rkt),
4741                                          rd_kafka_err2str(metadata->
4742                                                           topics[0].err));
4743                         }
4744                         rd_kafka_metadata_destroy(metadata);
4745                         rd_sleep(1);
4746                 }
4747         } while (test_clock() < abs_timeout);
4748 
4749         return err;
4750 }
4751 
4752 rd_kafka_resp_err_t test_auto_create_topic (rd_kafka_t *rk, const char *name,
4753                                             int timeout_ms) {
4754         rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, name, NULL);
4755         rd_kafka_resp_err_t err;
4756         if (!rkt)
4757                 return rd_kafka_last_error();
4758         err = test_auto_create_topic_rkt(rk, rkt, timeout_ms);
4759         rd_kafka_topic_destroy(rkt);
4760         return err;
4761 }
4762 
4763 
4764 /**
4765  * @brief Check if topic auto creation works.
4766  * @returns 1 if it does, else 0.
4767  */
4768 int test_check_auto_create_topic (void) {
4769         rd_kafka_t *rk;
4770         rd_kafka_conf_t *conf;
4771         rd_kafka_resp_err_t err;
4772         const char *topic = test_mk_topic_name("autocreatetest", 1);
4773 
4774         test_conf_init(&conf, NULL, 0);
4775         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
4776         err = test_auto_create_topic(rk, topic, tmout_multip(5000));
4777         if (err)
4778                 TEST_SAY("Auto topic creation of \"%s\" failed: %s\n",
4779                          topic, rd_kafka_err2str(err));
4780         rd_kafka_destroy(rk);
4781 
4782         return err ? 0 : 1;
4783 }
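
/**
 * Illustrative sketch (assumption, not original code): tests that rely on
 * broker-side auto topic creation can skip themselves when it is disabled.
 *
 *   if (!test_check_auto_create_topic()) {
 *           TEST_SKIP("Broker does not auto-create topics\n");
 *           return 0;
 *   }
 */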
4784 
4785 
4786 /**
4787  * @brief Builds and runs a Java application from the java/ directory.
4788  *
4789  *        The application is started in the background, use
4790  *        test_waitpid() to await its demise.
4791  *
4792  * @param cls The app class to run using java/run-class.sh
4793  *
4794  * @returns -1 if the application could not be started, else the pid.
4795  */
4796 int test_run_java (const char *cls, const char **argv) {
4797 #ifdef _WIN32
4798         TEST_WARN("%s(%s) not supported on Windows, yet",
4799                   __FUNCTION__, cls);
4800         return -1;
4801 #else
4802         int r;
4803         const char *kpath;
4804         pid_t pid;
4805         const char **full_argv, **p;
4806         int cnt;
4807         extern char **environ;
4808 
4809         kpath = test_getenv("KAFKA_PATH", NULL);
4810 
4811         if (!kpath) {
4812                 TEST_WARN("%s(%s): KAFKA_PATH must be set\n",
4813                           __FUNCTION__, cls);
4814                 return -1;
4815         }
4816 
4817         /* Build */
4818         r = system("make -s java");
4819 
4820         if (r == -1 || WIFSIGNALED(r) || WEXITSTATUS(r)) {
4821                 TEST_WARN("%s(%s): failed to build java class (code %d)\n",
4822                           __FUNCTION__, cls, r);
4823                 return -1;
4824         }
4825 
4826         /* Fork child process and run cls */
4827         pid = fork();
4828         if (pid == -1) {
4829                 TEST_WARN("%s(%s): failed to fork: %s\n",
4830                           __FUNCTION__, cls, strerror(errno));
4831                 return -1;
4832         }
4833 
4834         if (pid > 0)
4835                 return (int)pid; /* In parent process */
4836 
4837         /* In child process */
4838 
4839         /* Reconstruct argv to contain run-class.sh and the cls */
4840         for (cnt = 0 ; argv[cnt] ; cnt++)
4841                 ;
4842 
4843         cnt += 3; /* run-class.sh, cls, .., NULL */
4844         full_argv = malloc(sizeof(*full_argv) * cnt);
4845         full_argv[0] = "java/run-class.sh";
4846         full_argv[1] = (const char *)cls;
4847 
4848         /* Copy arguments */
4849         for (p = &full_argv[2] ; *argv ; p++, argv++)
4850                 *p = *argv;
4851         *p = NULL;
4852 
4853         /* Run */
4854         r = execve(full_argv[0], (char *const*)full_argv, environ);
4855 
4856         TEST_WARN("%s(%s): failed to execute run-class.sh: %s\n",
4857                   __FUNCTION__, cls, strerror(errno));
4858         exit(2);
4859 
4860         return -1; /* NOTREACHED */
4861 #endif
4862 }
4863 
4864 
4865 /**
4866  * @brief Wait for child-process \p pid to exit.
4867  *
4868  * @returns 0 if the child process exited successfully, else -1.
4869  */
4870 int test_waitpid (int pid) {
4871 #ifdef _WIN32
4872         TEST_WARN("%s() not supported on Windows, yet",
4873                   __FUNCTION__);
4874         return -1;
4875 #else
4876         pid_t r;
4877         int status = 0;
4878 
4879         r = waitpid((pid_t)pid, &status, 0);
4880 
4881         if (r == -1) {
4882                 TEST_WARN("waitpid(%d) failed: %s\n",
4883                           pid, strerror(errno));
4884                 return -1;
4885         }
4886 
4887         if (WIFSIGNALED(status)) {
4888                 TEST_WARN("Process %d terminated by signal %d\n", pid,
4889                           WTERMSIG(status));
4890                 return -1;
4891         } else if (WEXITSTATUS(status)) {
4892                 TEST_WARN("Process %d exited with status %d\n",
4893                           pid, WEXITSTATUS(status));
4894                 return -1;
4895         }
4896 
4897         return 0;
4898 #endif
4899 }
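
/**
 * Illustrative usage sketch (not original code): run a Java helper class in
 * the background and wait for it to finish; the class name and argument are
 * hypothetical placeholders.
 *
 *   const char *args[] = { topic, NULL };
 *   int pid = test_run_java("SomeHelperCli", args);
 *
 *   TEST_ASSERT(pid > 0, "failed to start java helper");
 *   TEST_ASSERT(test_waitpid(pid) == 0, "java helper failed");
 */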
4900 
4901 
4902 /**
4903  * @brief Check if \p feature is built into librdkafka.
4904  * @returns 1 if the feature is built in, else 0.
4905  */
4906 int test_check_builtin (const char *feature) {
4907 	rd_kafka_conf_t *conf;
4908 	char errstr[128];
4909 	int r;
4910 
4911 	conf = rd_kafka_conf_new();
4912 	if (rd_kafka_conf_set(conf, "builtin.features", feature,
4913 			      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
4914 		TEST_SAY("Feature \"%s\" not built-in: %s\n",
4915 			 feature, errstr);
4916 		r = 0;
4917 	} else {
4918 		TEST_SAY("Feature \"%s\" is built-in\n", feature);
4919 		r = 1;
4920 	}
4921 
4922 	rd_kafka_conf_destroy(conf);
4923 	return r;
4924 }
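
/**
 * Illustrative sketch (assumption, not original code): skipping a test when
 * a required feature was not compiled in.
 *
 *   if (!test_check_builtin("ssl")) {
 *           TEST_SKIP("No SSL support built in\n");
 *           return 0;
 *   }
 */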
4925 
4926 
4927 char *tsprintf (const char *fmt, ...) {
4928 	static RD_TLS char ret[8][512];
4929 	static RD_TLS int i;
4930 	va_list ap;
4931 
4932 
4933 	i = (i + 1) % 8;
4934 
4935 	va_start(ap, fmt);
4936 	rd_vsnprintf(ret[i], sizeof(ret[i]), fmt, ap);
4937 	va_end(ap);
4938 
4939 	return ret[i];
4940 }
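
/**
 * Illustrative usage note (not original code): tsprintf() rotates through
 * eight thread-local buffers, so it is convenient for transient strings in
 * log/assert statements, but at most eight results per thread may be
 * referenced at the same time.
 *
 *   TEST_SAY("Using group %s\n", tsprintf("grp-%s-%d", topic, i));
 */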
4941 
4942 
4943 /**
4944  * @brief Add a test report JSON object.
4945  * These will be written as a JSON array to the test report file.
4946  */
4947 void test_report_add (struct test *test, const char *fmt, ...) {
4948 	va_list ap;
4949 	char buf[512];
4950 
4951 	va_start(ap, fmt);
4952 	vsnprintf(buf, sizeof(buf), fmt, ap);
4953 	va_end(ap);
4954 
4955 	if (test->report_cnt == test->report_size) {
4956 		if (test->report_size == 0)
4957 			test->report_size = 8;
4958 		else
4959 			test->report_size *= 2;
4960 
4961 		test->report_arr = realloc(test->report_arr,
4962 					   sizeof(*test->report_arr) *
4963 					   test->report_size);
4964 	}
4965 
4966 	test->report_arr[test->report_cnt++] = rd_strdup(buf);
4967 
4968 	TEST_SAYL(1, "Report #%d: %s\n", test->report_cnt-1, buf);
4969 }
4970 
4971 /**
4972  * Returns 1 if KAFKA_PATH and ZK_ADDRESS are set so that we can use the
4973  * kafka-topics.sh script to manually create topics.
4974  *
4975  * If \p skip is set TEST_SKIP() will be called with a helpful message.
4976  */
4977 int test_can_create_topics (int skip) {
4978         /* Has AdminAPI */
4979         if (test_broker_version >= TEST_BRKVER(0,10,2,0))
4980                 return 1;
4981 
4982 #ifdef _WIN32
4983 	if (skip)
4984 		TEST_SKIP("Cannot create topics on Win32\n");
4985 	return 0;
4986 #else
4987 
4988 	if (!test_getenv("KAFKA_PATH", NULL) ||
4989 	    !test_getenv("ZK_ADDRESS", NULL)) {
4990 		if (skip)
4991 			TEST_SKIP("Cannot create topics "
4992 				  "(set KAFKA_PATH and ZK_ADDRESS)\n");
4993 		return 0;
4994 	}
4995 
4996 
4997 	return 1;
4998 #endif
4999 }
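
/**
 * Illustrative sketch (assumption, not original code): typical guard at the
 * top of a test that needs to create topics explicitly.
 *
 *   if (!test_can_create_topics(1))
 *           return 0;   (TEST_SKIP() was already called by the helper)
 */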
5000 
5001 
5002 /**
5003  * Wait for \p event_type, discarding all other events prior to it.
5004  */
5005 rd_kafka_event_t *test_wait_event (rd_kafka_queue_t *eventq,
5006 				   rd_kafka_event_type_t event_type,
5007 				   int timeout_ms) {
5008 	test_timing_t t_w;
5009 	int64_t abs_timeout = test_clock() + (timeout_ms * 1000);
5010 
5011 	TIMING_START(&t_w, "wait_event");
5012 	while (test_clock() < abs_timeout) {
5013 		rd_kafka_event_t *rkev;
5014 
5015 		rkev = rd_kafka_queue_poll(eventq,
5016 					   (int)(abs_timeout - test_clock())/
5017 					   1000);
5018 
5019 		if (rd_kafka_event_type(rkev) == event_type) {
5020 			TIMING_STOP(&t_w);
5021 			return rkev;
5022 		}
5023 
5024 		if (!rkev)
5025 			continue;
5026 
5027 		if (rd_kafka_event_error(rkev))
5028 			TEST_SAY("discarding ignored event %s: %s\n",
5029 				 rd_kafka_event_name(rkev),
5030 				 rd_kafka_event_error_string(rkev));
5031 		else
5032 			TEST_SAY("discarding ignored event %s\n",
5033 				 rd_kafka_event_name(rkev));
5034 		rd_kafka_event_destroy(rkev);
5035 
5036 	}
5037 	TIMING_STOP(&t_w);
5038 
5039 	return NULL;
5040 }
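
/**
 * Illustrative usage sketch (not original code): waiting for a stats event
 * on the main queue, discarding anything else that arrives first.
 *
 *   rd_kafka_queue_t *eventq = rd_kafka_queue_get_main(rk);
 *   rd_kafka_event_t *rkev =
 *           test_wait_event(eventq, RD_KAFKA_EVENT_STATS,
 *                           tmout_multip(10000));
 *
 *   TEST_ASSERT(rkev, "no stats event seen");
 *   rd_kafka_event_destroy(rkev);
 *   rd_kafka_queue_destroy(eventq);
 */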
5041 
5042 
5043 void test_SAY (const char *file, int line, int level, const char *str) {
5044         TEST_SAYL(level, "%s", str);
5045 }
5046 
5047 void test_SKIP (const char *file, int line, const char *str) {
5048         TEST_WARN("SKIPPING TEST: %s", str);
5049         TEST_LOCK();
5050         test_curr->state = TEST_SKIPPED;
5051         if (!*test_curr->failstr) {
5052                 rd_snprintf(test_curr->failstr,
5053                             sizeof(test_curr->failstr), "%s", str);
5054                 rtrim(test_curr->failstr);
5055         }
5056         TEST_UNLOCK();
5057 }
5058 
5059 const char *test_curr_name (void) {
5060         return test_curr->name;
5061 }
5062 
5063 
5064 /**
5065  * @brief Dump/print message headers
5066  */
5067 void test_headers_dump (const char *what, int lvl,
5068                         const rd_kafka_headers_t *hdrs) {
5069         size_t idx = 0;
5070         const char *name, *value;
5071         size_t size;
5072 
5073         while (!rd_kafka_header_get_all(hdrs, idx++, &name,
5074                                         (const void **)&value, &size))
5075                 TEST_SAYL(lvl, "%s: Header #%"PRIusz": %s='%s'\n",
5076                           what, idx-1, name,
5077                           value ? value : "(NULL)");
5078 }
5079 
5080 
5081 /**
5082  * @brief Retrieve and return the list of broker ids in the cluster.
5083  *
5084  * @param use_rk Optional instance to use.
5085  * @param cntp Will be updated to the number of brokers returned.
5086  *
5087  * @returns a malloc:ed list of int32_t broker ids.
5088  */
5089 int32_t *test_get_broker_ids (rd_kafka_t *use_rk, size_t *cntp) {
5090         int32_t *ids;
5091         rd_kafka_t *rk;
5092         const rd_kafka_metadata_t *md;
5093         rd_kafka_resp_err_t err;
5094         size_t i;
5095 
5096         if (!(rk = use_rk))
5097                 rk = test_create_producer();
5098 
5099         err = rd_kafka_metadata(rk, 0, NULL, &md, tmout_multip(5000));
5100         TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
5101         TEST_ASSERT(md->broker_cnt > 0,
5102                     "%d brokers, expected > 0", md->broker_cnt);
5103 
5104         ids = malloc(sizeof(*ids) * md->broker_cnt);
5105 
5106         for (i = 0 ; i < (size_t)md->broker_cnt ; i++)
5107                 ids[i] = md->brokers[i].id;
5108 
5109         *cntp = md->broker_cnt;
5110 
5111         rd_kafka_metadata_destroy(md);
5112 
5113         if (!use_rk)
5114                 rd_kafka_destroy(rk);
5115 
5116         return ids;
5117 }
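
/**
 * Illustrative usage sketch (not original code): the returned array is
 * malloc:ed and must be freed by the caller.
 *
 *   size_t broker_cnt, i;
 *   int32_t *ids = test_get_broker_ids(rk, &broker_cnt);
 *
 *   for (i = 0 ; i < broker_cnt ; i++)
 *           TEST_SAY("broker #%"PRIusz": %"PRId32"\n", i, ids[i]);
 *
 *   free(ids);
 */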
5118 
5119 
5120 
5121 /**
5122  * @brief Verify that all topics in \p topics are reported in metadata,
5123  *        and that none of the topics in \p not_topics are reported.
5124  *
5125  * @returns the number of failures (but does not FAIL).
5126  */
5127 static int verify_topics_in_metadata (rd_kafka_t *rk,
5128                                       rd_kafka_metadata_topic_t *topics,
5129                                       size_t topic_cnt,
5130                                       rd_kafka_metadata_topic_t *not_topics,
5131                                       size_t not_topic_cnt) {
5132         const rd_kafka_metadata_t *md;
5133         rd_kafka_resp_err_t err;
5134         int ti;
5135         size_t i;
5136         int fails = 0;
5137 
5138         /* Mark topics with dummy error which is overwritten
5139          * when topic is found in metadata, allowing us to check
5140          * for missed topics. */
5141         for (i = 0 ; i < topic_cnt ; i++)
5142                 topics[i].err = 12345;
5143 
5144         err = rd_kafka_metadata(rk, 1/*all_topics*/, NULL, &md,
5145                                 tmout_multip(5000));
5146         TEST_ASSERT(!err, "metadata failed: %s", rd_kafka_err2str(err));
5147 
5148         for (ti = 0 ; ti < md->topic_cnt ; ti++) {
5149                 const rd_kafka_metadata_topic_t *mdt = &md->topics[ti];
5150 
5151                 for (i = 0 ; i < topic_cnt ; i++) {
5152                         int pi;
5153                         rd_kafka_metadata_topic_t *exp_mdt;
5154 
5155                         if (strcmp(topics[i].topic, mdt->topic))
5156                                 continue;
5157 
5158                         exp_mdt = &topics[i];
5159 
5160                         exp_mdt->err = mdt->err; /* indicate found */
5161                         if (mdt->err) {
5162                                 TEST_SAY("metadata: "
5163                                          "Topic %s has error %s\n",
5164                                          mdt->topic,
5165                                          rd_kafka_err2str(mdt->err));
5166                                 fails++;
5167                         }
5168 
5169                         if (exp_mdt->partition_cnt > 0 &&
5170                             mdt->partition_cnt != exp_mdt->partition_cnt) {
5171                                 TEST_SAY("metadata: "
5172                                          "Topic %s, expected %d partitions"
5173                                          ", not %d\n",
5174                                          mdt->topic,
5175                                          exp_mdt->partition_cnt,
5176                                          mdt->partition_cnt);
5177                                 fails++;
5178                                 continue;
5179                         }
5180 
5181                         /* Verify per-partition values */
5182                         for (pi = 0 ; exp_mdt->partitions &&
5183                                      pi < exp_mdt->partition_cnt ; pi++) {
5184                                 const rd_kafka_metadata_partition_t *mdp =
5185                                         &mdt->partitions[pi];
5186                                 const rd_kafka_metadata_partition_t *exp_mdp =
5187                                         &exp_mdt->partitions[pi];
5188 
5189                                 if (mdp->id != exp_mdp->id) {
5190                                         TEST_SAY("metadata: "
5191                                                  "Topic %s, "
5192                                                  "partition %d, "
5193                                                  "partition list out of order,"
5194                                                  " expected %d, not %d\n",
5195                                                  mdt->topic, pi,
5196                                                  exp_mdp->id, mdp->id);
5197                                         fails++;
5198                                         continue;
5199                                 }
5200 
5201                                 if (exp_mdp->replicas) {
5202                                         if (mdp->replica_cnt !=
5203                                             exp_mdp->replica_cnt) {
5204                                                 TEST_SAY("metadata: "
5205                                                          "Topic %s, "
5206                                                          "partition %d, "
5207                                                          "expected %d replicas,"
5208                                                          " not %d\n",
5209                                                          mdt->topic, pi,
5210                                                          exp_mdp->replica_cnt,
5211                                                          mdp->replica_cnt);
5212                                                 fails++;
5213                                         } else if (memcmp(mdp->replicas,
5214                                                           exp_mdp->replicas,
5215                                                           mdp->replica_cnt *
5216                                                           sizeof(*mdp->replicas))) {
5217                                                 int ri;
5218 
5219                                                 TEST_SAY("metadata: "
5220                                                          "Topic %s, "
5221                                                          "partition %d, "
5222                                                          "replica mismatch:\n",
5223                                                          mdt->topic, pi);
5224 
5225                                                 for (ri = 0 ;
5226                                                      ri < mdp->replica_cnt ;
5227                                                      ri++) {
5228                                                         TEST_SAY(" #%d: "
5229                                                                  "expected "
5230                                                                  "replica %d, "
5231                                                                  "not %d\n",
5232                                                                  ri,
5233                                                                  exp_mdp->
5234                                                                  replicas[ri],
5235                                                                  mdp->
5236                                                                  replicas[ri]);
5237                                                 }
5238 
5239                                                 fails++;
5240                                         }
5241 
5242                                 }
5243                         }
5244                 }
5245 
5246                 for (i = 0 ; i < not_topic_cnt ; i++) {
5247                         if (strcmp(not_topics[i].topic, mdt->topic))
5248                                 continue;
5249 
5250                         TEST_SAY("metadata: "
5251                                  "Topic %s found in metadata, unexpected\n",
5252                                  mdt->topic);
5253                         fails++;
5254                 }
5255 
5256         }
5257 
5258         for (i  = 0 ; i < topic_cnt ; i++) {
5259                 if ((int)topics[i].err == 12345) {
5260                         TEST_SAY("metadata: "
5261                                  "Topic %s not seen in metadata\n",
5262                                  topics[i].topic);
5263                         fails++;
5264                 }
5265         }
5266 
5267         if (fails > 0)
5268                 TEST_SAY("Metadata verification for %"PRIusz" topics failed "
5269                          "with %d errors (see above)\n",
5270                          topic_cnt, fails);
5271         else
5272                 TEST_SAY("Metadata verification succeeded: "
5273                          "%"PRIusz" desired topics seen, "
5274                          "%"PRIusz" undesired topics not seen\n",
5275                          topic_cnt, not_topic_cnt);
5276 
5277         rd_kafka_metadata_destroy(md);
5278 
5279         return fails;
5280 }
5281 
5282 
5283 
5284 /**
5285  * @brief Wait for metadata to reflect expected and not expected topics
5286  */
5287 void test_wait_metadata_update (rd_kafka_t *rk,
5288                                 rd_kafka_metadata_topic_t *topics,
5289                                 size_t topic_cnt,
5290                                 rd_kafka_metadata_topic_t *not_topics,
5291                                 size_t not_topic_cnt,
5292                                 int tmout) {
5293         int64_t abs_timeout;
5294         test_timing_t t_md;
5295         rd_kafka_t *our_rk = NULL;
5296 
5297         if (!rk)
5298                 rk = our_rk = test_create_handle(RD_KAFKA_PRODUCER, NULL);
5299 
5300         abs_timeout = test_clock() + (tmout * 1000);
5301 
5302         TEST_SAY("Waiting for up to %dms for metadata update\n", tmout);
5303 
5304         TIMING_START(&t_md, "METADATA.WAIT");
5305         do {
5306                 int md_fails;
5307 
5308                 md_fails = verify_topics_in_metadata(
5309                         rk,
5310                         topics, topic_cnt,
5311                         not_topics, not_topic_cnt);
5312 
5313                 if (!md_fails) {
5314                         TEST_SAY("All expected topics (and no unexpected "
5315                                  "topics) seen in metadata\n");
5316                         abs_timeout = 0;
5317                         break;
5318                 }
5319 
5320                 rd_sleep(1);
5321         } while (test_clock() < abs_timeout);
5322         TIMING_STOP(&t_md);
5323 
5324         if (our_rk)
5325                 rd_kafka_destroy(our_rk);
5326 
5327         if (abs_timeout)
5328                 TEST_FAIL("Expected topics not seen in given time.");
5329 }
5330 
5331 /**
5332  * @brief Wait for topic to be available in metadata
5333  */
5334 void test_wait_topic_exists (rd_kafka_t *rk, const char *topic, int tmout) {
5335         rd_kafka_metadata_topic_t topics = { .topic = (char *)topic };
5336 
5337         test_wait_metadata_update(rk, &topics, 1, NULL, 0, tmout);
5338 
5339         /* Wait an additional second for the topic to propagate in
5340          * the cluster. This is not perfect but a cheap workaround for
5341          * the asynchronous nature of topic creations in Kafka. */
5342         rd_sleep(1);
5343 }
5344 
5345 
5346 
5347 /**
5348  * @brief Wait for up to \p tmout for any type of admin result.
5349  * @returns the event
5350  */
5351 rd_kafka_event_t *
5352 test_wait_admin_result (rd_kafka_queue_t *q,
5353                         rd_kafka_event_type_t evtype,
5354                         int tmout) {
5355         rd_kafka_event_t *rkev;
5356 
5357         while (1) {
5358                 rkev = rd_kafka_queue_poll(q, tmout);
5359                 if (!rkev)
5360                         TEST_FAIL("Timed out waiting for admin result (%d)\n",
5361                                   evtype);
5362 
5363                 if (rd_kafka_event_type(rkev) == evtype)
5364                         return rkev;
5365 
5366 
5367                 if (rd_kafka_event_type(rkev) == RD_KAFKA_EVENT_ERROR) {
5368                         TEST_WARN("Received error event while waiting for %d: "
5369                                   "%s: ignoring",
5370                                   evtype, rd_kafka_event_error_string(rkev));
5371                         continue;
5372                 }
5373 
5374 
5375                 TEST_ASSERT(rd_kafka_event_type(rkev) == evtype,
5376                             "Expected event type %d, got %d (%s)",
5377                             evtype,
5378                             rd_kafka_event_type(rkev),
5379                             rd_kafka_event_name(rkev));
5380         }
5381 
5382         return NULL;
5383 }
5384 
5385 
5386 
5387 /**
5388  * @brief Wait for up to \p tmout for an admin API result and return the
5389  *        distilled error code.
5390  *
5391  *        Supported APIs:
5392  *        - AlterConfigs
5393  *        - CreatePartitions
5394  *        - CreateTopics
5395  *        - DeleteGroups
5396  *        - DeleteRecords
5397  *        - DeleteTopics
5398  *        - DeleteConsumerGroupOffsets
5399  *        - DescribeConfigs
5400  */
5401 rd_kafka_resp_err_t
5402 test_wait_topic_admin_result (rd_kafka_queue_t *q,
5403                               rd_kafka_event_type_t evtype,
5404                               rd_kafka_event_t **retevent,
5405                               int tmout) {
5406         rd_kafka_event_t *rkev;
5407         size_t i;
5408         const rd_kafka_topic_result_t **terr = NULL;
5409         size_t terr_cnt = 0;
5410         const rd_kafka_ConfigResource_t **cres = NULL;
5411         size_t cres_cnt = 0;
5412         int errcnt = 0;
5413         rd_kafka_resp_err_t err;
5414         const rd_kafka_group_result_t **gres = NULL;
5415         size_t gres_cnt = 0;
5416         const rd_kafka_topic_partition_list_t *offsets = NULL;
5417 
5418         rkev = test_wait_admin_result(q, evtype, tmout);
5419 
5420         if ((err = rd_kafka_event_error(rkev))) {
5421                 TEST_WARN("%s failed: %s\n",
5422                           rd_kafka_event_name(rkev),
5423                           rd_kafka_event_error_string(rkev));
5424                 rd_kafka_event_destroy(rkev);
5425                 return err;
5426         }
5427 
5428         if (evtype == RD_KAFKA_EVENT_CREATETOPICS_RESULT) {
5429                 const rd_kafka_CreateTopics_result_t *res;
5430                 if (!(res = rd_kafka_event_CreateTopics_result(rkev)))
5431                         TEST_FAIL("Expected a CreateTopics result, not %s",
5432                                   rd_kafka_event_name(rkev));
5433 
5434                 terr = rd_kafka_CreateTopics_result_topics(res, &terr_cnt);
5435 
5436         } else if (evtype == RD_KAFKA_EVENT_DELETETOPICS_RESULT) {
5437                 const rd_kafka_DeleteTopics_result_t *res;
5438                 if (!(res = rd_kafka_event_DeleteTopics_result(rkev)))
5439                         TEST_FAIL("Expected a DeleteTopics result, not %s",
5440                                   rd_kafka_event_name(rkev));
5441 
5442                 terr = rd_kafka_DeleteTopics_result_topics(res, &terr_cnt);
5443 
5444         } else if (evtype == RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT) {
5445                 const rd_kafka_CreatePartitions_result_t *res;
5446                 if (!(res = rd_kafka_event_CreatePartitions_result(rkev)))
5447                         TEST_FAIL("Expected a CreatePartitions result, not %s",
5448                                   rd_kafka_event_name(rkev));
5449 
5450                 terr = rd_kafka_CreatePartitions_result_topics(res, &terr_cnt);
5451 
5452         } else if (evtype == RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT) {
5453                 const rd_kafka_DescribeConfigs_result_t *res;
5454 
5455                 if (!(res = rd_kafka_event_DescribeConfigs_result(rkev)))
5456                         TEST_FAIL("Expected a DescribeConfigs result, not %s",
5457                                   rd_kafka_event_name(rkev));
5458 
5459                 cres = rd_kafka_DescribeConfigs_result_resources(res,
5460                                                                  &cres_cnt);
5461 
5462         } else if (evtype == RD_KAFKA_EVENT_ALTERCONFIGS_RESULT) {
5463                 const rd_kafka_AlterConfigs_result_t *res;
5464 
5465                 if (!(res = rd_kafka_event_AlterConfigs_result(rkev)))
5466                         TEST_FAIL("Expected an AlterConfigs result, not %s",
5467                                   rd_kafka_event_name(rkev));
5468 
5469                 cres = rd_kafka_AlterConfigs_result_resources(res, &cres_cnt);
5470 
5471         } else if (evtype == RD_KAFKA_EVENT_DELETEGROUPS_RESULT) {
5472                 const rd_kafka_DeleteGroups_result_t *res;
5473                 if (!(res = rd_kafka_event_DeleteGroups_result(rkev)))
5474                         TEST_FAIL("Expected a DeleteGroups result, not %s",
5475                                   rd_kafka_event_name(rkev));
5476 
5477                 gres = rd_kafka_DeleteGroups_result_groups(res, &gres_cnt);
5478 
5479         } else if (evtype == RD_KAFKA_EVENT_DELETERECORDS_RESULT) {
5480                 const rd_kafka_DeleteRecords_result_t *res;
5481                 if (!(res = rd_kafka_event_DeleteRecords_result(rkev)))
5482                         TEST_FAIL("Expected a DeleteRecords result, not %s",
5483                                   rd_kafka_event_name(rkev));
5484 
5485                 offsets = rd_kafka_DeleteRecords_result_offsets(res);
5486 
5487         } else if (evtype == RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT) {
5488                 const rd_kafka_DeleteConsumerGroupOffsets_result_t *res;
5489                 if (!(res =
5490                       rd_kafka_event_DeleteConsumerGroupOffsets_result(rkev)))
5491                         TEST_FAIL("Expected a DeleteConsumerGroupOffsets "
5492                                   "result, not %s",
5493                                   rd_kafka_event_name(rkev));
5494 
5495                 gres = rd_kafka_DeleteConsumerGroupOffsets_result_groups(
5496                         rkev, &gres_cnt);
5497 
5498         } else {
5499                 TEST_FAIL("Bad evtype: %d", evtype);
5500                 RD_NOTREACHED();
5501         }
5502 
5503         /* Check topic errors */
5504         for (i = 0 ; i < terr_cnt ; i++) {
5505                 if (rd_kafka_topic_result_error(terr[i])) {
5506                         TEST_WARN("..Topics result: %s: error: %s\n",
5507                                   rd_kafka_topic_result_name(terr[i]),
5508                                   rd_kafka_topic_result_error_string(terr[i]));
5509                         if (!(errcnt++))
5510                                 err = rd_kafka_topic_result_error(terr[i]);
5511                 }
5512         }
5513 
5514         /* Check resource errors */
5515         for (i = 0 ; i < cres_cnt ; i++) {
5516                 if (rd_kafka_ConfigResource_error(cres[i])) {
5517                         TEST_WARN("ConfigResource result: %d,%s: error: %s\n",
5518                                   rd_kafka_ConfigResource_type(cres[i]),
5519                                   rd_kafka_ConfigResource_name(cres[i]),
5520                                   rd_kafka_ConfigResource_error_string(cres[i]));
5521                         if (!(errcnt++))
5522                                 err = rd_kafka_ConfigResource_error(cres[i]);
5523                 }
5524         }
5525 
5526         /* Check group errors */
5527         for (i = 0 ; i < gres_cnt ; i++) {
5528                 const rd_kafka_topic_partition_list_t *parts;
5529 
5530                 if (rd_kafka_group_result_error(gres[i])) {
5531 
5532                         TEST_WARN("%s result: %s: error: %s\n",
5533                                   rd_kafka_event_name(rkev),
5534                                   rd_kafka_group_result_name(gres[i]),
5535                                   rd_kafka_error_string(rd_kafka_group_result_error(gres[i])));
5536                         if (!(errcnt++))
5537                                 err = rd_kafka_error_code(rd_kafka_group_result_error(gres[i]));
5538                 }
5539 
5540                 parts = rd_kafka_group_result_partitions(gres[i]);
5541                 if (parts) {
5542                         int j;
5543                         for (j = 0 ; j < parts->cnt ; j++) {
5544                                 if (!parts->elems[j].err)
5545                                         continue;
5546 
5547                                 TEST_WARN("%s result: %s: "
5548                                           "%s [%"PRId32"] error: %s\n",
5549                                           rd_kafka_event_name(rkev),
5550                                           rd_kafka_group_result_name(gres[i]),
5551                                           parts->elems[j].topic,
5552                                           parts->elems[j].partition,
5553                                           rd_kafka_err2str(
5554                                                   parts->elems[j].err));
5555                                 errcnt++;
5556                         }
5557                 }
5558         }
5559 
5560         /* Check offset errors */
5561         for (i = 0 ; (offsets && i < (size_t)offsets->cnt) ; i++) {
5562                 if (offsets->elems[i].err) {
5563                         TEST_WARN("DeleteRecords result: %s [%d]: error: %s\n",
5564                                   offsets->elems[i].topic, offsets->elems[i].partition,
5565                                   rd_kafka_err2str(offsets->elems[i].err));
5566                         if (!(errcnt++))
5567                                 err = offsets->elems[i].err;
5568                 }
5569         }
5570 
5571         if (!err && retevent)
5572                 *retevent = rkev;
5573         else
5574                 rd_kafka_event_destroy(rkev);
5575 
5576         return err;
5577 }
5578 
5579 
5580 
5581 /**
5582  * @brief Topic Admin API helpers
5583  *
5584  * @param useq Makes the call async and posts the response in this queue.
5585  *             If NULL this call will be synchronous and return the error
5586  *             result.
5587  *
5588  * @remark Fails the current test on failure.
5589  */
5590 
5591 rd_kafka_resp_err_t
5592 test_CreateTopics_simple (rd_kafka_t *rk,
5593                           rd_kafka_queue_t *useq,
5594                           char **topics, size_t topic_cnt,
5595                           int num_partitions,
5596                           void *opaque) {
5597         rd_kafka_NewTopic_t **new_topics;
5598         rd_kafka_AdminOptions_t *options;
5599         rd_kafka_queue_t *q;
5600         size_t i;
5601         const int tmout = 30 * 1000;
5602         rd_kafka_resp_err_t err;
5603 
5604         new_topics = malloc(sizeof(*new_topics) * topic_cnt);
5605 
5606         for (i = 0 ; i < topic_cnt ; i++) {
5607                 char errstr[512];
5608                 new_topics[i] = rd_kafka_NewTopic_new(topics[i],
5609                                                       num_partitions, 1,
5610                                                       errstr, sizeof(errstr));
5611                 TEST_ASSERT(new_topics[i],
5612                             "Failed to NewTopic(\"%s\", %d) #%"PRIusz": %s",
5613                             topics[i], num_partitions, i, errstr);
5614         }
5615 
5616         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
5617         rd_kafka_AdminOptions_set_opaque(options, opaque);
5618 
5619         if (!useq) {
5620                 char errstr[512];
5621 
5622                 err = rd_kafka_AdminOptions_set_request_timeout(options,
5623                                                                 tmout,
5624                                                                 errstr,
5625                                                                 sizeof(errstr));
5626                 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5627                 err = rd_kafka_AdminOptions_set_operation_timeout(options,
5628                                                                   tmout-5000,
5629                                                                   errstr,
5630                                                                   sizeof(errstr));
5631                 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5632 
5633                 q = rd_kafka_queue_new(rk);
5634         } else {
5635                 q = useq;
5636         }
5637 
5638         TEST_SAY("Creating %"PRIusz" topics\n", topic_cnt);
5639 
5640         rd_kafka_CreateTopics(rk, new_topics, topic_cnt, options, q);
5641 
5642         rd_kafka_AdminOptions_destroy(options);
5643 
5644         rd_kafka_NewTopic_destroy_array(new_topics, topic_cnt);
5645         free(new_topics);
5646 
5647         if (useq)
5648                 return RD_KAFKA_RESP_ERR_NO_ERROR;
5649 
5650 
5651         err = test_wait_topic_admin_result(q,
5652                                            RD_KAFKA_EVENT_CREATETOPICS_RESULT,
5653                                            NULL, tmout+5000);
5654 
5655         rd_kafka_queue_destroy(q);
5656 
5657         if (err)
5658                 TEST_FAIL("Failed to create %d topic(s): %s",
5659                           (int)topic_cnt, rd_kafka_err2str(err));
5660 
5661         return err;
5662 }
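
/**
 * Illustrative usage sketch for the helper above (not part of the harness;
 * topic names and counts below are hypothetical). With \p useq == NULL the
 * helper blocks until the CreateTopics result arrives and fails the test on
 * error; with a caller-owned queue it returns immediately and the caller
 * polls the result itself.
 *
 * @code
 *   char *topics[] = { "rdkafkatest_example_a", "rdkafkatest_example_b" };
 *   rd_kafka_t *rk = test_create_producer();
 *
 *   // Synchronous: waits up to the helper's internal 30s timeout.
 *   test_CreateTopics_simple(rk, NULL, topics, 2, 3, NULL);
 *
 *   // Alternatively, post the result to a caller-owned queue:
 *   rd_kafka_queue_t *q = rd_kafka_queue_new(rk);
 *   test_CreateTopics_simple(rk, q, topics, 2, 3, NULL);
 *   test_wait_topic_admin_result(q, RD_KAFKA_EVENT_CREATETOPICS_RESULT,
 *                                NULL, 30 * 1000);
 *   rd_kafka_queue_destroy(q);
 *
 *   // Clean up the example topics when done.
 *   test_DeleteTopics_simple(rk, NULL, topics, 2, NULL);
 *   rd_kafka_destroy(rk);
 * @endcode
 */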
5663 
5664 
5665 rd_kafka_resp_err_t
5666 test_CreatePartitions_simple (rd_kafka_t *rk,
5667                               rd_kafka_queue_t *useq,
5668                               const char *topic,
5669                               size_t total_part_cnt,
5670                               void *opaque) {
5671         rd_kafka_NewPartitions_t *newp[1];
5672         rd_kafka_AdminOptions_t *options;
5673         rd_kafka_queue_t *q;
5674         const int tmout = 30 * 1000;
5675         rd_kafka_resp_err_t err;
5676         char errstr[512];
5677 
5678         newp[0] = rd_kafka_NewPartitions_new(topic, total_part_cnt, errstr,
5679                                              sizeof(errstr));
5680         TEST_ASSERT(newp[0],
5681                     "Failed to NewPartitions(\"%s\", %"PRIusz"): %s",
5682                     topic, total_part_cnt, errstr);
5683 
5684         options = rd_kafka_AdminOptions_new(rk,
5685                                             RD_KAFKA_ADMIN_OP_CREATEPARTITIONS);
5686         rd_kafka_AdminOptions_set_opaque(options, opaque);
5687 
5688         if (!useq) {
5689                 err = rd_kafka_AdminOptions_set_request_timeout(options,
5690                                                                 tmout,
5691                                                                 errstr,
5692                                                                 sizeof(errstr));
5693                 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5694                 err = rd_kafka_AdminOptions_set_operation_timeout(options,
5695                                                                   tmout-5000,
5696                                                                   errstr,
5697                                                                   sizeof(errstr));
5698                 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5699 
5700                 q = rd_kafka_queue_new(rk);
5701         } else {
5702                 q = useq;
5703         }
5704 
5705         TEST_SAY("Creating (up to) %"PRIusz" partitions for topic \"%s\"\n",
5706                  total_part_cnt, topic);
5707 
5708         rd_kafka_CreatePartitions(rk, newp, 1, options, q);
5709 
5710         rd_kafka_AdminOptions_destroy(options);
5711 
5712         rd_kafka_NewPartitions_destroy(newp[0]);
5713 
5714         if (useq)
5715                 return RD_KAFKA_RESP_ERR_NO_ERROR;
5716 
5717 
5718         err = test_wait_topic_admin_result(
5719                 q, RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT, NULL, tmout+5000);
5720 
5721         rd_kafka_queue_destroy(q);
5722 
5723         if (err)
5724                 TEST_FAIL("Failed to create partitions: %s",
5725                           rd_kafka_err2str(err));
5726 
5727         return err;
5728 }
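
/**
 * Illustrative sketch (hypothetical topic name): growing an existing topic
 * to four partitions with the helper above. Note that \p total_part_cnt is
 * the desired total partition count, not the number of partitions to add.
 *
 * @code
 *   test_CreatePartitions_simple(rk, NULL, "rdkafkatest_example_a", 4, NULL);
 * @endcode
 */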
5729 
5730 
5731 rd_kafka_resp_err_t
5732 test_DeleteTopics_simple (rd_kafka_t *rk,
5733                           rd_kafka_queue_t *useq,
5734                           char **topics, size_t topic_cnt,
5735                           void *opaque) {
5736         rd_kafka_queue_t *q;
5737         rd_kafka_DeleteTopic_t **del_topics;
5738         rd_kafka_AdminOptions_t *options;
5739         size_t i;
5740         rd_kafka_resp_err_t err;
5741         const int tmout = 30*1000;
5742 
5743         del_topics = malloc(sizeof(*del_topics) * topic_cnt);
5744 
5745         for (i = 0 ; i < topic_cnt ; i++) {
5746                 del_topics[i] = rd_kafka_DeleteTopic_new(topics[i]);
5747                 TEST_ASSERT(del_topics[i]);
5748         }
5749 
5750         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
5751         rd_kafka_AdminOptions_set_opaque(options, opaque);
5752 
5753         if (!useq) {
5754                 char errstr[512];
5755 
5756                 err = rd_kafka_AdminOptions_set_request_timeout(options,
5757                                                                 tmout,
5758                                                                 errstr,
5759                                                                 sizeof(errstr));
5760                 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5761                 err = rd_kafka_AdminOptions_set_operation_timeout(options,
5762                                                                   tmout-5000,
5763                                                                   errstr,
5764                                                                   sizeof(errstr));
5765                 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5766 
5767                 q = rd_kafka_queue_new(rk);
5768         } else {
5769                 q = useq;
5770         }
5771 
5772         TEST_SAY("Deleting %"PRIusz" topics\n", topic_cnt);
5773 
5774         rd_kafka_DeleteTopics(rk, del_topics, topic_cnt, options, q);
5775 
5776         rd_kafka_AdminOptions_destroy(options);
5777 
5778         rd_kafka_DeleteTopic_destroy_array(del_topics, topic_cnt);
5779 
5780         free(del_topics);
5781 
5782         if (useq)
5783                 return RD_KAFKA_RESP_ERR_NO_ERROR;
5784 
5785         err = test_wait_topic_admin_result(q,
5786                                            RD_KAFKA_EVENT_DELETETOPICS_RESULT,
5787                                            NULL, tmout+5000);
5788 
5789         rd_kafka_queue_destroy(q);
5790 
5791         if (err)
5792                 TEST_FAIL("Failed to delete topics: %s",
5793                           rd_kafka_err2str(err));
5794 
5795         return err;
5796 }
5797 
5798 rd_kafka_resp_err_t
5799 test_DeleteGroups_simple (rd_kafka_t *rk,
5800                           rd_kafka_queue_t *useq,
5801                           char **groups, size_t group_cnt,
5802                           void *opaque) {
5803         rd_kafka_queue_t *q;
5804         rd_kafka_DeleteGroup_t **del_groups;
5805         rd_kafka_AdminOptions_t *options;
5806         size_t i;
5807         rd_kafka_resp_err_t err;
5808         const int tmout = 30*1000;
5809 
5810         del_groups = malloc(sizeof(*del_groups) * group_cnt);
5811 
5812         for (i = 0 ; i < group_cnt ; i++) {
5813                 del_groups[i] = rd_kafka_DeleteGroup_new(groups[i]);
5814                 TEST_ASSERT(del_groups[i]);
5815         }
5816 
5817         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETEGROUPS);
5818         rd_kafka_AdminOptions_set_opaque(options, opaque);
5819 
5820         if (!useq) {
5821                 char errstr[512];
5822 
5823                 err = rd_kafka_AdminOptions_set_request_timeout(options,
5824                                                                 tmout,
5825                                                                 errstr,
5826                                                                 sizeof(errstr));
5827                 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5828 
5829                 q = rd_kafka_queue_new(rk);
5830         } else {
5831                 q = useq;
5832         }
5833 
5834         TEST_SAY("Deleting %"PRIusz" groups\n", group_cnt);
5835 
5836         rd_kafka_DeleteGroups(rk, del_groups, group_cnt, options, q);
5837 
5838         rd_kafka_AdminOptions_destroy(options);
5839 
5840         rd_kafka_DeleteGroup_destroy_array(del_groups, group_cnt);
5841         free(del_groups);
5842 
5843         if (useq)
5844                 return RD_KAFKA_RESP_ERR_NO_ERROR;
5845 
5846         err = test_wait_topic_admin_result(q,
5847                                            RD_KAFKA_EVENT_DELETEGROUPS_RESULT,
5848                                            NULL, tmout+5000);
5849 
5850         rd_kafka_queue_destroy(q);
5851 
5854         if (err)
5855                 TEST_FAIL("Failed to delete groups: %s",
5856                           rd_kafka_err2str(err));
5857 
5858         return err;
5859 }
5860 
5861 rd_kafka_resp_err_t
5862 test_DeleteRecords_simple (rd_kafka_t *rk,
5863                            rd_kafka_queue_t *useq,
5864                            const rd_kafka_topic_partition_list_t *offsets,
5865                            void *opaque) {
5866         rd_kafka_queue_t *q;
5867         rd_kafka_AdminOptions_t *options;
5868         rd_kafka_resp_err_t err;
5869         rd_kafka_DeleteRecords_t *del_records =
5870                 rd_kafka_DeleteRecords_new(offsets);
5871         const int tmout = 30*1000;
5872 
5873         options = rd_kafka_AdminOptions_new(rk,
5874                                             RD_KAFKA_ADMIN_OP_DELETERECORDS);
5875         rd_kafka_AdminOptions_set_opaque(options, opaque);
5876 
5877         if (!useq) {
5878                 char errstr[512];
5879 
5880                 err = rd_kafka_AdminOptions_set_request_timeout(options,
5881                                                                 tmout,
5882                                                                 errstr,
5883                                                                 sizeof(errstr));
5884                 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5885                 err = rd_kafka_AdminOptions_set_operation_timeout(
5886                         options,
5887                         tmout-5000,
5888                         errstr,
5889                         sizeof(errstr));
5890                 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5891 
5892                 q = rd_kafka_queue_new(rk);
5893         } else {
5894                 q = useq;
5895         }
5896 
5897         TEST_SAY("Deleting offsets from %d partitions\n", offsets->cnt);
5898 
5899         rd_kafka_DeleteRecords(rk, &del_records, 1, options, q);
5900 
5901         rd_kafka_DeleteRecords_destroy(del_records);
5902 
5903         rd_kafka_AdminOptions_destroy(options);
5904 
5905         if (useq)
5906                 return RD_KAFKA_RESP_ERR_NO_ERROR;
5907 
5908         err = test_wait_topic_admin_result(q,
5909                                            RD_KAFKA_EVENT_DELETERECORDS_RESULT,
5910                                            NULL, tmout+5000);
5911 
5912         rd_kafka_queue_destroy(q);
5913 
5914         if (err)
5915                 TEST_FAIL("Failed to delete records: %s",
5916                           rd_kafka_err2str(err));
5917 
5918         return err;
5919 }
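
/**
 * Illustrative sketch (hypothetical topic name): truncating a partition with
 * the helper above. For each listed partition the \c offset field is the
 * cut-off: records with lower offsets are deleted, and RD_KAFKA_OFFSET_END
 * removes everything currently in the partition.
 *
 * @code
 *   rd_kafka_topic_partition_list_t *offsets =
 *           rd_kafka_topic_partition_list_new(1);
 *   rd_kafka_topic_partition_list_add(offsets, "rdkafkatest_example_a", 0)->
 *           offset = RD_KAFKA_OFFSET_END;
 *
 *   test_DeleteRecords_simple(rk, NULL, offsets, NULL);
 *
 *   rd_kafka_topic_partition_list_destroy(offsets);
 * @endcode
 */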
5920 
5921 rd_kafka_resp_err_t
5922 test_DeleteConsumerGroupOffsets_simple (
5923         rd_kafka_t *rk,
5924         rd_kafka_queue_t *useq,
5925         const char *group_id,
5926         const rd_kafka_topic_partition_list_t *offsets,
5927         void *opaque) {
5928         rd_kafka_queue_t *q;
5929         rd_kafka_AdminOptions_t *options;
5930         rd_kafka_resp_err_t err;
5931         const int tmout = 30*1000;
5932         rd_kafka_DeleteConsumerGroupOffsets_t *cgoffsets;
5933 
5934         options = rd_kafka_AdminOptions_new(
5935                 rk, RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS);
5936         rd_kafka_AdminOptions_set_opaque(options, opaque);
5937 
5938         if (!useq) {
5939                 char errstr[512];
5940 
5941                 err = rd_kafka_AdminOptions_set_request_timeout(options,
5942                                                                 tmout,
5943                                                                 errstr,
5944                                                                 sizeof(errstr));
5945                 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5946                 err = rd_kafka_AdminOptions_set_operation_timeout(
5947                         options,
5948                         tmout-5000,
5949                         errstr,
5950                         sizeof(errstr));
5951                 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5952 
5953                 q = rd_kafka_queue_new(rk);
5954         } else {
5955                 q = useq;
5956         }
5957 
5958         if (offsets) {
5959                 TEST_SAY("Deleting committed offsets for group %s and "
5960                          "%d partitions\n",
5961                          group_id, offsets->cnt);
5962 
5963                 cgoffsets = rd_kafka_DeleteConsumerGroupOffsets_new(group_id,
5964                                                                     offsets);
5965         } else {
5966                 TEST_SAY("Provoking invalid DeleteConsumerGroupOffsets call\n");
5967                 cgoffsets = NULL;
5968         }
5969 
5970         rd_kafka_DeleteConsumerGroupOffsets(rk, &cgoffsets,
5971                                             cgoffsets ? 1 : 0,
5972                                             options, q);
5973 
5974         if (cgoffsets)
5975                 rd_kafka_DeleteConsumerGroupOffsets_destroy(cgoffsets);
5976 
5977         rd_kafka_AdminOptions_destroy(options);
5978 
5979         if (useq)
5980                 return RD_KAFKA_RESP_ERR_NO_ERROR;
5981 
5982         err = test_wait_topic_admin_result(
5983                 q,
5984                 RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT,
5985                 NULL, tmout+5000);
5986 
5987         rd_kafka_queue_destroy(q);
5988 
5989         if (err)
5990                 TEST_FAIL("Failed to delete committed offsets: %s",
5991                           rd_kafka_err2str(err));
5992 
5993         return err;
5994 }
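
/**
 * Illustrative sketch (group and topic names are hypothetical): removing the
 * committed offset of one partition for a consumer group with the helper
 * above. Passing \p offsets as NULL is only useful for exercising the error
 * path, as noted in the function body.
 *
 * @code
 *   rd_kafka_topic_partition_list_t *parts =
 *           rd_kafka_topic_partition_list_new(1);
 *   rd_kafka_topic_partition_list_add(parts, "rdkafkatest_example_a", 0);
 *
 *   test_DeleteConsumerGroupOffsets_simple(rk, NULL, "example_group",
 *                                          parts, NULL);
 *
 *   rd_kafka_topic_partition_list_destroy(parts);
 * @endcode
 */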
5995 
5996 /**
5997  * @brief Delta Alter configuration for the given resource,
5998  *        overwriting/setting the configs provided in \p configs.
5999  *        Existing configuration remains intact.
6000  *
6001  * @param configs 'const char *name, const char *value' tuples
6002  * @param config_cnt is the number of strings (not tuples) in \p configs
6003  */
6004 rd_kafka_resp_err_t
6005 test_AlterConfigs_simple (rd_kafka_t *rk,
6006                           rd_kafka_ResourceType_t restype,
6007                           const char *resname,
6008                           const char **configs, size_t config_cnt) {
6009         rd_kafka_queue_t *q;
6010         rd_kafka_ConfigResource_t *confres;
6011         rd_kafka_event_t *rkev;
6012         size_t i;
6013         rd_kafka_resp_err_t err;
6014         const rd_kafka_ConfigResource_t **results;
6015         size_t result_cnt;
6016         const rd_kafka_ConfigEntry_t **configents;
6017         size_t configent_cnt;
6018 
6019 
6020         q = rd_kafka_queue_new(rk);
6021 
6022         TEST_SAY("Getting configuration for %d %s\n", restype, resname);
6023 
6024         confres = rd_kafka_ConfigResource_new(restype, resname);
6025         rd_kafka_DescribeConfigs(rk, &confres, 1, NULL, q);
6026 
6027         err = test_wait_topic_admin_result(
6028                 q, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, &rkev, 15*1000);
6029         if (err) {
6030                 rd_kafka_queue_destroy(q);
6031                 rd_kafka_ConfigResource_destroy(confres);
6032                 return err;
6033         }
6034 
6035         results = rd_kafka_DescribeConfigs_result_resources(
6036                 rd_kafka_event_DescribeConfigs_result(rkev), &result_cnt);
6037         TEST_ASSERT(result_cnt == 1,
6038                     "expected 1 DescribeConfigs result, not %"PRIusz,
6039                     result_cnt);
6040 
6041         configents = rd_kafka_ConfigResource_configs(results[0],
6042                                                      &configent_cnt);
6043         TEST_ASSERT(configent_cnt > 0,
6044                     "expected > 0 ConfigEntry:s, not %"PRIusz, configent_cnt);
6045 
6046         TEST_SAY("Altering configuration for %d %s\n", restype, resname);
6047 
6048         /* Apply all existing configuration entries to resource object that
6049          * will later be passed to AlterConfigs. */
6050         for (i = 0 ; i < configent_cnt ; i++) {
6051                 err = rd_kafka_ConfigResource_set_config(
6052                         confres,
6053                         rd_kafka_ConfigEntry_name(configents[i]),
6054                         rd_kafka_ConfigEntry_value(configents[i]));
6055                 TEST_ASSERT(!err, "Failed to set read-back config %s=%s "
6056                             "on local resource object",
6057                             rd_kafka_ConfigEntry_name(configents[i]),
6058                             rd_kafka_ConfigEntry_value(configents[i]));
6059         }
6060 
6061         rd_kafka_event_destroy(rkev);
6062 
6063         /* Then apply the configuration to change. */
6064         for (i = 0 ; i < config_cnt ; i += 2) {
6065                 err = rd_kafka_ConfigResource_set_config(confres,
6066                                                          configs[i],
6067                                                          configs[i+1]);
6068                 TEST_ASSERT(!err, "Failed to set config %s=%s on "
6069                             "local resource object",
6070                             configs[i], configs[i+1]);
6071         }
6072 
6073         rd_kafka_AlterConfigs(rk, &confres, 1, NULL, q);
6074 
6075         rd_kafka_ConfigResource_destroy(confres);
6076 
6077         err = test_wait_topic_admin_result(
6078                 q, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT, NULL, 15*1000);
6079 
6080         rd_kafka_queue_destroy(q);
6081 
6082         return err;
6083 }
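
/**
 * Illustrative sketch (config values are hypothetical): \p configs is a flat
 * array of name,value string pairs, so \p config_cnt counts strings rather
 * than pairs.
 *
 * @code
 *   const char *configs[] = { "retention.ms",   "3600000",
 *                             "cleanup.policy", "delete" };
 *
 *   test_AlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC,
 *                            "rdkafkatest_example_a", configs, 4);
 * @endcode
 */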
6084 
6085 
6086 
6087 static void test_free_string_array (char **strs, size_t cnt) {
6088         size_t i;
6089         for (i = 0 ; i < cnt ; i++)
6090                 free(strs[i]);
6091         free(strs);
6092 }
6093 
6094 
6095 /**
6096  * @return an array of all topics in the cluster matching our
6097  *         rdkafka test topic prefix.
6098  */
6099 static rd_kafka_resp_err_t
6100 test_get_all_test_topics (rd_kafka_t *rk, char ***topicsp, size_t *topic_cntp) {
6101         size_t test_topic_prefix_len = strlen(test_topic_prefix);
6102         const rd_kafka_metadata_t *md;
6103         char **topics = NULL;
6104         size_t topic_cnt = 0;
6105         int i;
6106         rd_kafka_resp_err_t err;
6107 
6108         *topic_cntp = 0;
6109         if (topicsp)
6110                 *topicsp = NULL;
6111 
6112         /* Retrieve list of topics */
6113         err = rd_kafka_metadata(rk, 1/*all topics*/, NULL, &md,
6114                                 tmout_multip(10000));
6115         if (err) {
6116                 TEST_WARN("%s: Failed to acquire metadata: %s: "
6117                           "not deleting any topics\n",
6118                           __FUNCTION__, rd_kafka_err2str(err));
6119                 return err;
6120         }
6121 
6122         if (md->topic_cnt == 0) {
6123                 TEST_WARN("%s: No topics in cluster\n", __FUNCTION__);
6124                 rd_kafka_metadata_destroy(md);
6125                 return RD_KAFKA_RESP_ERR_NO_ERROR;
6126         }
6127 
6128         if (topicsp)
6129                 topics = malloc(sizeof(*topics) * md->topic_cnt);
6130 
6131         for (i = 0 ; i < md->topic_cnt ; i++) {
6132                 if (strlen(md->topics[i].topic) >= test_topic_prefix_len &&
6133                     !strncmp(md->topics[i].topic,
6134                              test_topic_prefix, test_topic_prefix_len)) {
6135                         if (topicsp)
6136                                 topics[topic_cnt++] =
6137                                         rd_strdup(md->topics[i].topic);
6138                         else
6139                                 topic_cnt++;
6140                 }
6141         }
6142 
6143         if (topic_cnt == 0) {
6144                 TEST_SAY("%s: No topics (out of %d) matching our "
6145                          "test prefix (%s)\n",
6146                          __FUNCTION__, md->topic_cnt, test_topic_prefix);
6147                 rd_kafka_metadata_destroy(md);
6148                 if (topics)
6149                         test_free_string_array(topics, topic_cnt);
6150                 return RD_KAFKA_RESP_ERR_NO_ERROR;
6151         }
6152 
6153         rd_kafka_metadata_destroy(md);
6154 
6155         if (topicsp)
6156                 *topicsp = topics;
6157         *topic_cntp = topic_cnt;
6158 
6159         return RD_KAFKA_RESP_ERR_NO_ERROR;
6160 }
6161 
6162 /**
6163  * @brief Delete all test topics using the Kafka Admin API.
6164  */
6165 rd_kafka_resp_err_t test_delete_all_test_topics (int timeout_ms) {
6166         rd_kafka_t *rk;
6167         char **topics;
6168         size_t topic_cnt = 0;
6169         rd_kafka_resp_err_t err;
6170         int i;
6171         rd_kafka_AdminOptions_t *options;
6172         rd_kafka_queue_t *q;
6173         char errstr[256];
6174         int64_t abs_timeout = test_clock() + ((int64_t)timeout_ms * 1000);
6175 
6176         rk = test_create_producer();
6177 
6178         err = test_get_all_test_topics(rk, &topics, &topic_cnt);
6179         if (err) {
6180                 /* Error already reported by test_get_all_test_topics() */
6181                 rd_kafka_destroy(rk);
6182                 return err;
6183         }
6184 
6185         if (topic_cnt == 0) {
6186                 rd_kafka_destroy(rk);
6187                 return RD_KAFKA_RESP_ERR_NO_ERROR;
6188         }
6189 
6190         q = rd_kafka_queue_get_main(rk);
6191 
6192         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
6193         if (rd_kafka_AdminOptions_set_operation_timeout(options, 2*60*1000,
6194                                                         errstr,
6195                                                         sizeof(errstr)))
6196                 TEST_SAY(_C_YEL "Failed to set DeleteTopics timeout: %s: "
6197                          "ignoring\n",
6198                          errstr);
6199 
6200         TEST_SAY(_C_MAG "====> Deleting all test topics "
6201                  "with a timeout of 2 minutes <====\n");
6202 
6203         test_DeleteTopics_simple(rk, q, topics, topic_cnt, options);
6204 
6205         rd_kafka_AdminOptions_destroy(options);
6206 
6207         while (1) {
6208                 rd_kafka_event_t *rkev;
6209                 const rd_kafka_DeleteTopics_result_t *res;
6210 
6211                 rkev = rd_kafka_queue_poll(q, -1);
6212 
6213                 res = rd_kafka_event_DeleteTopics_result(rkev);
6214                 if (!res) {
6215                         TEST_SAY("%s: Ignoring event: %s: %s\n",
6216                                  __FUNCTION__, rd_kafka_event_name(rkev),
6217                                  rd_kafka_event_error_string(rkev));
6218                         rd_kafka_event_destroy(rkev);
6219                         continue;
6220                 }
6221 
6222                 if (rd_kafka_event_error(rkev)) {
6223                         TEST_WARN("%s: DeleteTopics for %"PRIusz" topics "
6224                                   "failed: %s\n",
6225                                   __FUNCTION__, topic_cnt,
6226                                   rd_kafka_event_error_string(rkev));
6227                         err = rd_kafka_event_error(rkev);
6228                 } else {
6229                         const rd_kafka_topic_result_t **terr;
6230                         size_t tcnt;
6231                         int okcnt = 0;
6232 
6233                         terr = rd_kafka_DeleteTopics_result_topics(res, &tcnt);
6234 
6235                         for(i = 0 ; i < (int)tcnt ; i++) {
6236                                 if (!rd_kafka_topic_result_error(terr[i])) {
6237                                         okcnt++;
6238                                         continue;
6239                                 }
6240 
6241                                 TEST_WARN("%s: Failed to delete topic %s: %s\n",
6242                                           __FUNCTION__,
6243                                           rd_kafka_topic_result_name(terr[i]),
6244                                           rd_kafka_topic_result_error_string(
6245                                                   terr[i]));
6246                         }
6247 
6248                         TEST_SAY("%s: DeleteTopics "
6249                                  "succeeded for %d/%"PRIusz" topics\n",
6250                                  __FUNCTION__, okcnt, topic_cnt);
6251                         err = RD_KAFKA_RESP_ERR_NO_ERROR;
6252                 }
6253 
6254                 rd_kafka_event_destroy(rkev);
6255                 break;
6256         }
6257 
6258         rd_kafka_queue_destroy(q);
6259 
6260         test_free_string_array(topics, topic_cnt);
6261 
6262         /* Wait for topics to be fully deleted */
6263         while (1) {
6264                 err = test_get_all_test_topics(rk, NULL, &topic_cnt);
6265 
6266                 if (!err && topic_cnt == 0)
6267                         break;
6268 
6269                 if (abs_timeout < test_clock()) {
6270                         TEST_WARN("%s: Timed out waiting for "
6271                                   "remaining %"PRIusz" deleted topics "
6272                                   "to disappear from cluster metadata\n",
6273                                   __FUNCTION__, topic_cnt);
6274                         break;
6275                 }
6276 
6277                 TEST_SAY("Waiting for remaining %"PRIusz" deleted topics "
6278                          "to disappear from cluster metadata\n", topic_cnt);
6279 
6280                 rd_sleep(1);
6281         }
6282 
6283         rd_kafka_destroy(rk);
6284 
6285         return err;
6286 }
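
/**
 * Illustrative sketch (timeout value is hypothetical): typical cleanup call.
 * The helper creates its own client instance, and the timeout only bounds
 * the final wait for the deleted topics to disappear from cluster metadata.
 *
 * @code
 *   test_delete_all_test_topics(60 * 1000);
 * @endcode
 */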
6287 
6288 
6289 
6290 void test_fail0 (const char *file, int line, const char *function,
6291                  int do_lock, int fail_now, const char *fmt, ...) {
6292         char buf[512];
6293         int is_thrd = 0;
6294         size_t of;
6295         va_list ap;
6296         char *t;
6297         char timestr[32];
6298         time_t tnow = time(NULL);
6299 
6300 #ifdef __MINGW32__
6301         strftime(timestr, sizeof(timestr), "%a %b %d %H:%M:%S %Y", localtime(&tnow));
6302 #elif defined(_WIN32)
6303         ctime_s(timestr, sizeof(timestr), &tnow);
6304 #else
6305         ctime_r(&tnow, timestr);
6306 #endif
6307         t = strchr(timestr, '\n');
6308         if (t)
6309                 *t = '\0';
6310 
6311         of = rd_snprintf(buf, sizeof(buf), "%s%s%s():%i: ",
6312                          test_curr->subtest, *test_curr->subtest ? ": " : "",
6313                          function, line);
6314         rd_assert(of < sizeof(buf));
6315 
6316         va_start(ap, fmt);
6317         rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap);
6318         va_end(ap);
6319 
6320         /* Remove trailing newline */
6321         if ((t =  strchr(buf, '\n')) && !*(t+1))
6322                 *t = '\0';
6323 
6324         TEST_SAYL(0, "TEST FAILURE\n");
6325         fprintf(stderr,
6326                 "\033[31m### Test \"%s%s%s%s\" failed at %s:%i:%s() at %s: "
6327                 "###\n"
6328                 "%s\n",
6329                 test_curr->name,
6330                 *test_curr->subtest ? " (" : "",
6331                 test_curr->subtest,
6332                 *test_curr->subtest ? ")" : "",
6333                 file, line, function, timestr, buf+of);
6334         if (do_lock)
6335                 TEST_LOCK();
6336         test_curr->state = TEST_FAILED;
6337         test_curr->failcnt += 1;
6338         test_curr->is_fatal_cb = NULL;
6339 
6340         if (!*test_curr->failstr) {
6341                 strncpy(test_curr->failstr, buf, sizeof(test_curr->failstr));
6342                 test_curr->failstr[sizeof(test_curr->failstr)-1] = '\0';
6343         }
6344         if (fail_now && test_curr->mainfunc) {
6345                 tests_running_cnt--;
6346                 is_thrd = 1;
6347         }
6348         if (do_lock)
6349                 TEST_UNLOCK();
6350         if (!fail_now)
6351                 return;
6352         if (test_assert_on_fail || !is_thrd)
6353                 assert(0);
6354         else
6355                 thrd_exit(0);
6356 }
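
/**
 * Illustrative sketch: test code normally reaches test_fail0() through the
 * TEST_FAIL*() convenience macros (see test.h for the exact variants) rather
 * than by calling it directly.
 *
 * @code
 *   if (err)
 *           TEST_FAIL("Produce failed: %s", rd_kafka_err2str(err));
 * @endcode
 */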
6357 
6358 
6359 /**
6360  * @brief Destroy a mock cluster and its underlying rd_kafka_t handle
6361  */
6362 void test_mock_cluster_destroy (rd_kafka_mock_cluster_t *mcluster) {
6363         rd_kafka_t *rk = rd_kafka_mock_cluster_handle(mcluster);
6364         rd_kafka_mock_cluster_destroy(mcluster);
6365         rd_kafka_destroy(rk);
6366 }
6367 
6368 
6369 
6370 /**
6371  * @brief Create a standalone mock cluster that can be used by multiple
6372  *        rd_kafka_t instances.
6373  */
6374 rd_kafka_mock_cluster_t *test_mock_cluster_new (int broker_cnt,
6375                                                 const char **bootstraps) {
6376         rd_kafka_t *rk;
6377         rd_kafka_conf_t *conf = rd_kafka_conf_new();
6378         rd_kafka_mock_cluster_t *mcluster;
6379         char errstr[256];
6380 
6381         test_conf_common_init(conf, 0);
6382 
6383         test_conf_set(conf, "client.id", "MOCK");
6384 
6385         rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
6386         TEST_ASSERT(rk, "Failed to create mock cluster rd_kafka_t: %s", errstr);
6387 
6388         mcluster = rd_kafka_mock_cluster_new(rk, broker_cnt);
6389         TEST_ASSERT(mcluster, "Failed to acquire mock cluster");
6390 
6391         if (bootstraps)
6392                 *bootstraps = rd_kafka_mock_cluster_bootstraps(mcluster);
6393 
6394         return mcluster;
6395 }
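
/**
 * Illustrative sketch: spinning up a three-broker mock cluster and pointing
 * a separate client at it (the producer created here is hypothetical; only
 * standard librdkafka configuration is used).
 *
 * @code
 *   const char *bootstraps;
 *   rd_kafka_mock_cluster_t *mcluster = test_mock_cluster_new(3, &bootstraps);
 *   char errstr[256];
 *
 *   rd_kafka_conf_t *conf = rd_kafka_conf_new();
 *   test_conf_set(conf, "bootstrap.servers", bootstraps);
 *
 *   rd_kafka_t *p = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
 *                                errstr, sizeof(errstr));
 *   TEST_ASSERT(p, "Failed to create producer: %s", errstr);
 *
 *   // ... produce/consume against the mock brokers ...
 *
 *   rd_kafka_destroy(p);
 *   test_mock_cluster_destroy(mcluster);
 * @endcode
 */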
6396 
6397 
6398 
6399 /**
6400  * @name Sub-tests
6401  */
6402 
6403 
6404 /**
6405  * @brief Start a sub-test. \p fmt is optional and allows additional
6406  *        sub-test info to be displayed, e.g., test parameters.
6407  *
6408  * @returns 0 if sub-test should not be run, else 1.
6409  */
6410 int test_sub_start (const char *func, int line, int is_quick,
6411                     const char *fmt, ...) {
6412 
6413         if (!is_quick && test_quick)
6414                 return 0;
6415 
6416         if (subtests_to_run && !strstr(func, subtests_to_run))
6417                 return 0;
6418 
6419         if (fmt && *fmt) {
6420                 va_list ap;
6421                 char buf[256];
6422 
6423                 va_start(ap, fmt);
6424                 rd_vsnprintf(buf, sizeof(buf), fmt, ap);
6425                 va_end(ap);
6426 
6427                 rd_snprintf(test_curr->subtest, sizeof(test_curr->subtest),
6428                             "%s:%d: %s", func, line, buf);
6429         } else {
6430                 rd_snprintf(test_curr->subtest, sizeof(test_curr->subtest),
6431                             "%s:%d", func, line);
6432         }
6433 
6434         TIMING_START(&test_curr->subtest_duration, "SUBTEST");
6435 
6436         TEST_SAY(_C_MAG "[ %s ]\n", test_curr->subtest);
6437 
6438         return 1;
6439 }
6440 
6441 
6442 /**
6443  * @brief Reset the current subtest state.
6444  */
6445 static void test_sub_reset (void) {
6446         *test_curr->subtest = '\0';
6447         test_curr->is_fatal_cb = NULL;
6448         test_curr->ignore_dr_err = rd_false;
6449         test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
6450         test_curr->dr_mv = NULL;
6451 }
6452 
6453 /**
6454  * @brief Sub-test has passed.
6455  */
6456 void test_sub_pass (void) {
6457 
6458         TEST_ASSERT(*test_curr->subtest);
6459 
6460         TEST_SAYL(1, _C_GRN "[ %s: PASS (%.02fs) ]\n", test_curr->subtest,
6461                   (float)(TIMING_DURATION(&test_curr->subtest_duration) /
6462                           1000000.0f));
6463 
6464         test_sub_reset();
6465 }
6466 
6467 
6468 /**
6469  * @brief Skip sub-test (must have been started with SUB_TEST*()).
6470  */
6471 void test_sub_skip (const char *fmt, ...) {
6472         va_list ap;
6473         char buf[256];
6474 
6475         TEST_ASSERT(*test_curr->subtest);
6476 
6477         va_start(ap, fmt);
6478         rd_vsnprintf(buf, sizeof(buf), fmt, ap);
6479         va_end(ap);
6480 
6481         TEST_SAYL(1, _C_YEL "[ %s: SKIP: %s ]\n", test_curr->subtest, buf);
6482 
6483         test_sub_reset();
6484 }
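
/**
 * Illustrative sketch of the sub-test flow (the helper function below is
 * hypothetical): a test body brackets each sub-case with test_sub_start()
 * and test_sub_pass()/test_sub_skip(), typically via the SUB_TEST*() macros
 * defined in test.h.
 *
 * @code
 *   static void do_subtest_example (rd_kafka_t *rk) {
 *           // is_quick=1: this sub-test also runs in quick mode.
 *           if (!test_sub_start(__FUNCTION__, __LINE__, 1,
 *                               "with handle %p", rk))
 *                   return;  // filtered out by the sub-test selection
 *
 *           // ... exercise the feature under test ...
 *
 *           test_sub_pass();
 *   }
 * @endcode
 */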
6485