1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2013, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 
30 #define _CRT_RAND_S  // rand_s() on MSVC
31 #include <stdarg.h>
32 #include "test.h"
33 #include <signal.h>
34 #include <stdlib.h>
35 #include <stdio.h>
36 
37 #ifdef _MSC_VER
38 #include <direct.h> /* _getcwd */
39 #else
40 #include <sys/wait.h> /* waitpid */
41 #endif
42 
43 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
44  * is built from within the librdkafka source tree and thus differs. */
45 #include "rdkafka.h"
46 
/* Test verbosity level; overridden by the TEST_LEVEL environment
 * variable in test_init(). */
int test_level = 2;
/* Random seed selected by test_init(). A non-zero value also tells
 * test_init() that initialization has already been performed. */
int test_seed = 0;

/* Overridden by the TEST_MODE environment variable in test_init(). */
char test_mode[64] = "bare";
/* Overridden by the TEST_SCENARIO environment variable in test_init(). */
char test_scenario[64] = "default";
static volatile sig_atomic_t test_exit = 0;
/* Topic name prefix used by test_mk_topic_name(); set through the
 * "test.topic.prefix" special property. */
static char test_topic_prefix[128] = "rdkafkatest";
/* When non-zero test_mk_topic_name() always generates randomized topic
 * names; set through the "test.topic.random" special property. */
static int  test_topic_random = 0;
int          tests_running_cnt = 0;
/* Set through the "test.concurrent.max" special property. */
int          test_concurrent_max = 5;
int         test_assert_on_fail = 0;
/* Factor applied to timeouts by test_timeout_set() and tmout_multip();
 * set through the "test.timeout.multiplier" special property. */
double test_timeout_multiplier  = 1.0;
/* Heap-allocated copy of the "test.sql.command" special property,
 * or NULL if not set. */
static char *test_sql_cmd = NULL;
int  test_session_timeout_ms = 6000;
int          test_broker_version;
static const char *test_broker_version_str = "2.4.0.0";
int          test_flags = 0;
int          test_neg_flags = TEST_F_KNOWN_ISSUE;
/* run delete-test-topics.sh between each test (when concurrent_max = 1) */
static int   test_delete_topics_between = 0;
static const char *test_git_version = "HEAD";
/* sockem configuration string passed to sockem_connect(); taken from the
 * TEST_SOCKEM environment variable in test_init(). An empty string
 * keeps test_conf_init() from enabling the sockem callbacks. */
static const char *test_sockem_conf = "";
int          test_on_ci = 0; /* Tests are being run on CI, be more forgiving
                              * with regards to timeouts, etc. */
int          test_quick = 0; /** Run tests quickly */
/* When non-zero test_conf_init() sets enable.idempotence=true on
 * every configuration object it creates. */
int          test_idempotent_producer = 0;
int          test_rusage = 0; /**< Check resource usage */
/**< CPU speed calibration for rusage threshold checks.
 *   >1.0: CPU is slower than base line system,
 *   <1.0: CPU is faster than base line system.
 *   Overridden by the TEST_CPU_CALIBRATION environment variable
 *   in test_init(). */
double       test_rusage_cpu_calibration = 1.0;
static  const char *tests_to_run = NULL; /* all */

static int show_summary = 1;
/* Forward declaration; the definition is not in this part of the file. */
static int test_summary (int do_lock);
82 
/**
 * Protects shared state, such as tests[]
 *
 * NOTE(review): test_cnd is presumably the condition variable paired
 * with test_mtx - confirm against its users.
 */
mtx_t test_mtx;
cnd_t test_cnd;
88 
/* Human readable test state names.
 * NOTE(review): presumably indexed by the test state enum declared in
 * test.h, in which case the order here must match it - confirm. */
static const char *test_states[] = {
        "DNS",
        "SKIPPED",
        "RUNNING",
        "PASSED",
        "FAILED",
};
96 
97 
98 
/* Declare the entry point main_<NAME>(int, char **) of a test. */
#define _TEST_DECL(NAME)                                                \
        extern int main_ ## NAME (int, char **)
/* Initializer for one struct test entry: the test name is the stringified
 * NAME, the entry point is main_<NAME>, and any further designated
 * initializers (e.g. TEST_BRKVER(), _THRES(), .extra) are passed through
 * the variadic arguments. */
#define _TEST(NAME,FLAGS,...)						\
        { .name = # NAME, .mainfunc = main_ ## NAME, .flags = FLAGS, __VA_ARGS__ }
103 
104 
/**
 * Declare all tests here
 *
 * Each _TEST_DECL(NAME) line expands to an extern prototype for
 * main_NAME(int, char **), which the tests[] table references through
 * the _TEST() macro.
 */
_TEST_DECL(0000_unittests);
_TEST_DECL(0001_multiobj);
_TEST_DECL(0002_unkpart);
_TEST_DECL(0003_msgmaxsize);
_TEST_DECL(0004_conf);
_TEST_DECL(0005_order);
_TEST_DECL(0006_symbols);
_TEST_DECL(0007_autotopic);
_TEST_DECL(0008_reqacks);
_TEST_DECL(0009_mock_cluster);
_TEST_DECL(0011_produce_batch);
_TEST_DECL(0012_produce_consume);
_TEST_DECL(0013_null_msgs);
_TEST_DECL(0014_reconsume_191);
_TEST_DECL(0015_offsets_seek);
_TEST_DECL(0016_client_swname);
_TEST_DECL(0017_compression);
_TEST_DECL(0018_cgrp_term);
_TEST_DECL(0019_list_groups);
_TEST_DECL(0020_destroy_hang);
_TEST_DECL(0021_rkt_destroy);
_TEST_DECL(0022_consume_batch);
_TEST_DECL(0025_timers);
_TEST_DECL(0026_consume_pause);
_TEST_DECL(0028_long_topicnames);
_TEST_DECL(0029_assign_offset);
_TEST_DECL(0030_offset_commit);
_TEST_DECL(0031_get_offsets);
_TEST_DECL(0033_regex_subscribe);
_TEST_DECL(0033_regex_subscribe_local);
_TEST_DECL(0034_offset_reset);
_TEST_DECL(0035_api_version);
_TEST_DECL(0036_partial_fetch);
_TEST_DECL(0037_destroy_hang_local);
_TEST_DECL(0038_performance);
_TEST_DECL(0039_event_dr);
_TEST_DECL(0039_event);
_TEST_DECL(0040_io_event);
_TEST_DECL(0041_fetch_max_bytes);
_TEST_DECL(0042_many_topics);
_TEST_DECL(0043_no_connection);
_TEST_DECL(0044_partition_cnt);
_TEST_DECL(0045_subscribe_update);
_TEST_DECL(0045_subscribe_update_topic_remove);
_TEST_DECL(0045_subscribe_update_non_exist_and_partchange);
_TEST_DECL(0046_rkt_cache);
_TEST_DECL(0047_partial_buf_tmout);
_TEST_DECL(0048_partitioner);
_TEST_DECL(0049_consume_conn_close);
_TEST_DECL(0050_subscribe_adds);
_TEST_DECL(0051_assign_adds);
_TEST_DECL(0052_msg_timestamps);
_TEST_DECL(0053_stats_timing);
_TEST_DECL(0053_stats);
_TEST_DECL(0054_offset_time);
_TEST_DECL(0055_producer_latency);
_TEST_DECL(0056_balanced_group_mt);
_TEST_DECL(0057_invalid_topic);
_TEST_DECL(0058_log);
_TEST_DECL(0059_bsearch);
_TEST_DECL(0060_op_prio);
_TEST_DECL(0061_consumer_lag);
_TEST_DECL(0062_stats_event);
_TEST_DECL(0063_clusterid);
_TEST_DECL(0064_interceptors);
_TEST_DECL(0065_yield);
_TEST_DECL(0066_plugins);
_TEST_DECL(0067_empty_topic);
_TEST_DECL(0068_produce_timeout);
_TEST_DECL(0069_consumer_add_parts);
_TEST_DECL(0070_null_empty);
_TEST_DECL(0072_headers_ut);
_TEST_DECL(0073_headers);
_TEST_DECL(0074_producev);
_TEST_DECL(0075_retry);
_TEST_DECL(0076_produce_retry);
_TEST_DECL(0077_compaction);
_TEST_DECL(0078_c_from_cpp);
_TEST_DECL(0079_fork);
_TEST_DECL(0080_admin_ut);
_TEST_DECL(0081_admin);
_TEST_DECL(0082_fetch_max_bytes);
_TEST_DECL(0083_cb_event);
_TEST_DECL(0084_destroy_flags_local);
_TEST_DECL(0084_destroy_flags);
_TEST_DECL(0085_headers);
_TEST_DECL(0086_purge_local);
_TEST_DECL(0086_purge_remote);
_TEST_DECL(0088_produce_metadata_timeout);
_TEST_DECL(0089_max_poll_interval);
_TEST_DECL(0090_idempotence);
_TEST_DECL(0091_max_poll_interval_timeout);
_TEST_DECL(0092_mixed_msgver);
_TEST_DECL(0093_holb_consumer);
_TEST_DECL(0094_idempotence_msg_timeout);
_TEST_DECL(0095_all_brokers_down);
_TEST_DECL(0097_ssl_verify);
_TEST_DECL(0098_consumer_txn);
_TEST_DECL(0099_commit_metadata);
_TEST_DECL(0100_thread_interceptors);
_TEST_DECL(0101_fetch_from_follower);
_TEST_DECL(0102_static_group_rebalance);
_TEST_DECL(0103_transactions_local);
_TEST_DECL(0103_transactions);
_TEST_DECL(0104_fetch_from_follower_mock);
_TEST_DECL(0105_transactions_mock);
_TEST_DECL(0106_cgrp_sess_timeout);
_TEST_DECL(0107_topic_recreate);

/* Manual tests */
_TEST_DECL(8000_idle);
219 
220 
/* Define test resource usage thresholds if the default limits
 * are not tolerable.
 *
 * Fields:
 *  .ucpu  - Max User CPU percentage  (double)
 *  .scpu  - Max System/Kernel CPU percentage  (double)
 *  .rss   - Max RSS (memory) in megabytes  (double)
 *  .ctxsw - Max number of voluntary context switches  (int)
 *
 * Also see test_rusage_check_thresholds() in rusage.c
 *
 * Add a comment above each _THRES() use explaining why the extra
 * thresholds are required.
 *
 * The macro expands to a designated initializer for the .rusage_thres
 * member and is meant to be passed as a trailing argument to _TEST().
 *
 * Usage:
 *  _TEST(00...., ...,
 *        _THRES(.ucpu = 15.0)),  <--  Max 15% User CPU usage
 */
#define _THRES(...) .rusage_thres = { __VA_ARGS__ }
239 
/**
 * Define all tests here
 *
 * The first entry is the special <MAIN> test and the table is terminated
 * by a { NULL } sentinel entry. Entries wrapped in #if WITH_SOCKEM are
 * only compiled in when sockem support is enabled.
 */
struct test tests[] = {
        /* Special MAIN test to hold over-all timings, etc. */
        { .name = "<MAIN>", .flags = TEST_F_LOCAL },
        _TEST(0000_unittests, TEST_F_LOCAL,
              /* The msgq insert order tests are heavy on
               * user CPU (memory scan), RSS, and
               * system CPU (lots of allocations -> madvise(2)). */
              _THRES(.ucpu = 100.0, .scpu = 20.0, .rss = 900.0)),
        _TEST(0001_multiobj, 0),
        _TEST(0002_unkpart, 0),
        _TEST(0003_msgmaxsize, 0),
        _TEST(0004_conf, TEST_F_LOCAL),
        _TEST(0005_order, 0),
        _TEST(0006_symbols, TEST_F_LOCAL),
        _TEST(0007_autotopic, 0),
        _TEST(0008_reqacks, 0),
        _TEST(0009_mock_cluster, TEST_F_LOCAL,
              /* Mock cluster requires MsgVersion 2 */
              TEST_BRKVER(0,11,0,0)),
        _TEST(0011_produce_batch, 0,
              /* Produces a lot of messages */
              _THRES(.ucpu = 40.0, .scpu = 8.0)),
        _TEST(0012_produce_consume, 0),
        _TEST(0013_null_msgs, 0),
        _TEST(0014_reconsume_191, 0),
        _TEST(0015_offsets_seek, 0),
        _TEST(0016_client_swname, 0),
        _TEST(0017_compression, 0),
        _TEST(0018_cgrp_term, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0019_list_groups, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0020_destroy_hang, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0021_rkt_destroy, 0),
        _TEST(0022_consume_batch, 0),
        _TEST(0025_timers, TEST_F_LOCAL),
	_TEST(0026_consume_pause, TEST_F_KNOWN_ISSUE, TEST_BRKVER(0,9,0,0),
                .extra = "Fragile test due to #2190"),
	_TEST(0028_long_topicnames, TEST_F_KNOWN_ISSUE, TEST_BRKVER(0,9,0,0),
	      .extra = "https://github.com/edenhill/librdkafka/issues/529"),
	_TEST(0029_assign_offset, 0),
	_TEST(0030_offset_commit, 0, TEST_BRKVER(0,9,0,0),
              /* Loops over committed() until timeout */
              _THRES(.ucpu = 10.0, .scpu = 5.0)),
	_TEST(0031_get_offsets, 0),
	_TEST(0033_regex_subscribe, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0033_regex_subscribe_local, TEST_F_LOCAL),
	_TEST(0034_offset_reset, 0),
	_TEST(0035_api_version, 0),
	_TEST(0036_partial_fetch, 0),
	_TEST(0037_destroy_hang_local, TEST_F_LOCAL),
	_TEST(0038_performance, 0,
              /* Produces and consumes a lot of messages */
              _THRES(.ucpu = 150.0, .scpu = 10)),
	_TEST(0039_event_dr, 0),
        _TEST(0039_event, TEST_F_LOCAL),
	_TEST(0040_io_event, 0, TEST_BRKVER(0,9,0,0)),
	_TEST(0041_fetch_max_bytes, 0,
              /* Re-fetches large messages multiple times */
              _THRES(.ucpu = 20.0, .scpu = 10.0)),
	_TEST(0042_many_topics, 0),
	_TEST(0043_no_connection, TEST_F_LOCAL),
	_TEST(0044_partition_cnt, 0, TEST_BRKVER(1,0,0,0),
              /* Produces a lot of messages */
              _THRES(.ucpu = 30.0)),
	_TEST(0045_subscribe_update, 0, TEST_BRKVER(0,9,0,0)),
	_TEST(0045_subscribe_update_topic_remove, TEST_F_KNOWN_ISSUE,
              TEST_BRKVER(0,9,0,0)),
        _TEST(0045_subscribe_update_non_exist_and_partchange, 0,
              TEST_BRKVER(0,9,0,0)),
	_TEST(0046_rkt_cache, TEST_F_LOCAL),
	_TEST(0047_partial_buf_tmout, TEST_F_KNOWN_ISSUE),
	_TEST(0048_partitioner, 0,
              /* Produces many small messages */
              _THRES(.ucpu = 10.0, .scpu = 5.0)),
#if WITH_SOCKEM
        _TEST(0049_consume_conn_close, TEST_F_SOCKEM, TEST_BRKVER(0,9,0,0)),
#endif
        _TEST(0050_subscribe_adds, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0051_assign_adds, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0052_msg_timestamps, 0, TEST_BRKVER(0,10,0,0)),
        _TEST(0053_stats_timing, TEST_F_LOCAL),
        _TEST(0053_stats, 0),
        _TEST(0054_offset_time, 0, TEST_BRKVER(0,10,1,0)),
        _TEST(0055_producer_latency, TEST_F_KNOWN_ISSUE_WIN32),
        _TEST(0056_balanced_group_mt, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0057_invalid_topic, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0058_log, TEST_F_LOCAL),
        _TEST(0059_bsearch, 0, TEST_BRKVER(0,10,0,0)),
        _TEST(0060_op_prio, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0061_consumer_lag, 0),
        _TEST(0062_stats_event, TEST_F_LOCAL),
        _TEST(0063_clusterid, 0, TEST_BRKVER(0,10,1,0)),
        _TEST(0064_interceptors, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0065_yield, 0),
        _TEST(0066_plugins,
              TEST_F_LOCAL|TEST_F_KNOWN_ISSUE_WIN32|TEST_F_KNOWN_ISSUE_OSX,
              .extra = "dynamic loading of tests might not be fixed for this platform"),
        _TEST(0067_empty_topic, 0),
#if WITH_SOCKEM
        _TEST(0068_produce_timeout, TEST_F_SOCKEM),
#endif
        _TEST(0069_consumer_add_parts, TEST_F_KNOWN_ISSUE_WIN32,
              TEST_BRKVER(1,0,0,0)),
        _TEST(0070_null_empty, 0),
        _TEST(0072_headers_ut, TEST_F_LOCAL),
        _TEST(0073_headers, 0, TEST_BRKVER(0,11,0,0)),
        _TEST(0074_producev, TEST_F_LOCAL),
#if WITH_SOCKEM
        _TEST(0075_retry, TEST_F_SOCKEM),
#endif
        /* NOTE(review): unlike the other TEST_F_SOCKEM entries this one is
         * not wrapped in #if WITH_SOCKEM - presumably intentional, confirm
         * that the test builds and runs without sockem. */
        _TEST(0076_produce_retry, TEST_F_SOCKEM),
        _TEST(0077_compaction, 0,
              /* The test itself requires message headers */
              TEST_BRKVER(0,11,0,0)),
        _TEST(0078_c_from_cpp, TEST_F_LOCAL),
        _TEST(0079_fork, TEST_F_LOCAL|TEST_F_KNOWN_ISSUE,
              .extra = "using a fork():ed rd_kafka_t is not supported and will "
              "most likely hang"),
        _TEST(0080_admin_ut, TEST_F_LOCAL),
        _TEST(0081_admin, 0, TEST_BRKVER(0,10,2,0)),
        _TEST(0082_fetch_max_bytes, 0, TEST_BRKVER(0,10,1,0)),
        _TEST(0083_cb_event, 0, TEST_BRKVER(0,9,0,0)),
        _TEST(0084_destroy_flags_local, TEST_F_LOCAL),
        _TEST(0084_destroy_flags, 0),
        _TEST(0085_headers, 0, TEST_BRKVER(0,11,0,0)),
        _TEST(0086_purge_local, TEST_F_LOCAL),
        _TEST(0086_purge_remote, 0),
#if WITH_SOCKEM
        _TEST(0088_produce_metadata_timeout, TEST_F_SOCKEM),
#endif
        _TEST(0089_max_poll_interval, 0, TEST_BRKVER(0,10,1,0)),
        _TEST(0090_idempotence, 0, TEST_BRKVER(0,11,0,0)),
        _TEST(0091_max_poll_interval_timeout, 0, TEST_BRKVER(0,10,1,0)),
        _TEST(0092_mixed_msgver, 0, TEST_BRKVER(0,11,0,0)),
        _TEST(0093_holb_consumer, 0, TEST_BRKVER(0,10,1,0)),
#if WITH_SOCKEM
        _TEST(0094_idempotence_msg_timeout, TEST_F_SOCKEM,
              TEST_BRKVER(0,11,0,0)),
#endif
        _TEST(0095_all_brokers_down, TEST_F_LOCAL),
        _TEST(0097_ssl_verify, 0),
        _TEST(0098_consumer_txn, 0, TEST_BRKVER(0,11,0,0)),
        _TEST(0099_commit_metadata, 0),
        _TEST(0100_thread_interceptors, TEST_F_LOCAL),
        _TEST(0101_fetch_from_follower, 0, TEST_BRKVER(2,4,0,0)),
        _TEST(0102_static_group_rebalance, 0,
              TEST_BRKVER(2,3,0,0)),
        _TEST(0103_transactions_local, TEST_F_LOCAL),
        _TEST(0103_transactions, 0, TEST_BRKVER(0, 11, 0, 0)),
        _TEST(0104_fetch_from_follower_mock, TEST_F_LOCAL,
              TEST_BRKVER(2,4,0,0)),
        _TEST(0105_transactions_mock, TEST_F_LOCAL, TEST_BRKVER(0,11,0,0)),
        _TEST(0106_cgrp_sess_timeout, TEST_F_LOCAL, TEST_BRKVER(0,11,0,0)),
        _TEST(0107_topic_recreate, 0, TEST_BRKVER_TOPIC_ADMINAPI,
              .scenario = "noautocreate"),

        /* Manual tests */
        _TEST(8000_idle, TEST_F_MANUAL),

        /* End-of-table sentinel */
        { NULL }
};
403 
404 
/* The test that the calling thread is currently running; initially the
 * special <MAIN> entry tests[0]. RD_TLS presumably makes this a
 * per-thread variable - confirm in the rd headers. */
RD_TLS struct test *test_curr = &tests[0];
406 
407 
408 
409 #if WITH_SOCKEM
410 /**
411  * Socket network emulation with sockem
412  */
413 
/**
 * @brief Append the sockem socket \p skm to \p test's list of sockets.
 *
 * The list is modified while holding the test lock.
 */
static void test_socket_add (struct test *test, sockem_t *skm) {
        TEST_LOCK();

        rd_list_add(&test->sockets, skm);

        TEST_UNLOCK();
}
419 
/**
 * @brief Remove the sockem socket \p skm from \p test's list of sockets.
 *
 * Removal is best effort: \p skm might never have been added if the
 * connect callback failed.
 *
 * @param do_lock If non-zero the test lock is acquired around the removal,
 *                else the caller is expected to already hold it.
 */
static void test_socket_del (struct test *test, sockem_t *skm, int do_lock) {
        if (!do_lock) {
                /* Caller already holds the lock */
                rd_list_remove(&test->sockets, skm);
                return;
        }

        TEST_LOCK();
        rd_list_remove(&test->sockets, skm);
        TEST_UNLOCK();
}
428 
test_socket_sockem_set_all(const char * key,int val)429 int test_socket_sockem_set_all (const char *key, int val) {
430         int i;
431         sockem_t *skm;
432         int cnt = 0;
433 
434         TEST_LOCK();
435 
436         cnt = rd_list_cnt(&test_curr->sockets);
437         TEST_SAY("Setting sockem %s=%d on %s%d socket(s)\n", key, val,
438                  cnt > 0 ? "" : _C_RED, cnt);
439 
440         RD_LIST_FOREACH(skm, &test_curr->sockets, i) {
441                 if (sockem_set(skm, key, val, NULL) == -1)
442                         TEST_FAIL("sockem_set(%s, %d) failed", key, val);
443         }
444 
445         TEST_UNLOCK();
446 
447         return cnt;
448 }
449 
test_socket_sockem_set(int s,const char * key,int value)450 void test_socket_sockem_set (int s, const char *key, int value) {
451         sockem_t *skm;
452 
453         TEST_LOCK();
454         skm = sockem_find(s);
455         if (skm)
456                 sockem_set(skm, key, value, NULL);
457         TEST_UNLOCK();
458 }
459 
test_socket_close_all(struct test * test,int reinit)460 void test_socket_close_all (struct test *test, int reinit) {
461         TEST_LOCK();
462         rd_list_destroy(&test->sockets);
463         if (reinit)
464                 rd_list_init(&test->sockets, 16, (void *)sockem_close);
465         TEST_UNLOCK();
466 }
467 
468 
test_connect_cb(int s,const struct sockaddr * addr,int addrlen,const char * id,void * opaque)469 static int test_connect_cb (int s, const struct sockaddr *addr,
470                             int addrlen, const char *id, void *opaque) {
471         struct test *test = opaque;
472         sockem_t *skm;
473         int r;
474 
475         skm = sockem_connect(s, addr, addrlen, test_sockem_conf, 0, NULL);
476         if (!skm)
477                 return errno;
478 
479         if (test->connect_cb) {
480                 r = test->connect_cb(test, skm, id);
481                 if (r)
482                         return r;
483         }
484 
485         test_socket_add(test, skm);
486 
487         return 0;
488 }
489 
test_closesocket_cb(int s,void * opaque)490 static int test_closesocket_cb (int s, void *opaque) {
491         struct test *test = opaque;
492         sockem_t *skm;
493 
494         TEST_LOCK();
495         skm = sockem_find(s);
496         if (skm) {
497                 sockem_close(skm);
498                 test_socket_del(test, skm, 0/*nolock*/);
499         } else {
500 #ifdef _MSC_VER
501                 closesocket(s);
502 #else
503                 close(s);
504 #endif
505         }
506         TEST_UNLOCK();
507 
508         return 0;
509 }
510 
511 
/**
 * @brief Enable sockem for clients created from \p conf.
 *
 * Installs the connect and closesocket callbacks and sets the
 * configuration opaque to the current test, which is what those
 * callbacks receive as their opaque argument.
 */
void test_socket_enable (rd_kafka_conf_t *conf) {
        rd_kafka_conf_set_opaque(conf, test_curr);

        rd_kafka_conf_set_connect_cb(conf, test_connect_cb);
        rd_kafka_conf_set_closesocket_cb(conf, test_closesocket_cb);
}
517 #endif /* WITH_SOCKEM */
518 
519 
/**
 * @brief librdkafka error callback used for all test clients.
 *
 * An error fails the current test unless the test provides an is_fatal_cb
 * that returns 0 for it. For RD_KAFKA_RESP_ERR__FATAL the underlying
 * fatal error is retrieved with rd_kafka_fatal_error() and is_fatal_cb is
 * consulted a second time with that error code.
 */
static void test_error_cb (rd_kafka_t *rk, int err,
                           const char *reason, void *opaque) {
        char fatal_errstr[512];

        /* Errors the test explicitly declares non-fatal are only logged */
        if (test_curr->is_fatal_cb &&
            !test_curr->is_fatal_cb(rk, err, reason)) {
                TEST_SAY(_C_YEL "%s rdkafka error (non-testfatal): %s: %s\n",
                         rd_kafka_name(rk), rd_kafka_err2str(err), reason);
                return;
        }

        if (err != RD_KAFKA_RESP_ERR__FATAL) {
                TEST_FAIL("%s rdkafka error: %s: %s",
                          rd_kafka_name(rk),
                          rd_kafka_err2str(err), reason);
                return;
        }

        TEST_SAY(_C_RED "%s Fatal error: %s\n",
                 rd_kafka_name(rk), reason);

        /* Replace the generic fatal code with the underlying error */
        err = rd_kafka_fatal_error(rk, fatal_errstr, sizeof(fatal_errstr));

        if (test_curr->is_fatal_cb &&
            !test_curr->is_fatal_cb(rk,  err, reason)) {
                TEST_SAY(_C_YEL
                         "%s rdkafka ignored FATAL error: "
                         "%s: %s\n",
                         rd_kafka_name(rk),
                         rd_kafka_err2str(err), fatal_errstr);
                return;
        }

        TEST_FAIL("%s rdkafka FATAL error: %s: %s",
                  rd_kafka_name(rk),
                  rd_kafka_err2str(err), fatal_errstr);
}
552 
/**
 * @brief librdkafka statistics callback.
 *
 * If the current test has a stats file open, the JSON statistics are
 * written to it as one line, wrapped in an object that also carries the
 * test name and the client instance name.
 *
 * @returns 0 always.
 */
static int test_stats_cb (rd_kafka_t *rk, char *json, size_t json_len,
                           void *opaque) {
        struct test *test = test_curr;

        if (!test->stats_fp)
                return 0;

        fprintf(test->stats_fp,
                "{\"test\": \"%s\", \"instance\":\"%s\", "
                "\"stats\": %s}\n",
                test->name, rd_kafka_name(rk), json);

        return 0;
}
563 
564 
565 /**
566  * @brief Limit the test run time (in seconds)
567  */
test_timeout_set(int timeout)568 void test_timeout_set (int timeout) {
569 	TEST_LOCK();
570         TEST_SAY("Setting test timeout to %ds * %.1f\n",
571 		 timeout, test_timeout_multiplier);
572 	timeout = (int)((double)timeout * test_timeout_multiplier);
573 	test_curr->timeout = test_clock() + (timeout * 1000000);
574 	TEST_UNLOCK();
575 }
576 
tmout_multip(int msecs)577 int tmout_multip (int msecs) {
578         int r;
579         TEST_LOCK();
580         r = (int)(((double)(msecs)) * test_timeout_multiplier);
581         TEST_UNLOCK();
582         return r;
583 }
584 
585 
586 
587 #ifdef _MSC_VER
test_init_win32(void)588 static void test_init_win32 (void) {
589         /* Enable VT emulation to support colored output. */
590         HANDLE hOut = GetStdHandle(STD_OUTPUT_HANDLE);
591         DWORD dwMode = 0;
592 
593         if (hOut == INVALID_HANDLE_VALUE ||
594             !GetConsoleMode(hOut, &dwMode))
595                 return;
596 
597 #ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING
598 #define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x4
599 #endif
600         dwMode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING;
601         SetConsoleMode(hOut, dwMode);
602 }
603 #endif
604 
605 
test_init(void)606 static void test_init (void) {
607         int seed;
608         const char *tmp;
609 
610 
611         if (test_seed)
612                 return;
613 
614         if ((tmp = test_getenv("TEST_LEVEL", NULL)))
615                 test_level = atoi(tmp);
616         if ((tmp = test_getenv("TEST_MODE", NULL)))
617                 strncpy(test_mode, tmp, sizeof(test_mode)-1);
618         if ((tmp = test_getenv("TEST_SCENARIO", NULL)))
619                 strncpy(test_scenario, tmp, sizeof(test_scenario)-1);
620         if ((tmp = test_getenv("TEST_SOCKEM", NULL)))
621                 test_sockem_conf = tmp;
622         if ((tmp = test_getenv("TEST_SEED", NULL)))
623                 seed = atoi(tmp);
624         else
625                 seed = test_clock() & 0xffffffff;
626         if ((tmp = test_getenv("TEST_CPU_CALIBRATION", NULL))) {
627                 test_rusage_cpu_calibration = strtod(tmp, NULL);
628                 if (test_rusage_cpu_calibration < 0.00001) {
629                         fprintf(stderr,
630                                 "%% Invalid CPU calibration "
631                                 "value (from TEST_CPU_CALIBRATION env): %s\n",
632                                 tmp);
633                         exit(1);
634                 }
635         }
636 
637 #ifdef _MSC_VER
638         test_init_win32();
639 	{
640 		LARGE_INTEGER cycl;
641 		QueryPerformanceCounter(&cycl);
642 		seed = (int)cycl.QuadPart;
643 	}
644 #endif
645 	srand(seed);
646 	test_seed = seed;
647 }
648 
649 
test_mk_topic_name(const char * suffix,int randomized)650 const char *test_mk_topic_name (const char *suffix, int randomized) {
651         static RD_TLS char ret[512];
652 
653         /* Strip main_ prefix (caller is using __FUNCTION__) */
654         if (!strncmp(suffix, "main_", 5))
655                 suffix += 5;
656 
657         if (test_topic_random || randomized)
658                 rd_snprintf(ret, sizeof(ret), "%s_rnd%"PRIx64"_%s",
659                          test_topic_prefix, test_id_generate(), suffix);
660         else
661                 rd_snprintf(ret, sizeof(ret), "%s_%s", test_topic_prefix, suffix);
662 
663         TEST_SAY("Using topic \"%s\"\n", ret);
664 
665         return ret;
666 }
667 
668 
669 /**
670  * @brief Set special test config property
671  * @returns 1 if property was known, else 0.
672  */
test_set_special_conf(const char * name,const char * val,int * timeoutp)673 int test_set_special_conf (const char *name, const char *val, int *timeoutp) {
674         if (!strcmp(name, "test.timeout.multiplier")) {
675                 TEST_LOCK();
676                 test_timeout_multiplier = strtod(val, NULL);
677                 TEST_UNLOCK();
678                 *timeoutp = tmout_multip((*timeoutp)*1000) / 1000;
679         } else if (!strcmp(name, "test.topic.prefix")) {
680                 rd_snprintf(test_topic_prefix, sizeof(test_topic_prefix),
681                             "%s", val);
682         } else if (!strcmp(name, "test.topic.random")) {
683                 if (!strcmp(val, "true") ||
684                     !strcmp(val, "1"))
685                         test_topic_random = 1;
686                 else
687                         test_topic_random = 0;
688         } else if (!strcmp(name, "test.concurrent.max")) {
689                 TEST_LOCK();
690                 test_concurrent_max = (int)strtod(val, NULL);
691                 TEST_UNLOCK();
692         } else if (!strcmp(name, "test.sql.command")) {
693                 TEST_LOCK();
694                 if (test_sql_cmd)
695                         rd_free(test_sql_cmd);
696                 test_sql_cmd = rd_strdup(val);
697                 TEST_UNLOCK();
698         } else
699                 return 0;
700 
701         return 1;
702 }
703 
/**
 * @brief Read the test configuration file \p conf_path and apply its
 *        name=value lines.
 *
 * Empty lines and lines starting with '#' are skipped. Special test
 * properties are handled by test_set_special_conf(). Properties prefixed
 * with "topic." are first tried on \p topic_conf (without the prefix);
 * a property that is still unknown after that is applied to \p conf
 * under its full name. A NULL \p conf or \p topic_conf makes the
 * corresponding properties be silently accepted.
 *
 * A missing file is not an error. Any other open failure, a malformed
 * line, or a rejected property fails the test.
 *
 * @param timeoutp Test timeout that special properties may adjust.
 */
static void test_read_conf_file (const char *conf_path,
                                 rd_kafka_conf_t *conf,
                                 rd_kafka_topic_conf_t *topic_conf,
                                 int *timeoutp) {
        const size_t topic_prefix_len = strlen("topic.");
        FILE *fp;
        char linebuf[1024];
        int lineno = 0;

#ifndef _MSC_VER
        fp = fopen(conf_path, "r");
#else
        fp = NULL;
        errno = fopen_s(&fp, conf_path, "r");
#endif
        if (fp == NULL) {
                if (errno != ENOENT)
                        TEST_FAIL("Failed to read %s: %s",
                                  conf_path, strerror(errno));
                else {
                        TEST_SAY("Test config file %s not found\n", conf_path);
                        return;
                }
        }

        while (fgets(linebuf, sizeof(linebuf)-1, fp) != NULL) {
                rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;
                char errstr[512];
                char *name = linebuf;
                char *val;
                char *sep;

                lineno++;

                /* Chop off the trailing newline, if any */
                sep = strchr(linebuf, '\n');
                if (sep != NULL)
                        *sep = '\0';

                /* Skip empty lines and comments */
                if (linebuf[0] == '\0' || linebuf[0] == '#')
                        continue;

                sep = strchr(linebuf, '=');
                if (sep == NULL)
                        TEST_FAIL("%s:%i: expected name=value format\n",
                                  conf_path, lineno);

                /* Split the line in place into name and value */
                *sep = '\0';
                val = sep + 1;

                if (test_set_special_conf(name, val, timeoutp))
                        continue;

                if (strncmp(name, "topic.", topic_prefix_len) == 0) {
                        /* Try as a topic property, without the prefix */
                        if (topic_conf != NULL)
                                res = rd_kafka_topic_conf_set(
                                        topic_conf,
                                        name + topic_prefix_len, val,
                                        errstr, sizeof(errstr));
                        else
                                res = RD_KAFKA_CONF_OK;
                }

                if (res == RD_KAFKA_CONF_UNKNOWN) {
                        /* Global property, using the full name */
                        if (conf != NULL)
                                res = rd_kafka_conf_set(conf,
                                                        name, val,
                                                        errstr,
                                                        sizeof(errstr));
                        else
                                res = RD_KAFKA_CONF_OK;
                }

                if (res != RD_KAFKA_CONF_OK)
                        TEST_FAIL("%s:%i: %s\n",
                                  conf_path, lineno, errstr);
        }

        fclose(fp);
}
780 
/**
 * @brief Get path to test config file
 *
 * @returns the value of the RDKAFKA_TEST_CONF environment variable,
 *          with "test.conf" as the default.
 */
const char *test_conf_get_path (void) {
        const char *conf_path = test_getenv("RDKAFKA_TEST_CONF", "test.conf");

        return conf_path;
}
787 
/**
 * @brief Look up the environment variable \p env through rd_getenv(),
 *        passing \p def as the default value.
 */
const char *test_getenv (const char *env, const char *def) {
        const char *value = rd_getenv(env, def);

        return value;
}
791 
/**
 * @brief Common configuration setup shared by the test conf initializers.
 *
 * If \p conf is non-NULL and the TEST_DEBUG environment variable is set,
 * its value is applied as the "debug" property. A non-zero \p timeout
 * is installed as the test timeout.
 */
void test_conf_common_init (rd_kafka_conf_t *conf, int timeout) {
        /* The environment is only consulted when there is a conf object */
        const char *debug_ctx = conf ? test_getenv("TEST_DEBUG", NULL) : NULL;

        if (debug_ctx != NULL)
                test_conf_set(conf, "debug", debug_ctx);

        if (timeout != 0)
                test_timeout_set(timeout);
}
802 
803 
804 /**
805  * Creates and sets up kafka configuration objects.
806  * Will read "test.conf" file if it exists.
807  */
test_conf_init(rd_kafka_conf_t ** conf,rd_kafka_topic_conf_t ** topic_conf,int timeout)808 void test_conf_init (rd_kafka_conf_t **conf, rd_kafka_topic_conf_t **topic_conf,
809                      int timeout) {
810         const char *test_conf = test_conf_get_path();
811 
812         if (conf) {
813                 *conf = rd_kafka_conf_new();
814                 rd_kafka_conf_set(*conf, "client.id", test_curr->name, NULL, 0);
815                 if (test_idempotent_producer)
816                         test_conf_set(*conf, "enable.idempotence", "true");
817                 rd_kafka_conf_set_error_cb(*conf, test_error_cb);
818                 rd_kafka_conf_set_stats_cb(*conf, test_stats_cb);
819 
820                 /* Allow higher request timeouts on CI */
821                 if (test_on_ci)
822                         test_conf_set(*conf, "request.timeout.ms", "10000");
823 
824 #ifdef SIGIO
825 		{
826 			char buf[64];
827 
828 			/* Quick termination */
829 			rd_snprintf(buf, sizeof(buf), "%i", SIGIO);
830 			rd_kafka_conf_set(*conf, "internal.termination.signal",
831 					  buf, NULL, 0);
832 			signal(SIGIO, SIG_IGN);
833 		}
834 #endif
835         }
836 
837 #if WITH_SOCKEM
838         if (*test_sockem_conf && conf)
839                 test_socket_enable(*conf);
840 #endif
841 
842 	if (topic_conf)
843 		*topic_conf = rd_kafka_topic_conf_new();
844 
845 	/* Open and read optional local test configuration file, if any. */
846         test_read_conf_file(test_conf,
847                             conf ? *conf : NULL,
848                             topic_conf ? *topic_conf : NULL, &timeout);
849 
850         test_conf_common_init(conf ? *conf : NULL, timeout);
851 }
852 
853 
test_rand(void)854 static RD_INLINE unsigned int test_rand(void) {
855 	unsigned int r;
856 #if _MSC_VER
857 	rand_s(&r);
858 #else
859 	r = rand();
860 #endif
861 	return r;
862 }
/**
 * @brief Generate a "unique" test id.
 *
 * @returns a 64-bit id composed of two test_rand() values, one shifted
 *          into the upper 32 bits and one or:ed into the lower bits.
 */
uint64_t test_id_generate (void) {
        uint64_t upper = (uint64_t)test_rand();
        uint64_t lower = (uint64_t)test_rand();

        return (upper << 32) | lower;
}
869 
870 
/**
 * @brief Generate a "unique" string id.
 *
 * The id from test_id_generate() is formatted as an unsigned decimal
 * number, so the resulting string never carries a leading minus sign.
 *
 * @param dest      Buffer receiving the formatted id.
 * @param dest_size Size of \p dest in bytes.
 *
 * @returns \p dest
 */
char *test_str_id_generate (char *dest, size_t dest_size) {
        /* test_id_generate() returns an uint64_t: use the matching
         * unsigned conversion specifier. */
        rd_snprintf(dest, dest_size, "%"PRIu64, test_id_generate());
        return dest;
}
878 
879 /**
880  * Same as test_str_id_generate but returns a temporary string.
881  */
test_str_id_generate_tmp(void)882 const char *test_str_id_generate_tmp (void) {
883 	static RD_TLS char ret[64];
884 	return test_str_id_generate(ret, sizeof(ret));
885 }
886 
/**
 * @brief Format a message token.
 *
 * The token has the form "testid=.., partition=.., msg=..\n" and is
 * padded with '!' characters up to \p dest_size, with the last byte of
 * \p dest being the nul terminator.
 *
 * @param dest      Destination buffer.
 * @param dest_size Size of \p dest in bytes. Nothing is written if 0.
 * @param testid    Test id to embed in the token.
 * @param partition Partition to embed in the token.
 * @param msgid     Message id to embed in the token.
 */
void test_msg_fmt (char *dest, size_t dest_size,
                   uint64_t testid, int32_t partition, int msgid) {
        int r;
        size_t of;

        /* An empty buffer can't hold anything, and the unsigned
         * `dest_size - 1` below would wrap around and lead to an
         * out-of-bounds memset(). */
        if (dest_size == 0)
                return;

        r = rd_snprintf(dest, dest_size,
                        "testid=%"PRIu64", partition=%"PRId32", msg=%i\n",
                        testid, partition, msgid);
        if (r < 0)
                return; /* Formatting failed: no known offset to pad from */

        of = (size_t)r;

        /* Pad the remainder of the buffer, unless the token was truncated
         * or already fills it. */
        if (of < dest_size - 1) {
                memset(dest+of, '!', dest_size-of);
                dest[dest_size-1] = '\0';
        }
}
903 
/**
 * @brief Prepare message value and key for test produce.
 *
 * The key is formatted with test_msg_fmt() and the value is then filled
 * by copying the key buffer repeatedly until \p val_size bytes have been
 * written (the last copy may be partial).
 *
 * @param testid    Test id embedded in the key.
 * @param partition Partition embedded in the key.
 * @param msg_id    Message id embedded in the key.
 * @param val       Value buffer to fill.
 * @param val_size  Size of \p val in bytes.
 * @param key       Key buffer to format.
 * @param key_size  Size of \p key in bytes.
 */
void test_prepare_msg (uint64_t testid, int32_t partition, int msg_id,
                       char *val, size_t val_size,
                       char *key, size_t key_size) {
        size_t filled;
        size_t chunk;

        test_msg_fmt(key, key_size, testid, partition, msg_id);

        /* Copy-repeat key into val until val_size */
        for (filled = 0 ; filled < val_size ; filled += chunk) {
                chunk = RD_MIN(val_size - filled, key_size);
                memcpy(val + filled, key, chunk);
        }
}
921 
922 
923 
/**
 * @brief Parse a message token (as formatted by test_msg_fmt()) from a
 *        message key and verify it against the expected values.
 *
 * Fails the test if the key is missing, does not have the expected
 * format, or if its test id (or partition, unless \p exp_partition is -1)
 * does not match.
 *
 * @param func          Caller's function name, for failure messages.
 * @param line          Caller's line number, for failure messages.
 * @param testid        Expected test id.
 * @param exp_partition Expected partition, or -1 to accept any partition.
 * @param msgidp        Receives the message id parsed from the key.
 * @param topic         Topic name, for failure messages.
 * @param partition     Message partition, for failure messages.
 * @param offset        Message offset, for failure messages.
 * @param key           Message key (not necessarily nul-terminated).
 * @param key_size      Size of \p key in bytes.
 */
void test_msg_parse00 (const char *func, int line,
                       uint64_t testid, int32_t exp_partition, int *msgidp,
                       const char *topic, int32_t partition, int64_t offset,
                       const char *key, size_t key_size) {
        char keystr[128];
        uint64_t msg_testid;
        int msg_partition;
        int field_cnt;

        if (!key) {
                TEST_FAIL("%s:%i: Message (%s [%"PRId32"] @ %"PRId64") "
                          "has empty key\n",
                          func, line, topic, partition, offset);
        }

        /* Make a nul-terminated copy of the key for sscanf() */
        rd_snprintf(keystr, sizeof(keystr), "%.*s", (int)key_size, key);

        field_cnt = sscanf(keystr,
                           "testid=%"SCNu64", partition=%i, msg=%i\n",
                           &msg_testid, &msg_partition, msgidp);
        if (field_cnt != 3) {
                TEST_FAIL("%s:%i: Incorrect key format: %s",
                          func, line, keystr);
        }

        /* All good if the test id matches and the partition is either
         * not checked or matches too. */
        if (testid == msg_testid &&
            (exp_partition == -1 || exp_partition == msg_partition))
                return;

        TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i did "
                  "not match message: \"%s\"\n",
                  func, line, testid, (int)exp_partition, keystr);
}
953 
/**
 * @brief Parse and verify the message token in \p rkmessage's key.
 *
 * Convenience wrapper that unpacks the topic name, partition, offset and
 * key from \p rkmessage and passes them to test_msg_parse00().
 */
void test_msg_parse0 (const char *func, int line,
                      uint64_t testid, rd_kafka_message_t *rkmessage,
                      int32_t exp_partition, int *msgidp) {
        const char *topic = rd_kafka_topic_name(rkmessage->rkt);
        const char *key = (const char *)rkmessage->key;

        test_msg_parse00(func, line, testid, exp_partition, msgidp,
                         topic, rkmessage->partition, rkmessage->offset,
                         key, rkmessage->key_len);
}
962 
963 
/**
 * @brief Arguments handed over to a test's runner thread.
 *
 * Allocated by run_test() and freed by run_test_from_thread().
 */
struct run_args {
        struct test *test;  /* Test to run */
        int argc;           /* Argument count passed to test->mainfunc */
        char **argv;        /* Argument vector passed to test->mainfunc */
};
969 
run_test0(struct run_args * run_args)970 static int run_test0 (struct run_args *run_args) {
971         struct test *test = run_args->test;
972 	test_timing_t t_run;
973 	int r;
974         char stats_file[256];
975 
976         rd_snprintf(stats_file, sizeof(stats_file), "stats_%s_%"PRIu64".json",
977                     test->name, test_id_generate());
978         if (!(test->stats_fp = fopen(stats_file, "w+")))
979                 TEST_SAY("=== Failed to create stats file %s: %s ===\n",
980                          stats_file, strerror(errno));
981 
982 	test_curr = test;
983 
984 #if WITH_SOCKEM
985         rd_list_init(&test->sockets, 16, (void *)sockem_close);
986 #endif
987         /* Don't check message status by default */
988         test->exp_dr_status = (rd_kafka_msg_status_t)-1;
989 
990 	TEST_SAY("================= Running test %s =================\n",
991 		 test->name);
992         if (test->stats_fp)
993                 TEST_SAY("==== Stats written to file %s ====\n", stats_file);
994 
995         test_rusage_start(test_curr);
996 	TIMING_START(&t_run, "%s", test->name);
997         test->start = t_run.ts_start;
998 
999         /* Run test main function */
1000 	r = test->mainfunc(run_args->argc, run_args->argv);
1001 
1002         TIMING_STOP(&t_run);
1003         test_rusage_stop(test_curr,
1004                          (double)TIMING_DURATION(&t_run) / 1000000.0);
1005 
1006         TEST_LOCK();
1007         test->duration = TIMING_DURATION(&t_run);
1008 
1009 	if (test->state == TEST_SKIPPED) {
1010 		TEST_SAY("================= Test %s SKIPPED "
1011 			 "=================\n",
1012                          run_args->test->name);
1013 	} else if (r) {
1014                 test->state = TEST_FAILED;
1015 		TEST_SAY("\033[31m"
1016 			 "================= Test %s FAILED ================="
1017 			 "\033[0m\n",
1018                          run_args->test->name);
1019         } else {
1020                 test->state = TEST_PASSED;
1021 		TEST_SAY("\033[32m"
1022 			 "================= Test %s PASSED ================="
1023 			 "\033[0m\n",
1024                          run_args->test->name);
1025         }
1026         TEST_UNLOCK();
1027 
1028         cnd_broadcast(&test_cnd);
1029 
1030 #if WITH_SOCKEM
1031         test_socket_close_all(test, 0);
1032 #endif
1033 
1034         if (test->stats_fp) {
1035                 long pos = ftell(test->stats_fp);
1036                 fclose(test->stats_fp);
1037                 test->stats_fp = NULL;
1038                 /* Delete file if nothing was written */
1039                 if (pos == 0) {
1040 #ifndef _MSC_VER
1041                         unlink(stats_file);
1042 #else
1043                         _unlink(stats_file);
1044 #endif
1045                 }
1046         }
1047 
1048         if (test_delete_topics_between && test_concurrent_max == 1)
1049                 test_delete_all_test_topics(60*1000);
1050 
1051 	return r;
1052 }
1053 
1054 
1055 
1056 
run_test_from_thread(void * arg)1057 static int run_test_from_thread (void *arg) {
1058         struct run_args *run_args = arg;
1059 
1060 	thrd_detach(thrd_current());
1061 
1062 	run_test0(run_args);
1063 
1064         TEST_LOCK();
1065         tests_running_cnt--;
1066         TEST_UNLOCK();
1067 
1068         free(run_args);
1069 
1070         return 0;
1071 }
1072 
1073 
1074 /**
1075  * @brief Check running tests for timeouts.
1076  * @locks TEST_LOCK MUST be held
1077  */
check_test_timeouts(void)1078 static void check_test_timeouts (void) {
1079         int64_t now = test_clock();
1080         struct test *test;
1081 
1082         for (test = tests ; test->name ; test++) {
1083                 if (test->state != TEST_RUNNING)
1084                         continue;
1085 
1086                 /* Timeout check */
1087                 if (now > test->timeout) {
1088                         struct test *save_test = test_curr;
1089                         test_curr = test;
1090                         test->state = TEST_FAILED;
1091                         test_summary(0/*no-locks*/);
1092                         TEST_FAIL0(__FILE__,__LINE__,0/*nolock*/,
1093                                    0/*fail-later*/,
1094                                    "Test %s timed out "
1095                                    "(timeout set to %d seconds)\n",
1096                                    test->name,
1097                                    (int)(test->timeout-
1098                                          test->start)/
1099                                    1000000);
1100                         test_curr = save_test;
1101                         tests_running_cnt--; /* fail-later misses this*/
1102 #ifdef _MSC_VER
1103                         TerminateThread(test->thrd, -1);
1104 #else
1105                         pthread_kill(test->thrd, SIGKILL);
1106 #endif
1107                 }
1108         }
1109 }
1110 
1111 
run_test(struct test * test,int argc,char ** argv)1112 static int run_test (struct test *test, int argc, char **argv) {
1113         struct run_args *run_args = calloc(1, sizeof(*run_args));
1114         int wait_cnt = 0;
1115 
1116         run_args->test = test;
1117         run_args->argc = argc;
1118         run_args->argv = argv;
1119 
1120         TEST_LOCK();
1121         while (tests_running_cnt >= test_concurrent_max) {
1122                 if (!(wait_cnt++ % 100))
1123                         TEST_SAY("Too many tests running (%d >= %d): "
1124                                  "postponing %s start...\n",
1125                                  tests_running_cnt, test_concurrent_max,
1126                                  test->name);
1127                 cnd_timedwait_ms(&test_cnd, &test_mtx, 100);
1128 
1129                 check_test_timeouts();
1130         }
1131         tests_running_cnt++;
1132         test->timeout = test_clock() + (int64_t)(30.0 * 1000000.0 *
1133                                                  test_timeout_multiplier);
1134         test->state = TEST_RUNNING;
1135         TEST_UNLOCK();
1136 
1137         if (thrd_create(&test->thrd, run_test_from_thread, run_args) !=
1138             thrd_success) {
1139                 TEST_LOCK();
1140                 tests_running_cnt--;
1141                 test->state = TEST_FAILED;
1142                 TEST_UNLOCK();
1143 
1144                 TEST_FAIL("Failed to start thread for test %s\n",
1145                           test->name);
1146         }
1147 
1148         return 0;
1149 }
1150 
run_tests(int argc,char ** argv)1151 static void run_tests (int argc, char **argv) {
1152         struct test *test;
1153 
1154         for (test = tests ; test->name ; test++) {
1155                 char testnum[128];
1156                 char *t;
1157                 const char *skip_reason = NULL;
1158                 rd_bool_t skip_silent = rd_false;
1159 		char tmp[128];
1160                 const char *scenario =
1161                         test->scenario ? test->scenario : "default";
1162 
1163                 if (!test->mainfunc)
1164                         continue;
1165 
1166                 /* Extract test number, as string */
1167                 strncpy(testnum, test->name, sizeof(testnum)-1);
1168                 testnum[sizeof(testnum)-1] = '\0';
1169                 if ((t = strchr(testnum, '_')))
1170                         *t = '\0';
1171 
1172                 if ((test_flags && (test_flags & test->flags) != test_flags)) {
1173                         skip_reason = "filtered due to test flags";
1174                         skip_silent = rd_true;
1175                 } if ((test_neg_flags & ~test_flags) & test->flags)
1176 			skip_reason = "Filtered due to negative test flags";
1177 		if (test_broker_version &&
1178 		    (test->minver > test_broker_version ||
1179 		     (test->maxver && test->maxver < test_broker_version))) {
1180 			rd_snprintf(tmp, sizeof(tmp),
1181 				    "not applicable for broker "
1182 				    "version %d.%d.%d.%d",
1183 				    TEST_BRKVER_X(test_broker_version, 0),
1184 				    TEST_BRKVER_X(test_broker_version, 1),
1185 				    TEST_BRKVER_X(test_broker_version, 2),
1186 				    TEST_BRKVER_X(test_broker_version, 3));
1187 			skip_reason = tmp;
1188 		}
1189 
1190                 if (strcmp(scenario, test_scenario)) {
1191                         rd_snprintf(tmp, sizeof(tmp),
1192                                     "requires test scenario %s", scenario);
1193                         skip_silent = rd_true;
1194                         skip_reason = tmp;
1195                 }
1196 
1197                 if (tests_to_run && !strstr(tests_to_run, testnum)) {
1198                         skip_reason = "not included in TESTS list";
1199                         skip_silent = rd_true;
1200                 } else if (!tests_to_run && (test->flags & TEST_F_MANUAL)) {
1201                         skip_reason = "manual test";
1202                         skip_silent = rd_true;
1203                 }
1204 
1205                 if (!skip_reason) {
1206                         run_test(test, argc, argv);
1207                 } else {
1208                         if (skip_silent) {
1209                                 TEST_SAYL(3,
1210                                           "================= Skipping test %s "
1211                                           "(%s) ================\n",
1212                                           test->name, skip_reason);
1213                                 TEST_LOCK();
1214                                 test->state = TEST_SKIPPED;
1215                                 TEST_UNLOCK();
1216                         } else {
1217                                 test_curr = test;
1218                                 TEST_SKIP("%s\n", skip_reason);
1219                                 test_curr = &tests[0];
1220                         }
1221 
1222                 }
1223         }
1224 
1225 
1226 }
1227 
1228 /**
1229  * @brief Print summary for all tests.
1230  *
1231  * @returns the number of failed tests.
1232  */
test_summary(int do_lock)1233 static int test_summary (int do_lock) {
1234         struct test *test;
1235         FILE *report_fp;
1236         char report_path[128];
1237         time_t t;
1238         struct tm *tm;
1239         char datestr[64];
1240         int64_t total_duration = 0;
1241         int tests_run = 0;
1242         int tests_failed = 0;
1243 	int tests_failed_known = 0;
1244         int tests_passed = 0;
1245 	FILE *sql_fp = NULL;
1246         const char *tmp;
1247 
1248         t = time(NULL);
1249         tm = localtime(&t);
1250         strftime(datestr, sizeof(datestr), "%Y%m%d%H%M%S", tm);
1251 
1252         if ((tmp = test_getenv("TEST_REPORT", NULL)))
1253                 rd_snprintf(report_path, sizeof(report_path), "%s", tmp);
1254         else
1255                 rd_snprintf(report_path, sizeof(report_path),
1256                             "test_report_%s.json", datestr);
1257 
1258         report_fp = fopen(report_path, "w+");
1259         if (!report_fp)
1260                 TEST_WARN("Failed to create report file %s: %s\n",
1261                           report_path, strerror(errno));
1262         else
1263                 fprintf(report_fp,
1264                         "{ \"id\": \"%s_%s\", \"mode\": \"%s\", "
1265                         "\"scenario\": \"%s\", "
1266 			"\"date\": \"%s\", "
1267 			"\"git_version\": \"%s\", "
1268 			"\"broker_version\": \"%s\", "
1269 			"\"tests\": {",
1270 			datestr, test_mode, test_mode, test_scenario, datestr,
1271 			test_git_version,
1272 			test_broker_version_str);
1273 
1274         if (do_lock)
1275                 TEST_LOCK();
1276 
1277 	if (test_sql_cmd) {
1278 #ifdef _MSC_VER
1279 		sql_fp = _popen(test_sql_cmd, "w");
1280 #else
1281 		sql_fp = popen(test_sql_cmd, "w");
1282 #endif
1283 
1284 		fprintf(sql_fp,
1285 			"CREATE TABLE IF NOT EXISTS "
1286 			"runs(runid text PRIMARY KEY, mode text, "
1287 			"date datetime, cnt int, passed int, failed int, "
1288 			"duration numeric);\n"
1289 			"CREATE TABLE IF NOT EXISTS "
1290 			"tests(runid text, mode text, name text, state text, "
1291 			"extra text, duration numeric);\n");
1292 	}
1293 
1294 	if (show_summary)
1295 		printf("TEST %s (%s, scenario %s) SUMMARY\n"
1296 		       "#==================================================================#\n",
1297 		       datestr, test_mode, test_scenario);
1298 
1299         for (test = tests ; test->name ; test++) {
1300                 const char *color;
1301                 int64_t duration;
1302 		char extra[128] = "";
1303 		int do_count = 1;
1304 
1305                 if (!(duration = test->duration) && test->start > 0)
1306                         duration = test_clock() - test->start;
1307 
1308                 if (test == tests) {
1309 			/* <MAIN> test:
1310 			 * test accounts for total runtime.
1311 			 * dont include in passed/run/failed counts. */
1312                         total_duration = duration;
1313 			do_count = 0;
1314 		}
1315 
1316                 switch (test->state)
1317                 {
1318                 case TEST_PASSED:
1319                         color = _C_GRN;
1320 			if (do_count) {
1321 				tests_passed++;
1322 				tests_run++;
1323 			}
1324                         break;
1325                 case TEST_FAILED:
1326 			if (test->flags & TEST_F_KNOWN_ISSUE) {
1327 				rd_snprintf(extra, sizeof(extra),
1328 					    " <-- known issue%s%s",
1329 					    test->extra ? ": " : "",
1330 					    test->extra ? test->extra : "");
1331 				if (do_count)
1332 					tests_failed_known++;
1333 			}
1334                         color = _C_RED;
1335 			if (do_count) {
1336 				tests_failed++;
1337 				tests_run++;
1338 			}
1339                         break;
1340                 case TEST_RUNNING:
1341                         color = _C_MAG;
1342 			if (do_count) {
1343 				tests_failed++; /* All tests should be finished */
1344 				tests_run++;
1345 			}
1346                         break;
1347                 case TEST_NOT_STARTED:
1348                         color = _C_YEL;
1349                         if (test->extra)
1350                                 rd_snprintf(extra, sizeof(extra), " %s",
1351                                             test->extra);
1352                         break;
1353                 default:
1354                         color = _C_CYA;
1355                         break;
1356                 }
1357 
1358                 if (show_summary &&
1359                     (test->state != TEST_SKIPPED || *test->failstr ||
1360                      (tests_to_run &&
1361                       !strncmp(tests_to_run, test->name,
1362                                strlen(tests_to_run))))) {
1363                         printf("|%s %-40s | %10s | %7.3fs %s|",
1364                                color,
1365                                test->name, test_states[test->state],
1366                                (double)duration/1000000.0, _C_CLR);
1367                         if (test->state == TEST_FAILED)
1368                                 printf(_C_RED " %s" _C_CLR, test->failstr);
1369                         else if (test->state == TEST_SKIPPED)
1370                                 printf(_C_CYA " %s" _C_CLR, test->failstr);
1371                         printf("%s\n", extra);
1372                 }
1373 
1374                 if (report_fp) {
1375 			int i;
1376                         fprintf(report_fp,
1377                                 "%s\"%s\": {"
1378                                 "\"name\": \"%s\", "
1379                                 "\"state\": \"%s\", "
1380 				"\"known_issue\": %s, "
1381 				"\"extra\": \"%s\", "
1382                                 "\"duration\": %.3f, "
1383 				"\"report\": [ ",
1384                                 test == tests ? "": ", ",
1385 				test->name,
1386                                 test->name, test_states[test->state],
1387 				test->flags & TEST_F_KNOWN_ISSUE ? "true":"false",
1388 				test->extra ? test->extra : "",
1389                                 (double)duration/1000000.0);
1390 
1391 			for (i = 0 ; i < test->report_cnt ; i++) {
1392 				fprintf(report_fp, "%s%s ",
1393 					i == 0 ? "":",",
1394 					test->report_arr[i]);
1395 			}
1396 
1397 			fprintf(report_fp, "] }");
1398 		}
1399 
1400 		if (sql_fp)
1401 			fprintf(sql_fp,
1402 				"INSERT INTO tests VALUES("
1403 				"'%s_%s', '%s', '%s', '%s', '%s', %f);\n",
1404 				datestr, test_mode, test_mode,
1405                                 test->name, test_states[test->state],
1406 				test->extra ? test->extra : "",
1407 				(double)duration/1000000.0);
1408         }
1409         if (do_lock)
1410                 TEST_UNLOCK();
1411 
1412 	if (show_summary)
1413 		printf("#==================================================================#\n");
1414 
1415         if (report_fp) {
1416                 fprintf(report_fp,
1417                         "}, "
1418                         "\"tests_run\": %d, "
1419                         "\"tests_passed\": %d, "
1420                         "\"tests_failed\": %d, "
1421                         "\"duration\": %.3f"
1422                         "}\n",
1423                         tests_run, tests_passed, tests_failed,
1424                         (double)total_duration/1000000.0);
1425 
1426                 fclose(report_fp);
1427                 TEST_SAY("# Test report written to %s\n", report_path);
1428         }
1429 
1430 	if (sql_fp) {
1431 		fprintf(sql_fp,
1432 			"INSERT INTO runs VALUES('%s_%s', '%s', datetime(), "
1433 			"%d, %d, %d, %f);\n",
1434 			datestr, test_mode, test_mode,
1435 			tests_run, tests_passed, tests_failed,
1436 			(double)total_duration/1000000.0);
1437 		fclose(sql_fp);
1438 	}
1439 
1440         return tests_failed - tests_failed_known;
1441 }
1442 
1443 #ifndef _MSC_VER
test_sig_term(int sig)1444 static void test_sig_term (int sig) {
1445 	if (test_exit)
1446 		exit(1);
1447 	fprintf(stderr, "Exiting tests, waiting for running tests to finish.\n");
1448 	test_exit = 1;
1449 }
1450 #endif
1451 
/**
 * @brief Wait 'timeout' seconds for rdkafka to kill all its threads and
 *        clean up.
 *
 * Polls rd_kafka_thread_cnt() once per second until it reaches zero or
 * the timeout budget is used up, failing the test if threads remain.
 * Any time left is then spent in rd_kafka_wait_destroyed().
 *
 * NOTE(review): the budget is decremented once per poll iteration and the
 * elapsed wall-clock time is subtracted on top of that afterwards -
 * confirm that this double accounting is intended.
 */
static void test_wait_exit (int timeout) {
        time_t t_start = time(NULL);
        int thread_cnt;

        for (;;) {
                thread_cnt = rd_kafka_thread_cnt();
                if (!thread_cnt)
                        break;

                if (timeout-- < 0)
                        break;

                TEST_SAY("%i thread(s) in use by librdkafka, waiting...\n",
                         thread_cnt);
                rd_sleep(1);
        }

        TEST_SAY("%i thread(s) in use by librdkafka\n", thread_cnt);

        if (thread_cnt > 0) {
                TEST_FAIL("%i thread(s) still active in librdkafka",
                          thread_cnt);
        }

        timeout -= (int)(time(NULL) - t_start);
        if (timeout <= 0)
                return;

        TEST_SAY("Waiting %d seconds for all librdkafka memory "
                 "to be released\n", timeout);
        if (rd_kafka_wait_destroyed(timeout * 1000) == -1) {
                TEST_FAIL("Not all internal librdkafka "
                          "objects destroyed\n");
        }
}
1478 
1479 
1480 
1481 
1482 /**
1483  * @brief Test framework cleanup before termination.
1484  */
test_cleanup(void)1485 static void test_cleanup (void) {
1486 	struct test *test;
1487 
1488 	/* Free report arrays */
1489 	for (test = tests ; test->name ; test++) {
1490 		int i;
1491 		if (!test->report_arr)
1492 			continue;
1493 		for (i = 0 ; i < test->report_cnt ; i++)
1494 			rd_free(test->report_arr[i]);
1495 		rd_free(test->report_arr);
1496 		test->report_arr = NULL;
1497 	}
1498 
1499 	if (test_sql_cmd)
1500 		rd_free(test_sql_cmd);
1501 }
1502 
1503 
main(int argc,char ** argv)1504 int main(int argc, char **argv) {
1505         int i, r;
1506 	test_timing_t t_all;
1507 	int a,b,c,d;
1508         const char *tmpver;
1509 
1510 	mtx_init(&test_mtx, mtx_plain);
1511         cnd_init(&test_cnd);
1512 
1513         test_init();
1514 
1515 #ifndef _MSC_VER
1516         signal(SIGINT, test_sig_term);
1517 #endif
1518         tests_to_run = test_getenv("TESTS", NULL);
1519         tmpver = test_getenv("TEST_KAFKA_VERSION", NULL);
1520         if (!tmpver)
1521                 tmpver = test_getenv("KAFKA_VERSION", test_broker_version_str);
1522         test_broker_version_str = tmpver;
1523 
1524         test_git_version = test_getenv("RDKAFKA_GITVER", "HEAD");
1525 
1526         /* Are we running on CI? */
1527         if (test_getenv("CI", NULL)) {
1528                 test_on_ci = 1;
1529                 test_concurrent_max = 3;
1530         }
1531 
1532 	test_conf_init(NULL, NULL, 10);
1533 
1534         for (i = 1 ; i < argc ; i++) {
1535                 if (!strncmp(argv[i], "-p", 2) && strlen(argv[i]) > 2) {
1536                         if (test_rusage) {
1537                                 fprintf(stderr,
1538                                         "%% %s ignored: -R takes preceedence\n",
1539                                         argv[i]);
1540                                 continue;
1541                         }
1542                         test_concurrent_max = (int)strtod(argv[i]+2, NULL);
1543                 } else if (!strcmp(argv[i], "-l"))
1544                         test_flags |= TEST_F_LOCAL;
1545 		else if (!strcmp(argv[i], "-L"))
1546                         test_neg_flags |= TEST_F_LOCAL;
1547                 else if (!strcmp(argv[i], "-a"))
1548                         test_assert_on_fail = 1;
1549 		else if (!strcmp(argv[i], "-k"))
1550 			test_flags |= TEST_F_KNOWN_ISSUE;
1551 		else if (!strcmp(argv[i], "-K"))
1552 			test_neg_flags |= TEST_F_KNOWN_ISSUE;
1553                 else if (!strcmp(argv[i], "-E"))
1554                         test_neg_flags |= TEST_F_SOCKEM;
1555 		else if (!strcmp(argv[i], "-V") && i+1 < argc)
1556  			test_broker_version_str = argv[++i];
1557                 else if (!strcmp(argv[i], "-s") && i+1 < argc)
1558                         strncpy(test_scenario, argv[i],
1559                                 sizeof(test_scenario)-1);
1560 		else if (!strcmp(argv[i], "-S"))
1561 			show_summary = 0;
1562                 else if (!strcmp(argv[i], "-D"))
1563                         test_delete_topics_between = 1;
1564                 else if (!strcmp(argv[i], "-P"))
1565                         test_idempotent_producer = 1;
1566                 else if (!strcmp(argv[i], "-Q"))
1567                         test_quick = 1;
1568                 else if (!strncmp(argv[i], "-R", 2)) {
1569                         test_rusage = 1;
1570                         test_concurrent_max = 1;
1571                         if (strlen(argv[i]) > strlen("-R")) {
1572                                 test_rusage_cpu_calibration =
1573                                         strtod(argv[i]+2, NULL);
1574                                 if (test_rusage_cpu_calibration < 0.00001) {
1575                                         fprintf(stderr,
1576                                                 "%% Invalid CPU calibration "
1577                                                 "value: %s\n", argv[i]+2);
1578                                         exit(1);
1579                                 }
1580                         }
1581                 } else if (*argv[i] != '-')
1582                         tests_to_run = argv[i];
1583                 else {
1584                         printf("Unknown option: %s\n"
1585                                "\n"
1586                                "Usage: %s [options] [<test-match-substr>]\n"
1587                                "Options:\n"
1588                                "  -p<N>  Run N tests in parallel\n"
1589                                "  -l/-L  Only/dont run local tests (no broker needed)\n"
1590 			       "  -k/-K  Only/dont run tests with known issues\n"
1591                                "  -E     Don't run sockem tests\n"
1592                                "  -a     Assert on failures\n"
1593 			       "  -S     Dont show test summary\n"
1594                                "  -s <scenario> Test scenario.\n"
1595 			       "  -V <N.N.N.N> Broker version.\n"
1596                                "  -D     Delete all test topics between each test (-p1) or after all tests\n"
1597                                "  -P     Run all tests with `enable.idempotency=true`\n"
1598                                "  -Q     Run tests in quick mode: faster tests, fewer iterations, less data.\n"
1599                                "  -R     Check resource usage thresholds.\n"
1600                                "  -R<C>  Check resource usage thresholds but adjust CPU thresholds by C (float):\n"
1601                                "            C < 1.0: CPU is faster than base line system.\n"
1602                                "            C > 1.0: CPU is slower than base line system.\n"
1603                                "            E.g. -R2.5 = CPU is 2.5x slower than base line system.\n"
1604 			       "\n"
1605 			       "Environment variables:\n"
1606 			       "  TESTS - substring matched test to run (e.g., 0033)\n"
1607 			       "  TEST_KAFKA_VERSION - broker version (e.g., 0.9.0.1)\n"
1608                                "  TEST_SCENARIO - Test scenario\n"
1609 			       "  TEST_LEVEL - Test verbosity level\n"
1610 			       "  TEST_MODE - bare, helgrind, valgrind\n"
1611 			       "  TEST_SEED - random seed\n"
1612 			       "  RDKAFKA_TEST_CONF - test config file (test.conf)\n"
1613 			       "  KAFKA_PATH - Path to kafka source dir\n"
1614 			       "  ZK_ADDRESS - Zookeeper address\n"
1615                                "\n",
1616                                argv[i], argv[0]);
1617                         exit(1);
1618                 }
1619         }
1620 
1621 	TEST_SAY("Git version: %s\n", test_git_version);
1622 
1623         if (!strcmp(test_broker_version_str, "trunk"))
1624                 test_broker_version_str = "9.9.9.9"; /* for now */
1625 
1626         d = 0;
1627         if (sscanf(test_broker_version_str, "%d.%d.%d.%d",
1628 		   &a, &b, &c, &d) < 3) {
1629 		printf("%% Expected broker version to be in format "
1630 		       "N.N.N (N=int), not %s\n",
1631 		       test_broker_version_str);
1632 		exit(1);
1633 	}
1634 	test_broker_version = TEST_BRKVER(a, b, c, d);
1635 	TEST_SAY("Broker version: %s (%d.%d.%d.%d)\n",
1636 		 test_broker_version_str,
1637 		 TEST_BRKVER_X(test_broker_version, 0),
1638 		 TEST_BRKVER_X(test_broker_version, 1),
1639 		 TEST_BRKVER_X(test_broker_version, 2),
1640 		 TEST_BRKVER_X(test_broker_version, 3));
1641 
1642 	/* Set up fake "<MAIN>" test for all operations performed in
1643 	 * the main thread rather than the per-test threads.
1644 	 * Nice side effect is that we get timing and status for main as well.*/
1645         test_curr = &tests[0];
1646         test_curr->state = TEST_PASSED;
1647         test_curr->start = test_clock();
1648 
1649         if (test_on_ci) {
1650                 TEST_LOCK();
1651                 test_timeout_multiplier += 2;
1652                 TEST_UNLOCK();
1653         }
1654 
1655 	if (!strcmp(test_mode, "helgrind") ||
1656 	    !strcmp(test_mode, "drd")) {
1657 		TEST_LOCK();
1658 		test_timeout_multiplier += 5;
1659 		TEST_UNLOCK();
1660 	} else if (!strcmp(test_mode, "valgrind")) {
1661 		TEST_LOCK();
1662 		test_timeout_multiplier += 3;
1663 		TEST_UNLOCK();
1664 	}
1665 
1666         /* Broker version 0.9 and api.version.request=true (which is default)
1667          * will cause a 10s stall per connection. Instead of fixing
1668          * that for each affected API in every test we increase the timeout
1669          * multiplier accordingly instead. The typical consume timeout is 5
1670          * seconds, so a multiplier of 3 should be good. */
1671         if ((test_broker_version & 0xffff0000) == 0x00090000)
1672                 test_timeout_multiplier += 3;
1673 
1674         if (test_concurrent_max > 1)
1675                 test_timeout_multiplier += (double)test_concurrent_max / 3;
1676 
1677 	TEST_SAY("Tests to run : %s\n", tests_to_run ? tests_to_run : "all");
1678 	TEST_SAY("Test mode    : %s%s\n", test_quick ? "quick, ":"", test_mode);
1679         TEST_SAY("Test scenario: %s\n", test_scenario);
1680         TEST_SAY("Test filter  : %s\n",
1681                  (test_flags & TEST_F_LOCAL) ? "local tests only" : "no filter");
1682         TEST_SAY("Test timeout multiplier: %.1f\n", test_timeout_multiplier);
1683         TEST_SAY("Action on test failure: %s\n",
1684                  test_assert_on_fail ? "assert crash" : "continue other tests");
1685         if (test_rusage)
1686                 TEST_SAY("Test rusage : yes (%.2fx CPU calibration)\n",
1687                          test_rusage_cpu_calibration);
1688         if (test_idempotent_producer)
1689                 TEST_SAY("Test Idempotent Producer: enabled\n");
1690 
1691         {
1692                 char cwd[512], *pcwd;
1693 #ifdef _MSC_VER
1694                 pcwd = _getcwd(cwd, sizeof(cwd) - 1);
1695 #else
1696                 pcwd = getcwd(cwd, sizeof(cwd) - 1);
1697 #endif
1698                 if (pcwd)
1699                         TEST_SAY("Current directory: %s\n", cwd);
1700         }
1701 
1702         test_timeout_set(30);
1703 
1704         TIMING_START(&t_all, "ALL-TESTS");
1705 
1706         /* Run tests */
1707         run_tests(argc, argv);
1708 
1709         TEST_LOCK();
1710         while (tests_running_cnt > 0 && !test_exit) {
1711                 struct test *test;
1712 
1713                 if (!test_quick && test_level >= 2) {
1714                         TEST_SAY("%d test(s) running:", tests_running_cnt);
1715 
1716                         for (test = tests ; test->name ; test++) {
1717                                 if (test->state != TEST_RUNNING)
1718                                         continue;
1719 
1720                                 TEST_SAY0(" %s", test->name);
1721                         }
1722 
1723                         TEST_SAY0("\n");
1724                 }
1725 
1726                 check_test_timeouts();
1727 
1728                 TEST_UNLOCK();
1729 
1730                 if (test_quick)
1731                         rd_usleep(200*1000, NULL);
1732                 else
1733                         rd_sleep(1);
1734                 TEST_LOCK();
1735         }
1736 
1737 	TIMING_STOP(&t_all);
1738 
1739         test_curr = &tests[0];
1740         test_curr->duration = test_clock() - test_curr->start;
1741 
1742         TEST_UNLOCK();
1743 
1744         if (test_delete_topics_between)
1745                 test_delete_all_test_topics(60*1000);
1746 
1747         r = test_summary(1/*lock*/) ? 1 : 0;
1748 
1749         /* Wait for everything to be cleaned up since broker destroys are
1750 	 * handled in its own thread. */
1751 	test_wait_exit(0);
1752 
1753 	/* If we havent failed at this point then
1754 	 * there were no threads leaked */
1755         if (r == 0)
1756                 TEST_SAY("\n============== ALL TESTS PASSED ==============\n");
1757 
1758 	test_cleanup();
1759 
1760 	if (r > 0)
1761 		TEST_FAIL("%d test(s) failed, see previous errors", r);
1762 
1763 	return r;
1764 }
1765 
1766 
1767 
1768 
1769 
1770 /******************************************************************************
1771  *
1772  * Helpers
1773  *
1774  ******************************************************************************/
1775 
/**
 * @brief Delivery report callback used by the test producers.
 *
 * The report is logged at verbosity level 4.
 *
 * When the current test is not in produce_sync mode:
 *  - the delivery error must equal test_curr->exp_dr_err, unless
 *    test_curr->ignore_dr_err is set, and
 *  - if test_curr->exp_dr_status is not -1, the message status must equal it.
 *
 * If the message carries a per-message opaque it is treated as an int
 * counter of outstanding messages: it must be positive and is decremented.
 *
 * In produce_sync mode the delivery error is stored in
 * test_curr->produce_sync_err instead of being verified.
 *
 * @param rk        Unused.
 * @param rkmessage The message the delivery report refers to.
 * @param opaque    Unused.
 */
void test_dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
                     void *opaque) {
        /* Printable names, indexed by rd_kafka_msg_status_t. */
        static const char *status_names[] = {
                [RD_KAFKA_MSG_STATUS_NOT_PERSISTED] = "NotPersisted",
                [RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED] = "PossiblyPersisted",
                [RD_KAFKA_MSG_STATUS_PERSISTED] = "Persisted"
        };
        /* Outstanding-message counter passed as msg_opaque, may be NULL. */
        int *remainsp = rkmessage->_private;

        TEST_SAYL(4, "Delivery report: %s (%s) to %s [%"PRId32"]\n",
                  rd_kafka_err2str(rkmessage->err),
                  status_names[rd_kafka_message_status(rkmessage)],
                  rd_kafka_topic_name(rkmessage->rkt),
                  rkmessage->partition);

        if (!test_curr->produce_sync) {
                /* Asynchronous produce: verify the outcome right here. */
                if (rkmessage->err != test_curr->exp_dr_err &&
                    !test_curr->ignore_dr_err)
                        TEST_FAIL("Message delivery (to %s [%"PRId32"]) "
                                  "failed: expected %s, got %s",
                                  rd_kafka_topic_name(rkmessage->rkt),
                                  rkmessage->partition,
                                  rd_kafka_err2str(test_curr->exp_dr_err),
                                  rd_kafka_err2str(rkmessage->err));

                /* -1 means no particular status is expected. */
                if ((int)test_curr->exp_dr_status != -1) {
                        rd_kafka_msg_status_t status;

                        status = rd_kafka_message_status(rkmessage);

                        TEST_ASSERT(status == test_curr->exp_dr_status,
                                    "Expected message status %s, not %s",
                                    status_names[test_curr->exp_dr_status],
                                    status_names[status]);
                }
        }

        if (remainsp) {
                TEST_ASSERT(*remainsp > 0,
                            "Too many messages delivered (remains %i)",
                            *remainsp);

                *remainsp -= 1;
        }

        /* Synchronous produce: hand the result back to the producer. */
        if (test_curr->produce_sync)
                test_curr->produce_sync_err = rkmessage->err;
}
1823 
1824 
test_create_handle(int mode,rd_kafka_conf_t * conf)1825 rd_kafka_t *test_create_handle (int mode, rd_kafka_conf_t *conf) {
1826 	rd_kafka_t *rk;
1827 	char errstr[512];
1828 
1829         if (!conf) {
1830                 test_conf_init(&conf, NULL, 0);
1831 #if WITH_SOCKEM
1832                 if (*test_sockem_conf)
1833                         test_socket_enable(conf);
1834 #endif
1835         } else {
1836                 test_conf_set(conf, "client.id", test_curr->name);
1837         }
1838 
1839 
1840 
1841 	/* Creat kafka instance */
1842 	rk = rd_kafka_new(mode, conf, errstr, sizeof(errstr));
1843 	if (!rk)
1844 		TEST_FAIL("Failed to create rdkafka instance: %s\n", errstr);
1845 
1846 	TEST_SAY("Created    kafka instance %s\n", rd_kafka_name(rk));
1847 
1848 	return rk;
1849 }
1850 
1851 
test_create_producer(void)1852 rd_kafka_t *test_create_producer (void) {
1853 	rd_kafka_conf_t *conf;
1854 
1855 	test_conf_init(&conf, NULL, 0);
1856         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
1857 
1858 	return test_create_handle(RD_KAFKA_PRODUCER, conf);
1859 }
1860 
1861 
1862 /**
1863  * Create topic_t object with va-arg list as key-value config pairs
1864  * terminated by NULL.
1865  */
test_create_topic_object(rd_kafka_t * rk,const char * topic,...)1866 rd_kafka_topic_t *test_create_topic_object (rd_kafka_t *rk,
1867 					    const char *topic, ...) {
1868 	rd_kafka_topic_t *rkt;
1869 	rd_kafka_topic_conf_t *topic_conf;
1870 	va_list ap;
1871 	const char *name, *val;
1872 
1873 	test_conf_init(NULL, &topic_conf, 0);
1874 
1875 	va_start(ap, topic);
1876 	while ((name = va_arg(ap, const char *)) &&
1877 	       (val = va_arg(ap, const char *))) {
1878                 test_topic_conf_set(topic_conf, name, val);
1879 	}
1880 	va_end(ap);
1881 
1882 	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
1883 	if (!rkt)
1884 		TEST_FAIL("Failed to create topic: %s\n",
1885                           rd_kafka_err2str(rd_kafka_last_error()));
1886 
1887 	return rkt;
1888 
1889 }
1890 
1891 
test_create_producer_topic(rd_kafka_t * rk,const char * topic,...)1892 rd_kafka_topic_t *test_create_producer_topic (rd_kafka_t *rk,
1893 	const char *topic, ...) {
1894 	rd_kafka_topic_t *rkt;
1895 	rd_kafka_topic_conf_t *topic_conf;
1896 	char errstr[512];
1897 	va_list ap;
1898 	const char *name, *val;
1899 
1900 	test_conf_init(NULL, &topic_conf, 0);
1901 
1902 	va_start(ap, topic);
1903 	while ((name = va_arg(ap, const char *)) &&
1904 	       (val = va_arg(ap, const char *))) {
1905 		if (rd_kafka_topic_conf_set(topic_conf, name, val,
1906 			errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
1907 			TEST_FAIL("Conf failed: %s\n", errstr);
1908 	}
1909 	va_end(ap);
1910 
1911 	/* Make sure all replicas are in-sync after producing
1912 	 * so that consume test wont fail. */
1913         rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1",
1914                                 errstr, sizeof(errstr));
1915 
1916 
1917 	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
1918 	if (!rkt)
1919 		TEST_FAIL("Failed to create topic: %s\n",
1920                           rd_kafka_err2str(rd_kafka_last_error()));
1921 
1922 	return rkt;
1923 
1924 }
1925 
1926 
1927 
1928 /**
1929  * Produces \p cnt messages and returns immediately.
1930  * Does not wait for delivery.
1931  * \p msgcounterp is incremented for each produced messages and passed
1932  * as \p msg_opaque which is later used in test_dr_msg_cb to decrement
1933  * the counter on delivery.
1934  *
1935  * If \p payload is NULL the message key and payload will be formatted
1936  * according to standard test format, otherwise the key will be NULL and
1937  * payload send as message payload.
1938  *
1939  * Default message size is 128 bytes, if \p size is non-zero and \p payload
1940  * is NULL the message size of \p size will be used.
1941  */
test_produce_msgs_nowait(rd_kafka_t * rk,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size,int msgrate,int * msgcounterp)1942 void test_produce_msgs_nowait (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
1943                                uint64_t testid, int32_t partition,
1944                                int msg_base, int cnt,
1945                                const char *payload, size_t size, int msgrate,
1946                                int *msgcounterp) {
1947 	int msg_id;
1948 	test_timing_t t_all, t_poll;
1949 	char key[128];
1950 	void *buf;
1951 	int64_t tot_bytes = 0;
1952         int64_t tot_time_poll = 0;
1953         int64_t per_msg_wait = 0;
1954 
1955         if (msgrate > 0)
1956                 per_msg_wait = 1000000 / (int64_t)msgrate;
1957 
1958 
1959 	if (payload)
1960 		buf = (void *)payload;
1961 	else {
1962 		if (size == 0)
1963 			size = 128;
1964 		buf = calloc(1, size);
1965 	}
1966 
1967 	TEST_SAY("Produce to %s [%"PRId32"]: messages #%d..%d\n",
1968 		 rd_kafka_topic_name(rkt), partition, msg_base, msg_base+cnt);
1969 
1970 	TIMING_START(&t_all, "PRODUCE");
1971         TIMING_START(&t_poll, "SUM(POLL)");
1972 
1973 	for (msg_id = msg_base ; msg_id < msg_base + cnt ; msg_id++) {
1974                 int wait_time = 0;
1975 
1976                 if (!payload)
1977                         test_prepare_msg(testid, partition, msg_id,
1978                                          buf, size, key, sizeof(key));
1979 
1980 
1981 		if (rd_kafka_produce(rkt, partition,
1982 				     RD_KAFKA_MSG_F_COPY,
1983 				     buf, size,
1984 				     !payload ? key : NULL,
1985 				     !payload ? strlen(key) : 0,
1986 				     msgcounterp) == -1)
1987 			TEST_FAIL("Failed to produce message %i "
1988 				  "to partition %i: %s",
1989 				  msg_id, (int)partition,
1990                                   rd_kafka_err2str(rd_kafka_last_error()));
1991 
1992                 (*msgcounterp)++;
1993 		tot_bytes += size;
1994 
1995                 TIMING_RESTART(&t_poll);
1996                 do {
1997                         if (per_msg_wait) {
1998                                 wait_time = (int)(per_msg_wait -
1999                                                   TIMING_DURATION(&t_poll)) /
2000                                         1000;
2001                                 if (wait_time < 0)
2002                                         wait_time = 0;
2003                         }
2004                         rd_kafka_poll(rk, wait_time);
2005                 } while (wait_time > 0);
2006 
2007                 tot_time_poll = TIMING_DURATION(&t_poll);
2008 
2009 		if (TIMING_EVERY(&t_all, 3*1000000))
2010 			TEST_SAY("produced %3d%%: %d/%d messages "
2011 				 "(%d msgs/s, %d bytes/s)\n",
2012 				 ((msg_id - msg_base) * 100) / cnt,
2013 				 msg_id - msg_base, cnt,
2014 				 (int)((msg_id - msg_base) /
2015 				       (TIMING_DURATION(&t_all) / 1000000)),
2016 				 (int)((tot_bytes) /
2017 				       (TIMING_DURATION(&t_all) / 1000000)));
2018         }
2019 
2020 	if (!payload)
2021 		free(buf);
2022 
2023         t_poll.duration = tot_time_poll;
2024         TIMING_STOP(&t_poll);
2025 	TIMING_STOP(&t_all);
2026 }
2027 
2028 /**
2029  * Waits for the messages tracked by counter \p msgcounterp to be delivered.
2030  */
test_wait_delivery(rd_kafka_t * rk,int * msgcounterp)2031 void test_wait_delivery (rd_kafka_t *rk, int *msgcounterp) {
2032 	test_timing_t t_all;
2033         int start_cnt = *msgcounterp;
2034 
2035         TIMING_START(&t_all, "PRODUCE.DELIVERY.WAIT");
2036 
2037 	/* Wait for messages to be delivered */
2038 	while (*msgcounterp > 0 && rd_kafka_outq_len(rk) > 0) {
2039 		rd_kafka_poll(rk, 10);
2040                 if (TIMING_EVERY(&t_all, 3*1000000)) {
2041                         int delivered = start_cnt - *msgcounterp;
2042                         TEST_SAY("wait_delivery: "
2043                                  "%d/%d messages delivered: %d msgs/s\n",
2044                                  delivered, start_cnt,
2045                                  (int)(delivered /
2046                                        (TIMING_DURATION(&t_all) / 1000000)));
2047                 }
2048         }
2049 
2050 	TIMING_STOP(&t_all);
2051 
2052         TEST_ASSERT(*msgcounterp == 0,
2053                     "Not all messages delivered: msgcounter still at %d, "
2054                     "outq_len %d",
2055                     *msgcounterp, rd_kafka_outq_len(rk));
2056 }
2057 
2058 /**
2059  * Produces \p cnt messages and waits for succesful delivery
2060  */
test_produce_msgs(rd_kafka_t * rk,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size)2061 void test_produce_msgs (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2062                         uint64_t testid, int32_t partition,
2063                         int msg_base, int cnt,
2064 			const char *payload, size_t size) {
2065 	int remains = 0;
2066 
2067         test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2068                                  payload, size, 0, &remains);
2069 
2070         test_wait_delivery(rk, &remains);
2071 }
2072 
2073 
2074 /**
2075  * @brief Produces \p cnt messages and waits for succesful delivery
2076  */
test_produce_msgs2(rd_kafka_t * rk,const char * topic,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size)2077 void test_produce_msgs2 (rd_kafka_t *rk, const char *topic,
2078                          uint64_t testid, int32_t partition,
2079                          int msg_base, int cnt,
2080                          const char *payload, size_t size) {
2081         int remains = 0;
2082         rd_kafka_topic_t *rkt = test_create_topic_object(rk, topic, NULL);
2083 
2084         test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2085                                  payload, size, 0, &remains);
2086 
2087         test_wait_delivery(rk, &remains);
2088 
2089         rd_kafka_topic_destroy(rkt);
2090 }
2091 
2092 /**
2093  * @brief Produces \p cnt messages without waiting for delivery.
2094  */
test_produce_msgs2_nowait(rd_kafka_t * rk,const char * topic,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size,int * remainsp)2095 void test_produce_msgs2_nowait (rd_kafka_t *rk, const char *topic,
2096                                 uint64_t testid, int32_t partition,
2097                                 int msg_base, int cnt,
2098                                 const char *payload, size_t size,
2099                                 int *remainsp) {
2100         rd_kafka_topic_t *rkt = test_create_topic_object(rk, topic, NULL);
2101 
2102         test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2103                                  payload, size, 0, remainsp);
2104 
2105         rd_kafka_topic_destroy(rkt);
2106 }
2107 
2108 
2109 /**
2110  * Produces \p cnt messages at \p msgs/s, and waits for succesful delivery
2111  */
test_produce_msgs_rate(rd_kafka_t * rk,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size,int msgrate)2112 void test_produce_msgs_rate (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2113                              uint64_t testid, int32_t partition,
2114                              int msg_base, int cnt,
2115                              const char *payload, size_t size, int msgrate) {
2116 	int remains = 0;
2117 
2118         test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2119                                  payload, size, msgrate, &remains);
2120 
2121         test_wait_delivery(rk, &remains);
2122 }
2123 
2124 
2125 
2126 /**
2127  * Create producer, produce \p msgcnt messages to \p topic \p partition,
2128  * destroy consumer, and returns the used testid.
2129  */
2130 uint64_t
test_produce_msgs_easy_size(const char * topic,uint64_t testid,int32_t partition,int msgcnt,size_t size)2131 test_produce_msgs_easy_size (const char *topic, uint64_t testid,
2132                              int32_t partition, int msgcnt, size_t size) {
2133         rd_kafka_t *rk;
2134         rd_kafka_topic_t *rkt;
2135         test_timing_t t_produce;
2136 
2137         if (!testid)
2138                 testid = test_id_generate();
2139         rk = test_create_producer();
2140         rkt = test_create_producer_topic(rk, topic, NULL);
2141 
2142         TIMING_START(&t_produce, "PRODUCE");
2143         test_produce_msgs(rk, rkt, testid, partition, 0, msgcnt, NULL, size);
2144         TIMING_STOP(&t_produce);
2145         rd_kafka_topic_destroy(rkt);
2146         rd_kafka_destroy(rk);
2147 
2148         return testid;
2149 }
2150 
test_produce_sync(rd_kafka_t * rk,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition)2151 rd_kafka_resp_err_t test_produce_sync (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2152                                        uint64_t testid, int32_t partition) {
2153         test_curr->produce_sync = 1;
2154         test_produce_msgs(rk, rkt, testid, partition, 0, 1, NULL, 0);
2155         test_curr->produce_sync = 0;
2156         return test_curr->produce_sync_err;
2157 }
2158 
2159 
2160 /**
2161  * @brief Easy produce function.
2162  *
2163  * @param ... is a NULL-terminated list of key, value config property pairs.
2164  */
test_produce_msgs_easy_v(const char * topic,int32_t partition,uint64_t testid,int msg_base,int cnt,size_t size,...)2165 void test_produce_msgs_easy_v (const char *topic,
2166                                int32_t partition, uint64_t testid,
2167                                int msg_base, int cnt, size_t size, ...) {
2168         rd_kafka_conf_t *conf;
2169         rd_kafka_t *p;
2170         rd_kafka_topic_t *rkt;
2171         va_list ap;
2172         const char *key, *val;
2173 
2174         test_conf_init(&conf, NULL, 0);
2175 
2176         va_start(ap, size);
2177         while ((key = va_arg(ap, const char *)) &&
2178                (val = va_arg(ap, const char *)))
2179                 test_conf_set(conf, key, val);
2180         va_end(ap);
2181 
2182         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
2183 
2184         p = test_create_handle(RD_KAFKA_PRODUCER, conf);
2185 
2186         rkt = test_create_producer_topic(p, topic, NULL);
2187 
2188         test_produce_msgs(p, rkt, testid, partition, msg_base, cnt, NULL, size);
2189 
2190         rd_kafka_topic_destroy(rkt);
2191         rd_kafka_destroy(p);
2192 }
2193 
2194 
2195 
/**
 * @brief Create a consumer instance.
 *
 * @param group_id If non-NULL, "group.id" is set to this value,
 *                 "session.timeout.ms" is set to test_session_timeout_ms,
 *                 \p rebalance_cb (if any) is installed, and
 *                 rd_kafka_poll_set_consumer() is called on the new
 *                 instance. If NULL, \p rebalance_cb must be NULL too.
 * @param rebalance_cb Optional rebalance callback.
 * @param conf Configuration to use, or NULL to create one with
 *             test_conf_init().
 * @param default_topic_conf Optional default topic configuration to set
 *                           on the configuration.
 *
 * @returns the new consumer instance.
 */
rd_kafka_t *test_create_consumer (const char *group_id,
                                  void (*rebalance_cb) (
                                          rd_kafka_t *rk,
                                          rd_kafka_resp_err_t err,
                                          rd_kafka_topic_partition_list_t
                                          *partitions,
                                          void *opaque),
                                  rd_kafka_conf_t *conf,
                                  rd_kafka_topic_conf_t *default_topic_conf) {
        rd_kafka_t *consumer;
        char timeout_str[64];

        if (conf == NULL)
                test_conf_init(&conf, NULL, 0);

        if (!group_id) {
                TEST_ASSERT(!rebalance_cb);
        } else {
                test_conf_set(conf, "group.id", group_id);

                rd_snprintf(timeout_str, sizeof(timeout_str), "%d",
                            test_session_timeout_ms);
                test_conf_set(conf, "session.timeout.ms", timeout_str);

                if (rebalance_cb)
                        rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
        }

        if (default_topic_conf)
                rd_kafka_conf_set_default_topic_conf(conf, default_topic_conf);

        /* Create kafka instance */
        consumer = test_create_handle(RD_KAFKA_CONSUMER, conf);

        if (group_id)
                rd_kafka_poll_set_consumer(consumer);

        return consumer;
}
2234 
test_create_consumer_topic(rd_kafka_t * rk,const char * topic)2235 rd_kafka_topic_t *test_create_consumer_topic (rd_kafka_t *rk,
2236                                               const char *topic) {
2237 	rd_kafka_topic_t *rkt;
2238 	rd_kafka_topic_conf_t *topic_conf;
2239 
2240 	test_conf_init(NULL, &topic_conf, 0);
2241 
2242 	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
2243 	if (!rkt)
2244 		TEST_FAIL("Failed to create topic: %s\n",
2245                           rd_kafka_err2str(rd_kafka_last_error()));
2246 
2247 	return rkt;
2248 }
2249 
2250 
/**
 * @brief Start consuming \p rkt \p partition at \p start_offset using
 *        rd_kafka_consume_start(), failing the test on error.
 *
 * @param what Prefix used in log and failure messages.
 */
void test_consumer_start (const char *what,
                          rd_kafka_topic_t *rkt, int32_t partition,
                          int64_t start_offset) {
        int res;

        TEST_SAY("%s: consumer_start: %s [%"PRId32"] at offset %"PRId64"\n",
                 what, rd_kafka_topic_name(rkt), partition, start_offset);

        res = rd_kafka_consume_start(rkt, partition, start_offset);
        if (res != -1)
                return;

        TEST_FAIL("%s: consume_start failed: %s\n",
                  what, rd_kafka_err2str(rd_kafka_last_error()));
}
2262 
/**
 * @brief Stop consuming \p rkt \p partition using rd_kafka_consume_stop(),
 *        failing the test on error.
 *
 * @param what Prefix used in log and failure messages.
 */
void test_consumer_stop (const char *what,
                         rd_kafka_topic_t *rkt, int32_t partition) {
        int res;

        TEST_SAY("%s: consumer_stop: %s [%"PRId32"]\n",
                 what, rd_kafka_topic_name(rkt), partition);

        res = rd_kafka_consume_stop(rkt, partition);
        if (res != -1)
                return;

        TEST_FAIL("%s: consume_stop failed: %s\n",
                  what, rd_kafka_err2str(rd_kafka_last_error()));
}
2273 
/**
 * @brief Seek \p rkt \p partition to \p offset using rd_kafka_seek() with
 *        a 2000 ms timeout, failing the test on error.
 *
 * @param what Prefix used in log and failure messages.
 */
void test_consumer_seek (const char *what, rd_kafka_topic_t *rkt,
                         int32_t partition, int64_t offset) {
        /* Kept as the error enum type (rather than int) since it is
         * passed on to rd_kafka_err2str(). */
        rd_kafka_resp_err_t err;

        TEST_SAY("%s: consumer_seek: %s [%"PRId32"] to offset %"PRId64"\n",
                 what, rd_kafka_topic_name(rkt), partition, offset);

        if ((err = rd_kafka_seek(rkt, partition, offset, 2000)))
                TEST_FAIL("%s: consume_seek(%s, %"PRId32", %"PRId64") "
                          "failed: %s\n",
                          what,
                          rd_kafka_topic_name(rkt), partition, offset,
                          rd_kafka_err2str(err));
}
2288 
2289 
2290 
2291 /**
2292  * Returns offset of the last message consumed
2293  */
test_consume_msgs(const char * what,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition,int64_t offset,int exp_msg_base,int exp_cnt,int parse_fmt)2294 int64_t test_consume_msgs (const char *what, rd_kafka_topic_t *rkt,
2295                            uint64_t testid, int32_t partition, int64_t offset,
2296                            int exp_msg_base, int exp_cnt, int parse_fmt) {
2297 	int cnt = 0;
2298 	int msg_next = exp_msg_base;
2299 	int fails = 0;
2300 	int64_t offset_last = -1;
2301 	int64_t tot_bytes = 0;
2302 	test_timing_t t_first, t_all;
2303 
2304 	TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: expect msg #%d..%d "
2305 		 "at offset %"PRId64"\n",
2306 		 what, rd_kafka_topic_name(rkt), partition,
2307 		 exp_msg_base, exp_msg_base+exp_cnt, offset);
2308 
2309 	if (offset != TEST_NO_SEEK) {
2310 		rd_kafka_resp_err_t err;
2311 		test_timing_t t_seek;
2312 
2313 		TIMING_START(&t_seek, "SEEK");
2314 		if ((err = rd_kafka_seek(rkt, partition, offset, 5000)))
2315 			TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: "
2316 				  "seek to %"PRId64" failed: %s\n",
2317 				  what, rd_kafka_topic_name(rkt), partition,
2318 				  offset, rd_kafka_err2str(err));
2319 		TIMING_STOP(&t_seek);
2320 		TEST_SAY("%s: seeked to offset %"PRId64"\n", what, offset);
2321 	}
2322 
2323 	TIMING_START(&t_first, "FIRST MSG");
2324 	TIMING_START(&t_all, "ALL MSGS");
2325 
2326 	while (cnt < exp_cnt) {
2327 		rd_kafka_message_t *rkmessage;
2328 		int msg_id;
2329 
2330                 rkmessage = rd_kafka_consume(rkt, partition,
2331                                              tmout_multip(5000));
2332 
2333 		if (TIMING_EVERY(&t_all, 3*1000000))
2334 			TEST_SAY("%s: "
2335 				 "consumed %3d%%: %d/%d messages "
2336 				 "(%d msgs/s, %d bytes/s)\n",
2337 				 what, cnt * 100 / exp_cnt, cnt, exp_cnt,
2338 				 (int)(cnt /
2339 				       (TIMING_DURATION(&t_all) / 1000000)),
2340 				 (int)(tot_bytes /
2341 				       (TIMING_DURATION(&t_all) / 1000000)));
2342 
2343 		if (!rkmessage)
2344 			TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: "
2345 				  "expected msg #%d (%d/%d): timed out\n",
2346 				  what, rd_kafka_topic_name(rkt), partition,
2347 				  msg_next, cnt, exp_cnt);
2348 
2349 		if (rkmessage->err)
2350 			TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: "
2351 				  "expected msg #%d (%d/%d): got error: %s\n",
2352 				  what, rd_kafka_topic_name(rkt), partition,
2353 				  msg_next, cnt, exp_cnt,
2354 				  rd_kafka_err2str(rkmessage->err));
2355 
2356 		if (cnt == 0)
2357 			TIMING_STOP(&t_first);
2358 
2359 		if (parse_fmt)
2360 			test_msg_parse(testid, rkmessage, partition, &msg_id);
2361 		else
2362 			msg_id = 0;
2363 
2364 		if (test_level >= 3)
2365 			TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: "
2366 				 "got msg #%d at offset %"PRId64
2367 				 " (expect #%d at offset %"PRId64")\n",
2368 				 what, rd_kafka_topic_name(rkt), partition,
2369 				 msg_id, rkmessage->offset,
2370 				 msg_next,
2371 				 offset >= 0 ? offset + cnt : -1);
2372 
2373 		if (parse_fmt && msg_id != msg_next) {
2374 			TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: "
2375 				 "expected msg #%d (%d/%d): got msg #%d\n",
2376 				 what, rd_kafka_topic_name(rkt), partition,
2377 				 msg_next, cnt, exp_cnt, msg_id);
2378 			fails++;
2379 		}
2380 
2381 		cnt++;
2382 		tot_bytes += rkmessage->len;
2383 		msg_next++;
2384 		offset_last = rkmessage->offset;
2385 
2386 		rd_kafka_message_destroy(rkmessage);
2387 	}
2388 
2389 	TIMING_STOP(&t_all);
2390 
2391 	if (fails)
2392 		TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: %d failures\n",
2393 			  what, rd_kafka_topic_name(rkt), partition, fails);
2394 
2395 	TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: "
2396 		 "%d/%d messages consumed succesfully\n",
2397 		 what, rd_kafka_topic_name(rkt), partition,
2398 		 cnt, exp_cnt);
2399 	return offset_last;
2400 }
2401 
2402 
2403 /**
2404  * Create high-level consumer subscribing to \p topic from BEGINNING
2405  * and expects \d exp_msgcnt with matching \p testid
2406  * Destroys consumer when done.
2407  *
2408  * @param txn If true, isolation.level is set to read_committed.
2409  * @param partition If -1 the topic will be subscribed to, otherwise the
2410  *                  single partition will be assigned immediately.
2411  *
2412  * If \p group_id is NULL a new unique group is generated
2413  */
2414 void
test_consume_msgs_easy_mv0(const char * group_id,const char * topic,rd_bool_t txn,int32_t partition,uint64_t testid,int exp_eofcnt,int exp_msgcnt,rd_kafka_topic_conf_t * tconf,test_msgver_t * mv)2415 test_consume_msgs_easy_mv0 (const char *group_id, const char *topic,
2416                             rd_bool_t txn,
2417                             int32_t partition,
2418                             uint64_t testid, int exp_eofcnt, int exp_msgcnt,
2419                             rd_kafka_topic_conf_t *tconf,
2420                             test_msgver_t *mv) {
2421         rd_kafka_t *rk;
2422         char grpid0[64];
2423         rd_kafka_conf_t *conf;
2424 
2425         test_conf_init(&conf, tconf ? NULL : &tconf, 0);
2426 
2427         if (!group_id)
2428                 group_id = test_str_id_generate(grpid0, sizeof(grpid0));
2429 
2430         if (txn)
2431                 test_conf_set(conf, "isolation.level", "read_committed");
2432 
2433         test_topic_conf_set(tconf, "auto.offset.reset", "smallest");
2434         if (exp_eofcnt != -1)
2435                 test_conf_set(conf, "enable.partition.eof", "true");
2436         rk = test_create_consumer(group_id, NULL, conf, tconf);
2437 
2438         rd_kafka_poll_set_consumer(rk);
2439 
2440         if (partition == -1) {
2441                 TEST_SAY("Subscribing to topic %s in group %s "
2442                          "(expecting %d msgs with testid %"PRIu64")\n",
2443                          topic, group_id, exp_msgcnt, testid);
2444 
2445                 test_consumer_subscribe(rk, topic);
2446         } else {
2447                 rd_kafka_topic_partition_list_t *plist;
2448 
2449                 TEST_SAY("Assign topic %s [%"PRId32"] in group %s "
2450                          "(expecting %d msgs with testid %"PRIu64")\n",
2451                          topic, partition, group_id, exp_msgcnt, testid);
2452 
2453                 plist = rd_kafka_topic_partition_list_new(1);
2454                 rd_kafka_topic_partition_list_add(plist, topic, partition);
2455                 test_consumer_assign("consume_easy_mv", rk, plist);
2456                 rd_kafka_topic_partition_list_destroy(plist);
2457         }
2458 
2459         /* Consume messages */
2460         test_consumer_poll("consume.easy", rk, testid, exp_eofcnt,
2461                            -1, exp_msgcnt, mv);
2462 
2463         test_consumer_close(rk);
2464 
2465         rd_kafka_destroy(rk);
2466 }
2467 
2468 void
test_consume_msgs_easy(const char * group_id,const char * topic,uint64_t testid,int exp_eofcnt,int exp_msgcnt,rd_kafka_topic_conf_t * tconf)2469 test_consume_msgs_easy (const char *group_id, const char *topic,
2470                         uint64_t testid, int exp_eofcnt, int exp_msgcnt,
2471                         rd_kafka_topic_conf_t *tconf) {
2472         test_msgver_t mv;
2473 
2474         test_msgver_init(&mv, testid);
2475 
2476         test_consume_msgs_easy_mv(group_id, topic, -1, testid, exp_eofcnt,
2477                                   exp_msgcnt, tconf, &mv);
2478 
2479         test_msgver_clear(&mv);
2480 }
2481 
2482 
2483 void
test_consume_txn_msgs_easy(const char * group_id,const char * topic,uint64_t testid,int exp_eofcnt,int exp_msgcnt,rd_kafka_topic_conf_t * tconf)2484 test_consume_txn_msgs_easy (const char *group_id, const char *topic,
2485                             uint64_t testid, int exp_eofcnt, int exp_msgcnt,
2486                             rd_kafka_topic_conf_t *tconf) {
2487         test_msgver_t mv;
2488 
2489         test_msgver_init(&mv, testid);
2490 
2491         test_consume_msgs_easy_mv0(group_id, topic, rd_true/*txn*/,
2492                                    -1, testid, exp_eofcnt,
2493                                    exp_msgcnt, tconf, &mv);
2494 
2495         test_msgver_clear(&mv);
2496 }
2497 
2498 
2499 /**
2500  * @brief Waits for up to \p timeout_ms for consumer to receive assignment.
2501  *        If no assignment received without the timeout the test fails.
2502  */
test_consumer_wait_assignment(rd_kafka_t * rk)2503 void test_consumer_wait_assignment (rd_kafka_t *rk) {
2504         rd_kafka_topic_partition_list_t *assignment = NULL;
2505         int i;
2506 
2507         while (1) {
2508                 rd_kafka_resp_err_t err;
2509 
2510                 err = rd_kafka_assignment(rk, &assignment);
2511                 TEST_ASSERT(!err, "rd_kafka_assignment() failed: %s",
2512                             rd_kafka_err2str(err));
2513 
2514                 if (assignment->cnt > 0)
2515                         break;
2516 
2517                 rd_kafka_topic_partition_list_destroy(assignment);
2518 
2519                 test_consumer_poll_once(rk, NULL, 1000);
2520         }
2521 
2522         TEST_SAY("Assignment (%d partition(s)): ", assignment->cnt);
2523         for (i = 0 ; i < assignment->cnt ; i++)
2524                 TEST_SAY0("%s%s[%"PRId32"]",
2525                           i == 0 ? "" : ", ",
2526                           assignment->elems[i].topic,
2527                           assignment->elems[i].partition);
2528         TEST_SAY0("\n");
2529 
2530         rd_kafka_topic_partition_list_destroy(assignment);
2531 }
2532 
2533 
2534 /**
2535  * @brief Start subscribing for 'topic'
2536  */
test_consumer_subscribe(rd_kafka_t * rk,const char * topic)2537 void test_consumer_subscribe (rd_kafka_t *rk, const char *topic) {
2538         rd_kafka_topic_partition_list_t *topics;
2539 	rd_kafka_resp_err_t err;
2540 
2541 	topics = rd_kafka_topic_partition_list_new(1);
2542         rd_kafka_topic_partition_list_add(topics, topic,
2543 					  RD_KAFKA_PARTITION_UA);
2544 
2545         err = rd_kafka_subscribe(rk, topics);
2546         if (err)
2547                 TEST_FAIL("%s: Failed to subscribe to %s: %s\n",
2548                           rd_kafka_name(rk), topic, rd_kafka_err2str(err));
2549 
2550         rd_kafka_topic_partition_list_destroy(topics);
2551 }
2552 
2553 
/**
 * @brief Assign \p partitions to consumer \p rk, timing the call.
 *
 * @param what Prefix used in the log/failure messages.
 *
 * Fails the test (TEST_FAIL) if rd_kafka_assign() returns an error.
 */
void test_consumer_assign (const char *what, rd_kafka_t *rk,
                           rd_kafka_topic_partition_list_t *partitions) {
        test_timing_t t_assign;
        rd_kafka_resp_err_t err;

        TIMING_START(&t_assign, "ASSIGN.PARTITIONS");
        err = rd_kafka_assign(rk, partitions);
        TIMING_STOP(&t_assign);

        if (!err) {
                TEST_SAY("%s: assigned %d partition(s)\n",
                         what, partitions->cnt);
                return;
        }

        TEST_FAIL("%s: failed to assign %d partition(s): %s\n",
                  what, partitions->cnt, rd_kafka_err2str(err));
}
2569 
2570 
/**
 * @brief Remove the current assignment of consumer \p rk by calling
 *        rd_kafka_assign() with a NULL partition list, timing the call.
 *
 * @param what Prefix used in the log/failure messages.
 *
 * Fails the test (TEST_FAIL) if rd_kafka_assign() returns an error.
 */
void test_consumer_unassign (const char *what, rd_kafka_t *rk) {
        test_timing_t t_unassign;
        rd_kafka_resp_err_t err;

        TIMING_START(&t_unassign, "UNASSIGN.PARTITIONS");
        err = rd_kafka_assign(rk, NULL);
        TIMING_STOP(&t_unassign);

        if (!err) {
                TEST_SAY("%s: unassigned current partitions\n", what);
                return;
        }

        TEST_FAIL("%s: failed to unassign current partitions: %s\n",
                  what, rd_kafka_err2str(err));
}
2584 
2585 
2586 /**
2587  * @brief Assign a single partition with an optional starting offset
2588  */
test_consumer_assign_partition(const char * what,rd_kafka_t * rk,const char * topic,int32_t partition,int64_t offset)2589 void test_consumer_assign_partition (const char *what, rd_kafka_t *rk,
2590                                      const char *topic, int32_t partition,
2591                                      int64_t offset) {
2592         rd_kafka_topic_partition_list_t *part;
2593 
2594         part = rd_kafka_topic_partition_list_new(1);
2595         rd_kafka_topic_partition_list_add(part, topic, partition)->offset =
2596                 offset;
2597 
2598         test_consumer_assign(what, rk, part);
2599 
2600         rd_kafka_topic_partition_list_destroy(part);
2601 }
2602 
2603 
2604 /**
2605  * Message verification services
2606  *
2607  */
2608 
/**
 * @brief Initialize (zero out) message verifier \p mv for \p testid.
 *
 * The warning-log budget (\c log_max) scales with the global
 * \c test_level: (test_level + 1) * 100.
 */
void test_msgver_init (test_msgver_t *mv, uint64_t testid) {
        /* Max warning logs before suppressing. */
        const int warn_budget = (test_level + 1) * 100;

        memset(mv, 0, sizeof(*mv));

        mv->log_max = warn_budget;
        mv->testid = testid;
}
2615 
/**
 * Rate-limited TEST_WARN() for msgver \p mv.
 *
 * Every invocation bumps \c log_cnt. The warning is emitted as long as the
 * pre-increment count does not exceed \c log_max, otherwise it is only
 * accounted for in \c log_suppr_cnt.
 *
 * NOTE: \p mv is evaluated more than once.
 */
#define TEST_MV_WARN(mv,...) do {                               \
                if ((mv)->log_cnt++ <= (mv)->log_max)           \
                        TEST_WARN(__VA_ARGS__);                 \
                else                                            \
                        (mv)->log_suppr_cnt++;                  \
        } while (0)
2622 
2623 
2624 
test_mv_mvec_grow(struct test_mv_mvec * mvec,int tot_size)2625 static void test_mv_mvec_grow (struct test_mv_mvec *mvec, int tot_size) {
2626 	if (tot_size <= mvec->size)
2627 		return;
2628 	mvec->size = tot_size;
2629 	mvec->m = realloc(mvec->m, sizeof(*mvec->m) * mvec->size);
2630 }
2631 
2632 /**
2633  * Make sure there is room for at least \p cnt messages, else grow mvec.
2634  */
test_mv_mvec_reserve(struct test_mv_mvec * mvec,int cnt)2635 static void test_mv_mvec_reserve (struct test_mv_mvec *mvec, int cnt) {
2636 	test_mv_mvec_grow(mvec, mvec->cnt + cnt);
2637 }
2638 
test_mv_mvec_init(struct test_mv_mvec * mvec,int exp_cnt)2639 void test_mv_mvec_init (struct test_mv_mvec *mvec, int exp_cnt) {
2640 	TEST_ASSERT(mvec->m == NULL, "mvec not cleared");
2641 
2642 	if (!exp_cnt)
2643 		return;
2644 
2645 	test_mv_mvec_grow(mvec, exp_cnt);
2646 }
2647 
2648 
test_mv_mvec_clear(struct test_mv_mvec * mvec)2649 void test_mv_mvec_clear (struct test_mv_mvec *mvec) {
2650 	if (mvec->m)
2651 		free(mvec->m);
2652 }
2653 
/**
 * Release everything held by message verifier \p mv: each partition
 * entry (topic name, message vector and the entry itself) and the
 * partition array.
 *
 * The msgver is then re-initialized with test_msgver_init(), reusing its
 * current testid.
 */
void test_msgver_clear (test_msgver_t *mv) {
        int idx = 0;

        while (idx < mv->p_cnt) {
                struct test_mv_p *part = mv->p[idx++];

                test_mv_mvec_clear(&part->mvec);
                free(part->topic);
                free(part);
        }

        free(mv->p);

        test_msgver_init(mv, mv->testid);
}
2667 
/**
 * @brief Look up the partition entry for \p topic [\p partition] in \p mv.
 *
 * If no entry exists and \p do_create is zero the test is failed
 * (TEST_FAIL). Otherwise a new entry is appended to \p mv, with its
 * \c eof_offset set to RD_KAFKA_OFFSET_INVALID.
 *
 * Allocation failures fail the test instead of being dereferenced.
 *
 * @returns the existing or newly created partition entry.
 */
struct test_mv_p *test_msgver_p_get (test_msgver_t *mv, const char *topic,
                                     int32_t partition, int do_create) {
        struct test_mv_p *p;
        int i;

        for (i = 0 ; i < mv->p_cnt ; i++) {
                p = mv->p[i];
                if (p->partition == partition && !strcmp(p->topic, topic))
                        return p;
        }

        if (!do_create)
                TEST_FAIL("Topic %s [%"PRId32"] not found in msgver",
                          topic, partition);

        if (mv->p_cnt == mv->p_size) {
                /* Partition array is full: grow it, keeping the old
                 * pointer until realloc() is known to have succeeded. */
                int new_size = (mv->p_size + 4) * 2;
                void *new_p = realloc(mv->p, sizeof(*mv->p) * new_size);

                TEST_ASSERT(new_p != NULL,
                            "Failed to grow msgver partition array to %d",
                            new_size);
                mv->p = new_p;
                mv->p_size = new_size;
        }

        p = calloc(1, sizeof(*p));
        TEST_ASSERT(p != NULL,
                    "Failed to allocate msgver partition for %s [%"PRId32"]",
                    topic, partition);

        mv->p[mv->p_cnt++] = p;

        p->topic = rd_strdup(topic);
        p->partition = partition;
        p->eof_offset = RD_KAFKA_OFFSET_INVALID;

        return p;
}
2695 
2696 
2697 /**
2698  * Add (room for) message to message vector.
2699  * Resizes the vector as needed.
2700  */
test_mv_mvec_add(struct test_mv_mvec * mvec)2701 static struct test_mv_m *test_mv_mvec_add (struct test_mv_mvec *mvec) {
2702 	if (mvec->cnt == mvec->size) {
2703 		test_mv_mvec_grow(mvec, (mvec->size ? mvec->size * 2 : 10000));
2704 	}
2705 
2706 	mvec->cnt++;
2707 
2708 	return &mvec->m[mvec->cnt-1];
2709 }
2710 
2711 /**
2712  * Returns message at index \p mi
2713  */
test_mv_mvec_get(struct test_mv_mvec * mvec,int mi)2714 static RD_INLINE struct test_mv_m *test_mv_mvec_get (struct test_mv_mvec *mvec,
2715 						    int mi) {
2716         if (mi >= mvec->cnt)
2717                 return NULL;
2718 	return &mvec->m[mi];
2719 }
2720 
2721 /**
2722  * @returns the message with msgid \p msgid, or NULL.
2723  */
test_mv_mvec_find_by_msgid(struct test_mv_mvec * mvec,int msgid)2724 static struct test_mv_m *test_mv_mvec_find_by_msgid (struct test_mv_mvec *mvec,
2725                                                      int msgid) {
2726         int mi;
2727 
2728         for (mi = 0 ; mi < mvec->cnt ; mi++)
2729                 if (mvec->m[mi].msgid == msgid)
2730                         return &mvec->m[mi];
2731 
2732         return NULL;
2733 }
2734 
2735 
2736 /**
2737  * Print message list to \p fp
2738  */
2739 static RD_UNUSED
test_mv_mvec_dump(FILE * fp,const struct test_mv_mvec * mvec)2740 void test_mv_mvec_dump (FILE *fp, const struct test_mv_mvec *mvec) {
2741 	int mi;
2742 
2743 	fprintf(fp, "*** Dump mvec with %d messages (capacity %d): ***\n",
2744 		mvec->cnt, mvec->size);
2745 	for (mi = 0 ; mi < mvec->cnt ; mi++)
2746 		fprintf(fp, "  msgid %d, offset %"PRId64"\n",
2747 			mvec->m[mi].msgid, mvec->m[mi].offset);
2748 	fprintf(fp, "*** Done ***\n");
2749 
2750 }
2751 
test_mv_mvec_sort(struct test_mv_mvec * mvec,int (* cmp)(const void *,const void *))2752 static void test_mv_mvec_sort (struct test_mv_mvec *mvec,
2753 			       int (*cmp) (const void *, const void *)) {
2754 	qsort(mvec->m, mvec->cnt, sizeof(*mvec->m), cmp);
2755 }
2756 
2757 
2758 /**
2759  * @brief Adds a message to the msgver service.
2760  *
2761  * @returns 1 if message is from the expected testid, else 0 (not added)
2762  */
test_msgver_add_msg00(const char * func,int line,const char * clientname,test_msgver_t * mv,uint64_t testid,const char * topic,int32_t partition,int64_t offset,int64_t timestamp,rd_kafka_resp_err_t err,int msgnum)2763 int test_msgver_add_msg00 (const char *func, int line, const char *clientname,
2764                            test_msgver_t *mv,
2765                            uint64_t testid,
2766                            const char *topic, int32_t partition,
2767                            int64_t offset, int64_t timestamp,
2768                            rd_kafka_resp_err_t err, int msgnum) {
2769         struct test_mv_p *p;
2770         struct test_mv_m *m;
2771 
2772         if (testid != mv->testid) {
2773                 TEST_SAYL(3, "%s:%d: %s: mismatching testid %"PRIu64" != %"PRIu64"\n",
2774                           func, line, clientname, testid, mv->testid);
2775                 return 0; /* Ignore message */
2776         }
2777 
2778         p = test_msgver_p_get(mv, topic, partition, 1);
2779 
2780         if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
2781                 p->eof_offset = offset;
2782                 return 1;
2783         }
2784 
2785         m = test_mv_mvec_add(&p->mvec);
2786 
2787         m->offset = offset;
2788         m->msgid  = msgnum;
2789         m->timestamp = timestamp;
2790 
2791         if (test_level > 2) {
2792                 TEST_SAY("%s:%d: %s: "
2793                          "Recv msg %s [%"PRId32"] offset %"PRId64" msgid %d "
2794                          "timestamp %"PRId64"\n",
2795                          func, line, clientname,
2796                          p->topic, p->partition, m->offset, m->msgid,
2797                          m->timestamp);
2798         }
2799 
2800         mv->msgcnt++;
2801 
2802         return 1;
2803 }
2804 
2805 /**
2806  * Adds a message to the msgver service.
2807  *
2808  * Message must be a proper message or PARTITION_EOF.
2809  *
2810  * @param override_topic if non-NULL, overrides the rkmessage's topic
2811  *                       with this one.
2812  *
2813  * @returns 1 if message is from the expected testid, else 0 (not added).
2814  */
test_msgver_add_msg0(const char * func,int line,const char * clientname,test_msgver_t * mv,rd_kafka_message_t * rkmessage,const char * override_topic)2815 int test_msgver_add_msg0 (const char *func, int line, const char *clientname,
2816                           test_msgver_t *mv, rd_kafka_message_t *rkmessage,
2817                           const char *override_topic) {
2818 	uint64_t in_testid;
2819 	int in_part;
2820 	int in_msgnum = -1;
2821 	char buf[128];
2822         const void *val;
2823         size_t valsize;
2824 
2825         if (mv->fwd)
2826                 test_msgver_add_msg0(func, line, clientname,
2827                                      mv->fwd, rkmessage, override_topic);
2828 
2829 	if (rkmessage->err) {
2830 		if (rkmessage->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
2831 			return 0; /* Ignore error */
2832 
2833 		in_testid = mv->testid;
2834 
2835 	} else {
2836 
2837                 if (!mv->msgid_hdr) {
2838                         rd_snprintf(buf, sizeof(buf), "%.*s",
2839                                     (int)rkmessage->len,
2840                                     (char *)rkmessage->payload);
2841                         val = buf;
2842                 } else {
2843                         /* msgid is in message header */
2844                         rd_kafka_headers_t *hdrs;
2845 
2846                         if (rd_kafka_message_headers(rkmessage, &hdrs) ||
2847                             rd_kafka_header_get_last(hdrs, mv->msgid_hdr,
2848                                                      &val, &valsize)) {
2849                                 TEST_SAYL(3,
2850                                           "%s:%d: msgid expected in header %s "
2851                                           "but %s exists for "
2852                                           "message at offset %"PRId64
2853                                           " has no headers\n",
2854                                           func, line, mv->msgid_hdr,
2855                                           hdrs ? "no such header" : "no headers",
2856                                           rkmessage->offset);
2857 
2858                                 return 0;
2859                         }
2860                 }
2861 
2862                 if (sscanf(val, "testid=%"SCNu64", partition=%i, msg=%i\n",
2863                            &in_testid, &in_part, &in_msgnum) != 3)
2864                         TEST_FAIL("%s:%d: Incorrect format at offset %"PRId64
2865                                   ": %s",
2866                                   func, line, rkmessage->offset,
2867                                   (const char *)val);
2868         }
2869 
2870         return test_msgver_add_msg00(func, line, clientname, mv, in_testid,
2871                                      override_topic ?
2872                                      override_topic :
2873                                      rd_kafka_topic_name(rkmessage->rkt),
2874                                      rkmessage->partition,
2875                                      rkmessage->offset,
2876                                      rd_kafka_message_timestamp(rkmessage, NULL),
2877                                      rkmessage->err,
2878                                      in_msgnum);
2879         return 1;
2880 }
2881 
2882 
2883 
2884 /**
2885  * Verify that all messages were received in order.
2886  *
2887  * - Offsets need to occur without gaps
2888  * - msgids need to be increasing: but may have gaps, e.g., using partitioner)
2889  */
test_mv_mvec_verify_order(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs)2890 static int test_mv_mvec_verify_order (test_msgver_t *mv, int flags,
2891 				      struct test_mv_p *p,
2892 				      struct test_mv_mvec *mvec,
2893 				      struct test_mv_vs *vs) {
2894 	int mi;
2895 	int fails = 0;
2896 
2897 	for (mi = 1/*skip first*/ ; mi < mvec->cnt ; mi++) {
2898 		struct test_mv_m *prev = test_mv_mvec_get(mvec, mi-1);
2899 		struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
2900 
2901 		if (((flags & TEST_MSGVER_BY_OFFSET) &&
2902 		     prev->offset + 1 != this->offset) ||
2903 		    ((flags & TEST_MSGVER_BY_MSGID) &&
2904 		     prev->msgid > this->msgid)) {
2905 			TEST_MV_WARN(
2906 				mv,
2907 				" %s [%"PRId32"] msg rcvidx #%d/%d: "
2908 				"out of order (prev vs this): "
2909 				"offset %"PRId64" vs %"PRId64", "
2910 				"msgid %d vs %d\n",
2911 				p ? p->topic : "*",
2912 				p ? p->partition : -1,
2913 				mi, mvec->cnt,
2914 				prev->offset, this->offset,
2915 				prev->msgid, this->msgid);
2916 			fails++;
2917 		}
2918 	}
2919 
2920 	return fails;
2921 }
2922 
2923 
2924 /**
2925  * @brief Verify that messages correspond to 'correct' msgver.
2926  */
test_mv_mvec_verify_corr(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs)2927 static int test_mv_mvec_verify_corr (test_msgver_t *mv, int flags,
2928                                       struct test_mv_p *p,
2929                                       struct test_mv_mvec *mvec,
2930                                       struct test_mv_vs *vs) {
2931         int mi;
2932         int fails = 0;
2933         struct test_mv_p *corr_p = NULL;
2934         struct test_mv_mvec *corr_mvec;
2935         int verifycnt = 0;
2936 
2937         TEST_ASSERT(vs->corr);
2938 
2939         /* Get correct mvec for comparison. */
2940         if (p)
2941                 corr_p = test_msgver_p_get(vs->corr, p->topic, p->partition, 0);
2942         if (!corr_p) {
2943                 TEST_MV_WARN(mv,
2944                              " %s [%"PRId32"]: "
2945                              "no corresponding correct partition found\n",
2946                              p ? p->topic : "*",
2947                              p ? p->partition : -1);
2948                 return 1;
2949         }
2950 
2951         corr_mvec = &corr_p->mvec;
2952 
2953         for (mi = 0 ; mi < mvec->cnt ; mi++) {
2954                 struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
2955                 const struct test_mv_m *corr;
2956 
2957 
2958                 if (flags & TEST_MSGVER_SUBSET)
2959                         corr = test_mv_mvec_find_by_msgid(corr_mvec,
2960                                                           this->msgid);
2961                 else
2962                         corr = test_mv_mvec_get(corr_mvec, mi);
2963 
2964                 if (0)
2965                         TEST_MV_WARN(mv,
2966                                      "msg #%d: msgid %d, offset %"PRId64"\n",
2967                                      mi, this->msgid, this->offset);
2968                 if (!corr) {
2969                         if (!(flags & TEST_MSGVER_SUBSET)) {
2970                                 TEST_MV_WARN(
2971                                         mv,
2972                                         " %s [%"PRId32"] msg rcvidx #%d/%d: "
2973                                         "out of range: correct mvec has "
2974                                         "%d messages: "
2975                                         "message offset %"PRId64", msgid %d\n",
2976                                         p ? p->topic : "*",
2977                                         p ? p->partition : -1,
2978                                         mi, mvec->cnt, corr_mvec->cnt,
2979                                         this->offset, this->msgid);
2980                                 fails++;
2981                         }
2982                         continue;
2983                 }
2984 
2985                 if (((flags & TEST_MSGVER_BY_OFFSET) &&
2986                      this->offset != corr->offset) ||
2987                     ((flags & TEST_MSGVER_BY_MSGID) &&
2988                      this->msgid != corr->msgid) ||
2989                     ((flags & TEST_MSGVER_BY_TIMESTAMP) &&
2990                      this->timestamp != corr->timestamp)) {
2991                         TEST_MV_WARN(
2992                                 mv,
2993                                 " %s [%"PRId32"] msg rcvidx #%d/%d: "
2994                                 "did not match correct msg: "
2995                                 "offset %"PRId64" vs %"PRId64", "
2996                                 "msgid %d vs %d, "
2997                                 "timestamp %"PRId64" vs %"PRId64" (fl 0x%x)\n",
2998                                 p ? p->topic : "*",
2999                                 p ? p->partition : -1,
3000                                 mi, mvec->cnt,
3001                                 this->offset, corr->offset,
3002                                 this->msgid, corr->msgid,
3003                                 this->timestamp, corr->timestamp,
3004                                 flags);
3005                         fails++;
3006                 } else {
3007                         verifycnt++;
3008                 }
3009         }
3010 
3011         if (verifycnt != corr_mvec->cnt &&
3012             !(flags & TEST_MSGVER_SUBSET)) {
3013                 TEST_MV_WARN(
3014                         mv,
3015                         " %s [%"PRId32"]: of %d input messages, "
3016                         "only %d/%d matched correct messages\n",
3017                         p ? p->topic : "*",
3018                         p ? p->partition : -1,
3019                         mvec->cnt, verifycnt, corr_mvec->cnt);
3020                 fails++;
3021         }
3022 
3023         return fails;
3024 }
3025 
3026 
3027 
test_mv_m_cmp_offset(const void * _a,const void * _b)3028 static int test_mv_m_cmp_offset (const void *_a, const void *_b) {
3029 	const struct test_mv_m *a = _a, *b = _b;
3030 
3031         return RD_CMP(a->offset, b->offset);
3032 }
3033 
test_mv_m_cmp_msgid(const void * _a,const void * _b)3034 static int test_mv_m_cmp_msgid (const void *_a, const void *_b) {
3035 	const struct test_mv_m *a = _a, *b = _b;
3036 
3037         return RD_CMP(a->msgid, b->msgid);
3038 }
3039 
3040 
3041 /**
3042  * Verify that there are no duplicate message.
3043  *
3044  * - Offsets are checked
3045  * - msgids are checked
3046  *
3047  * * NOTE: This sorts the message (.m) array, first by offset, then by msgid
3048  *         and leaves the message array sorted (by msgid)
3049  */
test_mv_mvec_verify_dup(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs)3050 static int test_mv_mvec_verify_dup (test_msgver_t *mv, int flags,
3051 				    struct test_mv_p *p,
3052 				    struct test_mv_mvec *mvec,
3053 				    struct test_mv_vs *vs) {
3054 	int mi;
3055 	int fails = 0;
3056 	enum {
3057 		_P_OFFSET,
3058 		_P_MSGID
3059 	} pass;
3060 
3061 	for (pass = _P_OFFSET ; pass <= _P_MSGID ; pass++) {
3062 
3063 		if (pass == _P_OFFSET) {
3064 			if (!(flags & TEST_MSGVER_BY_OFFSET))
3065 				continue;
3066 			test_mv_mvec_sort(mvec, test_mv_m_cmp_offset);
3067 		} else if (pass == _P_MSGID) {
3068 			if (!(flags & TEST_MSGVER_BY_MSGID))
3069 				continue;
3070 			test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid);
3071 		}
3072 
3073 		for (mi = 1/*skip first*/ ; mi < mvec->cnt ; mi++) {
3074 			struct test_mv_m *prev = test_mv_mvec_get(mvec, mi-1);
3075 			struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
3076 			int is_dup = 0;
3077 
3078 			if (pass == _P_OFFSET)
3079 				is_dup = prev->offset == this->offset;
3080 			else if (pass == _P_MSGID)
3081 				is_dup = prev->msgid == this->msgid;
3082 
3083 			if (!is_dup)
3084 				continue;
3085 
3086 			TEST_MV_WARN(mv,
3087 				     " %s [%"PRId32"] "
3088 				     "duplicate msg (prev vs this): "
3089 				     "offset %"PRId64" vs %"PRId64", "
3090 				     "msgid %d vs %d\n",
3091 				     p ? p->topic : "*",
3092 				     p ? p->partition : -1,
3093 				     prev->offset, this->offset,
3094 				     prev->msgid,  this->msgid);
3095 			fails++;
3096 		}
3097 	}
3098 
3099 	return fails;
3100 }
3101 
3102 
3103 
3104 /**
3105  * Verify that \p mvec contains the expected range:
3106  *  - TEST_MSGVER_BY_MSGID: msgid within \p vs->msgid_min .. \p vs->msgid_max
3107  *  - TEST_MSGVER_BY_TIMESTAMP: timestamp with \p vs->timestamp_min .. _max
3108  *
3109  * * NOTE: TEST_MSGVER_BY_MSGID is required
3110  *
3111  * * NOTE: This sorts the message (.m) array by msgid
3112  *         and leaves the message array sorted (by msgid)
3113  */
test_mv_mvec_verify_range(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs)3114 static int test_mv_mvec_verify_range (test_msgver_t *mv, int flags,
3115                                       struct test_mv_p *p,
3116                                       struct test_mv_mvec *mvec,
3117                                       struct test_mv_vs *vs) {
3118         int mi;
3119         int fails = 0;
3120         int cnt = 0;
3121         int exp_cnt = vs->msgid_max - vs->msgid_min + 1;
3122         int skip_cnt = 0;
3123 
3124         if (!(flags & TEST_MSGVER_BY_MSGID))
3125                 return 0;
3126 
3127         test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid);
3128 
3129         //test_mv_mvec_dump(stdout, mvec);
3130 
3131         for (mi = 0 ; mi < mvec->cnt ; mi++) {
3132                 struct test_mv_m *prev = mi ? test_mv_mvec_get(mvec, mi-1):NULL;
3133                 struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
3134 
3135                 if (this->msgid < vs->msgid_min) {
3136                         skip_cnt++;
3137                         continue;
3138                 } else if (this->msgid > vs->msgid_max)
3139                         break;
3140 
3141                 if (flags & TEST_MSGVER_BY_TIMESTAMP) {
3142                         if (this->timestamp < vs->timestamp_min ||
3143                             this->timestamp > vs->timestamp_max) {
3144                                 TEST_MV_WARN(
3145                                         mv,
3146                                         " %s [%"PRId32"] range check: "
3147                                         "msgid #%d (at mi %d): "
3148                                         "timestamp %"PRId64" outside "
3149                                         "expected range %"PRId64"..%"PRId64"\n",
3150                                         p ? p->topic : "*",
3151                                         p ? p->partition : -1,
3152                                         this->msgid, mi,
3153                                         this->timestamp,
3154                                         vs->timestamp_min, vs->timestamp_max);
3155                                 fails++;
3156                         }
3157                 }
3158 
3159                 if (cnt++ == 0) {
3160                         if (this->msgid != vs->msgid_min) {
3161                                 TEST_MV_WARN(mv,
3162                                              " %s [%"PRId32"] range check: "
3163                                              "first message #%d (at mi %d) "
3164                                              "is not first in "
3165                                              "expected range %d..%d\n",
3166                                              p ? p->topic : "*",
3167                                              p ? p->partition : -1,
3168                                              this->msgid, mi,
3169                                              vs->msgid_min, vs->msgid_max);
3170                                 fails++;
3171                         }
3172                 } else if (cnt > exp_cnt) {
3173                         TEST_MV_WARN(mv,
3174                                      " %s [%"PRId32"] range check: "
3175                                      "too many messages received (%d/%d) at "
3176                                      "msgid %d for expected range %d..%d\n",
3177                                      p ? p->topic : "*",
3178                                      p ? p->partition : -1,
3179                                      cnt, exp_cnt, this->msgid,
3180                                      vs->msgid_min, vs->msgid_max);
3181                         fails++;
3182                 }
3183 
3184                 if (!prev) {
3185                         skip_cnt++;
3186                         continue;
3187                 }
3188 
3189                 if (prev->msgid + 1 != this->msgid) {
3190                         TEST_MV_WARN(mv, " %s [%"PRId32"] range check: "
3191                                      " %d message(s) missing between "
3192                                      "msgid %d..%d in expected range %d..%d\n",
3193                                      p ? p->topic : "*",
3194                                      p ? p->partition : -1,
3195                                      this->msgid - prev->msgid - 1,
3196                                      prev->msgid+1, this->msgid-1,
3197                                      vs->msgid_min, vs->msgid_max);
3198                         fails++;
3199                 }
3200         }
3201 
3202         if (cnt != exp_cnt) {
3203                 TEST_MV_WARN(mv,
3204                              " %s [%"PRId32"] range check: "
3205                              " wrong number of messages seen, wanted %d got %d "
3206                              "in expected range %d..%d (%d messages skipped)\n",
3207                              p ? p->topic : "*",
3208                              p ? p->partition : -1,
3209                              exp_cnt, cnt, vs->msgid_min, vs->msgid_max,
3210                              skip_cnt);
3211                 fails++;
3212         }
3213 
3214         return fails;
3215 }
3216 
3217 
3218 
3219 /**
3220  * Run verifier \p f for all partitions.
3221  */
3222 #define test_mv_p_verify_f(mv,flags,f,vs)	\
3223 	test_mv_p_verify_f0(mv,flags,f, # f, vs)
test_mv_p_verify_f0(test_msgver_t * mv,int flags,int (* f)(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs),const char * f_name,struct test_mv_vs * vs)3224 static int test_mv_p_verify_f0 (test_msgver_t *mv, int flags,
3225 				int (*f) (test_msgver_t *mv,
3226 					  int flags,
3227 					  struct test_mv_p *p,
3228 					  struct test_mv_mvec *mvec,
3229 					  struct test_mv_vs *vs),
3230 				const char *f_name,
3231 				struct test_mv_vs *vs) {
3232 	int i;
3233 	int fails = 0;
3234 
3235 	for (i = 0 ; i < mv->p_cnt ; i++) {
3236 		TEST_SAY("Verifying %s [%"PRId32"] %d msgs with %s\n",
3237 			 mv->p[i]->topic, mv->p[i]->partition,
3238 			 mv->p[i]->mvec.cnt, f_name);
3239 		fails += f(mv, flags, mv->p[i], &mv->p[i]->mvec, vs);
3240 	}
3241 
3242 	return fails;
3243 }
3244 
3245 
3246 /**
3247  * Collect all messages from all topics and partitions into vs->mvec
3248  */
test_mv_collect_all_msgs(test_msgver_t * mv,struct test_mv_vs * vs)3249 static void test_mv_collect_all_msgs (test_msgver_t *mv,
3250 				      struct test_mv_vs *vs) {
3251 	int i;
3252 
3253 	for (i = 0 ; i < mv->p_cnt ; i++) {
3254 		struct test_mv_p *p = mv->p[i];
3255 		int mi;
3256 
3257 		test_mv_mvec_reserve(&vs->mvec, p->mvec.cnt);
3258 		for (mi = 0 ; mi < p->mvec.cnt ; mi++) {
3259 			struct test_mv_m *m = test_mv_mvec_get(&p->mvec, mi);
3260 			struct test_mv_m *m_new = test_mv_mvec_add(&vs->mvec);
3261 			*m_new = *m;
3262 		}
3263 	}
3264 }
3265 
3266 
3267 /**
3268  * Verify that all messages (by msgid) in range msg_base+exp_cnt were received
3269  * and received only once.
3270  * This works across all partitions.
3271  */
test_msgver_verify_range(test_msgver_t * mv,int flags,struct test_mv_vs * vs)3272 static int test_msgver_verify_range (test_msgver_t *mv, int flags,
3273 				     struct test_mv_vs *vs) {
3274 	int fails = 0;
3275 
3276 	/**
3277 	 * Create temporary array to hold expected message set,
3278 	 * then traverse all topics and partitions and move matching messages
3279 	 * to that set. Then verify the message set.
3280 	 */
3281 
3282 	test_mv_mvec_init(&vs->mvec, vs->exp_cnt);
3283 
3284 	/* Collect all msgs into vs mvec */
3285 	test_mv_collect_all_msgs(mv, vs);
3286 
3287 	fails += test_mv_mvec_verify_range(mv, TEST_MSGVER_BY_MSGID|flags,
3288 					   NULL, &vs->mvec, vs);
3289 	fails += test_mv_mvec_verify_dup(mv, TEST_MSGVER_BY_MSGID|flags,
3290 					 NULL, &vs->mvec, vs);
3291 
3292 	test_mv_mvec_clear(&vs->mvec);
3293 
3294 	return fails;
3295 }
3296 
3297 
3298 /**
3299  * Verify that \p exp_cnt messages were received for \p topic and \p partition
3300  * starting at msgid base \p msg_base.
3301  */
test_msgver_verify_part0(const char * func,int line,const char * what,test_msgver_t * mv,int flags,const char * topic,int partition,int msg_base,int exp_cnt)3302 int test_msgver_verify_part0 (const char *func, int line, const char *what,
3303 			      test_msgver_t *mv, int flags,
3304 			      const char *topic, int partition,
3305 			      int msg_base, int exp_cnt) {
3306 	int fails = 0;
3307 	struct test_mv_vs vs = { .msg_base = msg_base, .exp_cnt = exp_cnt };
3308 	struct test_mv_p *p;
3309 
3310 	TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x) "
3311 		 "in %s [%d]: expecting msgids %d..%d (%d)\n",
3312 		 func, line, what, mv->msgcnt, flags, topic, partition,
3313 		 msg_base, msg_base+exp_cnt, exp_cnt);
3314 
3315 	p = test_msgver_p_get(mv, topic, partition, 0);
3316 
3317 	/* Per-partition checks */
3318 	if (flags & TEST_MSGVER_ORDER)
3319 		fails += test_mv_mvec_verify_order(mv, flags, p, &p->mvec, &vs);
3320 	if (flags & TEST_MSGVER_DUP)
3321 		fails += test_mv_mvec_verify_dup(mv, flags, p, &p->mvec, &vs);
3322 
3323 	if (mv->msgcnt < vs.exp_cnt) {
3324 		TEST_MV_WARN(mv,
3325 			     "%s:%d: "
3326 			     "%s [%"PRId32"] expected %d messages but only "
3327 			     "%d received\n",
3328 			     func, line,
3329 			     p ? p->topic : "*",
3330 			     p ? p->partition : -1,
3331 			     vs.exp_cnt, mv->msgcnt);
3332 		fails++;
3333 	}
3334 
3335 
3336 	if (mv->log_suppr_cnt > 0)
3337 		TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
3338 			  func, line, what, mv->log_suppr_cnt);
3339 
3340 	if (fails)
3341 		TEST_FAIL("%s:%d: %s: Verification of %d received messages "
3342 			  "failed: "
3343 			  "expected msgids %d..%d (%d): see previous errors\n",
3344 			  func, line, what,
3345 			  mv->msgcnt, msg_base, msg_base+exp_cnt, exp_cnt);
3346 	else
3347 		TEST_SAY("%s:%d: %s: Verification of %d received messages "
3348 			 "succeeded: "
3349 			 "expected msgids %d..%d (%d)\n",
3350 			 func, line, what,
3351 			 mv->msgcnt, msg_base, msg_base+exp_cnt, exp_cnt);
3352 
3353 	return fails;
3354 
3355 }
3356 
3357 /**
3358  * Verify that \p exp_cnt messages were received starting at
3359  * msgid base \p msg_base.
3360  */
test_msgver_verify0(const char * func,int line,const char * what,test_msgver_t * mv,int flags,struct test_mv_vs vs)3361 int test_msgver_verify0 (const char *func, int line, const char *what,
3362 			 test_msgver_t *mv,
3363 			 int flags, struct test_mv_vs vs) {
3364 	int fails = 0;
3365 
3366 	TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x): "
3367 		 "expecting msgids %d..%d (%d)\n",
3368 		 func, line, what, mv->msgcnt, flags,
3369 		 vs.msg_base, vs.msg_base+vs.exp_cnt, vs.exp_cnt);
3370         if (flags & TEST_MSGVER_BY_TIMESTAMP) {
3371                 assert((flags & TEST_MSGVER_BY_MSGID)); /* Required */
3372                 TEST_SAY("%s:%d: %s: "
3373                          " and expecting timestamps %"PRId64"..%"PRId64"\n",
3374                          func, line, what,
3375                          vs.timestamp_min, vs.timestamp_max);
3376         }
3377 
3378 	/* Per-partition checks */
3379 	if (flags & TEST_MSGVER_ORDER)
3380 		fails += test_mv_p_verify_f(mv, flags,
3381 					    test_mv_mvec_verify_order, &vs);
3382 	if (flags & TEST_MSGVER_DUP)
3383 		fails += test_mv_p_verify_f(mv, flags,
3384 					    test_mv_mvec_verify_dup, &vs);
3385 
3386 	/* Checks across all partitions */
3387 	if ((flags & TEST_MSGVER_RANGE) && vs.exp_cnt > 0) {
3388 		vs.msgid_min = vs.msg_base;
3389 		vs.msgid_max = vs.msgid_min + vs.exp_cnt - 1;
3390 		fails += test_msgver_verify_range(mv, flags, &vs);
3391 	}
3392 
3393 	if (mv->log_suppr_cnt > 0)
3394 		TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
3395 			  func, line, what, mv->log_suppr_cnt);
3396 
3397 	if (vs.exp_cnt != mv->msgcnt) {
3398                 if (!(flags & TEST_MSGVER_SUBSET)) {
3399                         TEST_WARN("%s:%d: %s: expected %d messages, got %d\n",
3400                                   func, line, what, vs.exp_cnt, mv->msgcnt);
3401                         fails++;
3402                 }
3403 	}
3404 
3405 	if (fails)
3406 		TEST_FAIL("%s:%d: %s: Verification of %d received messages "
3407 			  "failed: "
3408 			  "expected msgids %d..%d (%d): see previous errors\n",
3409 			  func, line, what,
3410 			  mv->msgcnt, vs.msg_base, vs.msg_base+vs.exp_cnt,
3411                           vs.exp_cnt);
3412 	else
3413 		TEST_SAY("%s:%d: %s: Verification of %d received messages "
3414 			 "succeeded: "
3415 			 "expected msgids %d..%d (%d)\n",
3416 			 func, line, what,
3417 			 mv->msgcnt, vs.msg_base, vs.msg_base+vs.exp_cnt,
3418                          vs.exp_cnt);
3419 
3420 	return fails;
3421 }
3422 
3423 
3424 
3425 
/**
 * Parse the payload of \p rkmessage, which is expected to have the form
 * "testid=<u64>, partition=<int>, msg=<int>", and compare the parsed
 * values to \p testid, \p partition and \p msgnum.
 *
 * A \p partition or \p msgnum of -1 means "do not compare that field".
 * A negative parsed message number is always treated as a mismatch.
 * TEST_FAIL() is raised on a malformed payload or on mismatch.
 *
 * \p func and \p line are only used in the log output.
 */
void test_verify_rkmessage0 (const char *func, int line,
                             rd_kafka_message_t *rkmessage, uint64_t testid,
                             int32_t partition, int msgnum) {
        char payload[128];
        uint64_t msg_testid;
        int msg_part;
        int msg_num;
        int nfields;
        int mismatch;

        /* Bounded, nul-terminated copy of the payload for parsing */
        rd_snprintf(payload, sizeof(payload), "%.*s",
                    (int)rkmessage->len, (char *)rkmessage->payload);

        nfields = sscanf(payload,
                         "testid=%"SCNu64", partition=%i, msg=%i\n",
                         &msg_testid, &msg_part, &msg_num);
        if (nfields != 3)
                TEST_FAIL("Incorrect format: %s", payload);

        mismatch = testid != msg_testid ||
                (partition != -1 && partition != msg_part) ||
                (msgnum != -1 && msgnum != msg_num) ||
                msg_num < 0;

        if (mismatch) {
                TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i, msg %i did "
                          "not match message: \"%s\"\n",
                          func, line,
                          testid, (int)partition, msgnum, payload);
                return;
        }

        if (test_level > 2) {
                TEST_SAY("%s:%i: Our testid %"PRIu64", part %i (%i), msg %i\n",
                         func, line,
                         testid, (int)partition, (int)rkmessage->partition,
                         msgnum);
        }
}
3463 
3464 
3465 /**
3466  * @brief Verify that \p mv is identical to \p corr according to flags.
3467  */
test_msgver_verify_compare0(const char * func,int line,const char * what,test_msgver_t * mv,test_msgver_t * corr,int flags)3468 void test_msgver_verify_compare0 (const char *func, int line,
3469                                   const char *what, test_msgver_t *mv,
3470                                   test_msgver_t *corr, int flags) {
3471         struct test_mv_vs vs;
3472         int fails = 0;
3473 
3474         memset(&vs, 0, sizeof(vs));
3475 
3476         TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x) by "
3477                  "comparison to correct msgver (%d messages)\n",
3478                  func, line, what, mv->msgcnt, flags, corr->msgcnt);
3479 
3480         vs.corr = corr;
3481 
3482         /* Per-partition checks */
3483         fails += test_mv_p_verify_f(mv, flags,
3484                                     test_mv_mvec_verify_corr, &vs);
3485 
3486         if (mv->log_suppr_cnt > 0)
3487                 TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
3488                           func, line, what, mv->log_suppr_cnt);
3489 
3490         if (corr->msgcnt != mv->msgcnt) {
3491                 if (!(flags & TEST_MSGVER_SUBSET)) {
3492                         TEST_WARN("%s:%d: %s: expected %d messages, got %d\n",
3493                                   func, line, what, corr->msgcnt, mv->msgcnt);
3494                         fails++;
3495                 }
3496         }
3497 
3498         if (fails)
3499                 TEST_FAIL("%s:%d: %s: Verification of %d received messages "
3500                           "failed: expected %d messages: see previous errors\n",
3501                           func, line, what,
3502                           mv->msgcnt, corr->msgcnt);
3503         else
3504                 TEST_SAY("%s:%d: %s: Verification of %d received messages "
3505                          "succeeded: matching %d messages from correct msgver\n",
3506                          func, line, what,
3507                          mv->msgcnt, corr->msgcnt);
3508 
3509 }
3510 
3511 
3512 /**
3513  * Consumer poll but dont expect any proper messages for \p timeout_ms.
3514  */
test_consumer_poll_no_msgs(const char * what,rd_kafka_t * rk,uint64_t testid,int timeout_ms)3515 void test_consumer_poll_no_msgs (const char *what, rd_kafka_t *rk,
3516 				 uint64_t testid, int timeout_ms) {
3517 	int64_t tmout = test_clock() + timeout_ms * 1000;
3518         int cnt = 0;
3519         test_timing_t t_cons;
3520 	test_msgver_t mv;
3521 
3522 	test_msgver_init(&mv, testid);
3523 
3524         TEST_SAY("%s: not expecting any messages for %dms\n",
3525 		 what, timeout_ms);
3526 
3527         TIMING_START(&t_cons, "CONSUME");
3528 
3529 	do {
3530                 rd_kafka_message_t *rkmessage;
3531 
3532                 rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
3533                 if (!rkmessage)
3534 			continue;
3535 
3536                 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
3537                         TEST_SAY("%s [%"PRId32"] reached EOF at "
3538                                  "offset %"PRId64"\n",
3539                                  rd_kafka_topic_name(rkmessage->rkt),
3540                                  rkmessage->partition,
3541                                  rkmessage->offset);
3542                         test_msgver_add_msg(rk, &mv, rkmessage);
3543 
3544                 } else if (rkmessage->err) {
3545                         TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64"): %s",
3546                                  rkmessage->rkt ?
3547                                  rd_kafka_topic_name(rkmessage->rkt) :
3548                                  "(no-topic)",
3549                                  rkmessage->partition,
3550                                  rkmessage->offset,
3551                                  rd_kafka_message_errstr(rkmessage));
3552 
3553                 } else {
3554                         if (test_msgver_add_msg(rk, &mv, rkmessage)) {
3555 				TEST_MV_WARN(&mv,
3556 					     "Received unexpected message on "
3557 					     "%s [%"PRId32"] at offset "
3558 					     "%"PRId64"\n",
3559 					     rd_kafka_topic_name(rkmessage->
3560 								 rkt),
3561 					     rkmessage->partition,
3562 					     rkmessage->offset);
3563 				cnt++;
3564 			}
3565                 }
3566 
3567                 rd_kafka_message_destroy(rkmessage);
3568         } while (test_clock() <= tmout);
3569 
3570         TIMING_STOP(&t_cons);
3571 
3572 	test_msgver_verify(what, &mv, TEST_MSGVER_ALL, 0, 0);
3573 	test_msgver_clear(&mv);
3574 
3575 	TEST_ASSERT(cnt == 0, "Expected 0 messages, got %d", cnt);
3576 }
3577 
3578 /**
3579  * @brief Consumer poll with expectation that a \p err will be reached
3580  * within \p timeout_ms.
3581  */
test_consumer_poll_expect_err(rd_kafka_t * rk,uint64_t testid,int timeout_ms,rd_kafka_resp_err_t err)3582 void test_consumer_poll_expect_err (rd_kafka_t *rk, uint64_t testid,
3583                                     int timeout_ms, rd_kafka_resp_err_t err) {
3584         int64_t tmout = test_clock() + timeout_ms * 1000;
3585 
3586         TEST_SAY("%s: expecting error %s within %dms\n",
3587                  rd_kafka_name(rk), rd_kafka_err2name(err), timeout_ms);
3588 
3589         do {
3590                 rd_kafka_message_t *rkmessage;
3591                 rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
3592                 if (!rkmessage)
3593                         continue;
3594 
3595                 if (rkmessage->err == err) {
3596                         TEST_SAY("Got expected error: %s: %s\n",
3597                                  rd_kafka_err2name(rkmessage->err),
3598                                  rd_kafka_message_errstr(rkmessage));
3599                         rd_kafka_message_destroy(rkmessage);
3600 
3601                         return;
3602                 } else if (rkmessage->err) {
3603                         TEST_FAIL("%s [%"PRId32"] unexpected error "
3604                                  "(offset %"PRId64"): %s",
3605                                  rkmessage->rkt ?
3606                                  rd_kafka_topic_name(rkmessage->rkt) :
3607                                  "(no-topic)",
3608                                  rkmessage->partition,
3609                                  rkmessage->offset,
3610                                  rd_kafka_err2name(rkmessage->err));
3611                 }
3612 
3613                 rd_kafka_message_destroy(rkmessage);
3614         } while (test_clock() <= tmout);
3615         TEST_FAIL("Expected error %s not seen in %dms",
3616                   rd_kafka_err2name(err), timeout_ms);
3617 }
3618 
3619 /**
3620  * Call consumer poll once and then return.
3621  * Messages are handled.
3622  *
3623  * \p mv is optional
3624  *
3625  * @returns 0 on timeout, 1 if a message was received or .._PARTITION_EOF
3626  *          if EOF was reached.
3627  *          TEST_FAIL()s on all errors.
3628  */
test_consumer_poll_once(rd_kafka_t * rk,test_msgver_t * mv,int timeout_ms)3629 int test_consumer_poll_once (rd_kafka_t *rk, test_msgver_t *mv, int timeout_ms){
3630 	rd_kafka_message_t *rkmessage;
3631 
3632 	rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
3633 	if (!rkmessage)
3634 		return 0;
3635 
3636 	if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
3637 		TEST_SAY("%s [%"PRId32"] reached EOF at "
3638 			 "offset %"PRId64"\n",
3639 			 rd_kafka_topic_name(rkmessage->rkt),
3640 			 rkmessage->partition,
3641 			 rkmessage->offset);
3642 		if (mv)
3643 			test_msgver_add_msg(rk, mv, rkmessage);
3644 		rd_kafka_message_destroy(rkmessage);
3645 		return RD_KAFKA_RESP_ERR__PARTITION_EOF;
3646 
3647 	} else if (rkmessage->err) {
3648 		TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64"): %s",
3649 			  rkmessage->rkt ?
3650 			  rd_kafka_topic_name(rkmessage->rkt) :
3651 			  "(no-topic)",
3652 			  rkmessage->partition,
3653 			  rkmessage->offset,
3654 			  rd_kafka_message_errstr(rkmessage));
3655 
3656 	} else {
3657 		if (mv)
3658 			test_msgver_add_msg(rk, mv, rkmessage);
3659 	}
3660 
3661 	rd_kafka_message_destroy(rkmessage);
3662 	return 1;
3663 }
3664 
3665 
/**
 * Poll the consumer until \p exp_eof_cnt partition EOFs or \p exp_cnt
 * messages have been seen, whichever comes first. A limit <= 0 disables
 * that stop condition.
 *
 * Partition EOFs are asserted not to occur when \p exp_eof_cnt is 0.
 * When \p mv is non-NULL EOFs and messages are added to it, and only
 * messages for which test_msgver_add_msg() returns non-zero are counted;
 * without \p mv every proper message is counted.
 * A poll timeout or any other error raises TEST_FAIL().
 *
 * \p testid and \p exp_msg_base are not used by this function.
 *
 * @returns the number of messages counted.
 */
int test_consumer_poll (const char *what, rd_kafka_t *rk, uint64_t testid,
                        int exp_eof_cnt, int exp_msg_base, int exp_cnt,
                        test_msgver_t *mv) {
        test_timing_t t_cons;
        int cnt = 0;
        int eof_cnt = 0;

        TEST_SAY("%s: consume %d messages\n", what, exp_cnt);

        TIMING_START(&t_cons, "CONSUME");

        for (;;) {
                rd_kafka_message_t *rkmessage;
                int more_eofs_wanted =
                        exp_eof_cnt <= 0 || eof_cnt < exp_eof_cnt;
                int more_msgs_wanted =
                        exp_cnt <= 0 || cnt < exp_cnt;

                /* Stop as soon as either limit has been reached */
                if (!more_eofs_wanted || !more_msgs_wanted)
                        break;

                rkmessage = rd_kafka_consumer_poll(rk, tmout_multip(10*1000));
                if (!rkmessage) { /* Shouldn't take this long to get a msg */
                        TEST_FAIL("%s: consumer_poll() timeout "
                                  "(%d/%d eof, %d/%d msgs)\n", what,
                                  eof_cnt, exp_eof_cnt, cnt, exp_cnt);
                }

                if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        TEST_SAY("%s [%"PRId32"] reached EOF at "
                                 "offset %"PRId64"\n",
                                 rd_kafka_topic_name(rkmessage->rkt),
                                 rkmessage->partition,
                                 rkmessage->offset);
                        TEST_ASSERT(exp_eof_cnt != 0, "expected no EOFs");
                        if (mv)
                                test_msgver_add_msg(rk, mv, rkmessage);
                        eof_cnt++;

                } else if (rkmessage->err) {
                        TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64
                                  "): %s",
                                  rkmessage->rkt ?
                                  rd_kafka_topic_name(rkmessage->rkt) :
                                  "(no-topic)",
                                  rkmessage->partition,
                                  rkmessage->offset,
                                  rd_kafka_message_errstr(rkmessage));

                } else if (!mv || test_msgver_add_msg(rk, mv, rkmessage)) {
                        /* Proper message that counts towards exp_cnt */
                        cnt++;
                }

                rd_kafka_message_destroy(rkmessage);
        }

        TIMING_STOP(&t_cons);

        TEST_SAY("%s: consumed %d/%d messages (%d/%d EOFs)\n",
                 what, cnt, exp_cnt, eof_cnt, exp_eof_cnt);

        if (exp_cnt == 0) {
                TEST_ASSERT(cnt == 0 && eof_cnt == exp_eof_cnt,
                            "%s: expected no messages and %d EOFs: "
                            "got %d messages and %d EOFs",
                            what, exp_eof_cnt, cnt, eof_cnt);
        }

        return cnt;
}
3729 
/**
 * Close the consumer \p rk with rd_kafka_consumer_close(), timing the
 * call. TEST_FAIL() is raised if the close returns an error.
 */
void test_consumer_close (rd_kafka_t *rk) {
        test_timing_t t_close;
        rd_kafka_resp_err_t err;

        TEST_SAY("Closing consumer\n");

        TIMING_START(&t_close, "CONSUMER.CLOSE");
        err = rd_kafka_consumer_close(rk);
        TIMING_STOP(&t_close);

        if (!err)
                return;

        TEST_FAIL("Failed to close consumer: %s\n",
                  rd_kafka_err2str(err));
}
3743 
3744 
/**
 * Flush \p rk with rd_kafka_flush(), waiting up to \p timeout_ms, and
 * time the call. TEST_FAIL() is raised if the flush returns an error.
 */
void test_flush (rd_kafka_t *rk, int timeout_ms) {
        rd_kafka_resp_err_t err;
        test_timing_t t_flush;

        TEST_SAY("%s: Flushing %d messages\n",
                 rd_kafka_name(rk), rd_kafka_outq_len(rk));

        TIMING_START(&t_flush, "FLUSH");
        err = rd_kafka_flush(rk, timeout_ms);
        TIMING_STOP(&t_flush);

        if (!err)
                return;

        TEST_FAIL("Failed to flush(%s, %d): %s: len() = %d\n",
                  rd_kafka_name(rk), timeout_ms,
                  rd_kafka_err2str(err),
                  rd_kafka_outq_len(rk));
}
3760 
3761 
/**
 * Set global configuration property \p name to \p val on \p conf.
 * TEST_FAIL() is raised if the property could not be set.
 */
void test_conf_set (rd_kafka_conf_t *conf, const char *name, const char *val) {
        char errstr[512];

        if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) ==
            RD_KAFKA_CONF_OK)
                return;

        TEST_FAIL("Failed to set config \"%s\"=\"%s\": %s\n",
                  name, val, errstr);
}
3769 
test_conf_get(const rd_kafka_conf_t * conf,const char * name)3770 char *test_conf_get (const rd_kafka_conf_t *conf, const char *name) {
3771 	static RD_TLS char ret[256];
3772 	size_t ret_sz = sizeof(ret);
3773 	if (rd_kafka_conf_get(conf, name, ret, &ret_sz) != RD_KAFKA_CONF_OK)
3774 		TEST_FAIL("Failed to get config \"%s\": %s\n", name,
3775 			  "unknown property");
3776 	return ret;
3777 }
3778 
3779 
3780 /**
3781  * @brief Check if property \name matches \p val in \p conf.
3782  *        If \p conf is NULL the test config will be used. */
test_conf_match(rd_kafka_conf_t * conf,const char * name,const char * val)3783 int test_conf_match (rd_kafka_conf_t *conf, const char *name, const char *val) {
3784         char *real;
3785         int free_conf = 0;
3786 
3787         if (!conf) {
3788                 test_conf_init(&conf, NULL, 0);
3789                 free_conf = 1;
3790         }
3791 
3792         real = test_conf_get(conf, name);
3793 
3794         if (free_conf)
3795                 rd_kafka_conf_destroy(conf);
3796 
3797         return !strcmp(real, val);
3798 }
3799 
3800 
/**
 * Set topic configuration property \p name to \p val on \p tconf.
 * TEST_FAIL() is raised if the property could not be set.
 */
void test_topic_conf_set (rd_kafka_topic_conf_t *tconf,
                          const char *name, const char *val) {
        char errstr[512];

        if (rd_kafka_topic_conf_set(tconf, name, val,
                                    errstr, sizeof(errstr)) ==
            RD_KAFKA_CONF_OK)
                return;

        TEST_FAIL("Failed to set topic config \"%s\"=\"%s\": %s\n",
                  name, val, errstr);
}
3809 
3810 /**
3811  * @brief First attempt to set topic level property, then global.
3812  */
test_any_conf_set(rd_kafka_conf_t * conf,rd_kafka_topic_conf_t * tconf,const char * name,const char * val)3813 void test_any_conf_set (rd_kafka_conf_t *conf,
3814                         rd_kafka_topic_conf_t *tconf,
3815                         const char *name, const char *val) {
3816         rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;
3817         char errstr[512] = {"Missing conf_t"};
3818 
3819         if (tconf)
3820                 res = rd_kafka_topic_conf_set(tconf, name, val,
3821                                               errstr, sizeof(errstr));
3822         if (res == RD_KAFKA_CONF_UNKNOWN && conf)
3823                 res = rd_kafka_conf_set(conf, name, val,
3824                                         errstr, sizeof(errstr));
3825 
3826         if (res != RD_KAFKA_CONF_OK)
3827                 TEST_FAIL("Failed to set any config \"%s\"=\"%s\": %s\n",
3828                           name, val, errstr);
3829 }
3830 
3831 
3832 /**
3833  * @returns true if test clients need to be configured for authentication
3834  *          or other security measures (SSL), else false for unauthed plaintext.
3835  */
test_needs_auth(void)3836 int test_needs_auth (void) {
3837         rd_kafka_conf_t *conf;
3838         const char *sec;
3839 
3840         test_conf_init(&conf, NULL, 0);
3841 
3842         sec = test_conf_get(conf, "security.protocol");
3843 
3844         rd_kafka_conf_destroy(conf);
3845 
3846         return strcmp(sec, "plaintext");
3847 }
3848 
3849 
test_print_partition_list(const rd_kafka_topic_partition_list_t * partitions)3850 void test_print_partition_list (const rd_kafka_topic_partition_list_t
3851 				*partitions) {
3852         int i;
3853         for (i = 0 ; i < partitions->cnt ; i++) {
3854 		TEST_SAY(" %s [%"PRId32"] offset %"PRId64"%s%s\n",
3855 			 partitions->elems[i].topic,
3856 			 partitions->elems[i].partition,
3857 			 partitions->elems[i].offset,
3858 			 partitions->elems[i].err ? ": " : "",
3859 			 partitions->elems[i].err ?
3860 			 rd_kafka_err2str(partitions->elems[i].err) : "");
3861         }
3862 }
3863 
3864 /**
3865  * @brief Execute kafka-topics.sh from the Kafka distribution.
3866  */
test_kafka_topics(const char * fmt,...)3867 void test_kafka_topics (const char *fmt, ...) {
3868 #ifdef _MSC_VER
3869 	TEST_FAIL("%s not supported on Windows, yet", __FUNCTION__);
3870 #else
3871 	char cmd[512];
3872 	int r;
3873 	va_list ap;
3874 	test_timing_t t_cmd;
3875 	const char *kpath, *zk;
3876 
3877 	kpath = test_getenv("KAFKA_PATH", NULL);
3878 	zk = test_getenv("ZK_ADDRESS", NULL);
3879 
3880 	if (!kpath || !zk)
3881 		TEST_FAIL("%s: KAFKA_PATH and ZK_ADDRESS must be set",
3882 			  __FUNCTION__);
3883 
3884 	r = rd_snprintf(cmd, sizeof(cmd),
3885 			"%s/bin/kafka-topics.sh --zookeeper %s ", kpath, zk);
3886 	TEST_ASSERT(r < (int)sizeof(cmd));
3887 
3888 	va_start(ap, fmt);
3889 	rd_vsnprintf(cmd+r, sizeof(cmd)-r, fmt, ap);
3890 	va_end(ap);
3891 
3892 	TEST_SAY("Executing: %s\n", cmd);
3893 	TIMING_START(&t_cmd, "exec");
3894 	r = system(cmd);
3895 	TIMING_STOP(&t_cmd);
3896 
3897 	if (r == -1)
3898 		TEST_FAIL("system(\"%s\") failed: %s", cmd, strerror(errno));
3899 	else if (WIFSIGNALED(r))
3900 		TEST_FAIL("system(\"%s\") terminated by signal %d\n", cmd,
3901 			  WTERMSIG(r));
3902 	else if (WEXITSTATUS(r))
3903 		TEST_FAIL("system(\"%s\") failed with exit status %d\n",
3904 			  cmd, WEXITSTATUS(r));
3905 #endif
3906 }
3907 
3908 
3909 
3910 /**
3911  * @brief Create topic using Topic Admin API
3912  */
test_admin_create_topic(rd_kafka_t * use_rk,const char * topicname,int partition_cnt,int replication_factor)3913 static void test_admin_create_topic (rd_kafka_t *use_rk,
3914                                      const char *topicname, int partition_cnt,
3915                                      int replication_factor) {
3916         rd_kafka_t *rk;
3917         rd_kafka_NewTopic_t *newt[1];
3918         const size_t newt_cnt = 1;
3919         rd_kafka_AdminOptions_t *options;
3920         rd_kafka_queue_t *rkqu;
3921         rd_kafka_event_t *rkev;
3922         const rd_kafka_CreateTopics_result_t *res;
3923         const rd_kafka_topic_result_t **terr;
3924         int timeout_ms = tmout_multip(10000);
3925         size_t res_cnt;
3926         rd_kafka_resp_err_t err;
3927         char errstr[512];
3928         test_timing_t t_create;
3929 
3930         if (!(rk = use_rk))
3931                 rk = test_create_producer();
3932 
3933         rkqu = rd_kafka_queue_new(rk);
3934 
3935         newt[0] = rd_kafka_NewTopic_new(topicname, partition_cnt,
3936                                         replication_factor,
3937                                         errstr, sizeof(errstr));
3938         TEST_ASSERT(newt[0] != NULL, "%s", errstr);
3939 
3940         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
3941         err = rd_kafka_AdminOptions_set_operation_timeout(options, timeout_ms,
3942                                                           errstr,
3943                                                           sizeof(errstr));
3944         TEST_ASSERT(!err, "%s", errstr);
3945 
3946         TEST_SAY("Creating topic \"%s\" "
3947                  "(partitions=%d, replication_factor=%d, timeout=%d)\n",
3948                  topicname, partition_cnt, replication_factor, timeout_ms);
3949 
3950         TIMING_START(&t_create, "CreateTopics");
3951         rd_kafka_CreateTopics(rk, newt, newt_cnt, options, rkqu);
3952 
3953         /* Wait for result */
3954         rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
3955         TEST_ASSERT(rkev, "Timed out waiting for CreateTopics result");
3956 
3957         TIMING_STOP(&t_create);
3958 
3959         TEST_ASSERT(!rd_kafka_event_error(rkev),
3960                     "CreateTopics failed: %s",
3961                     rd_kafka_event_error_string(rkev));
3962 
3963         res = rd_kafka_event_CreateTopics_result(rkev);
3964         TEST_ASSERT(res, "Expected CreateTopics_result, not %s",
3965                     rd_kafka_event_name(rkev));
3966 
3967         terr = rd_kafka_CreateTopics_result_topics(res, &res_cnt);
3968         TEST_ASSERT(terr, "CreateTopics_result_topics returned NULL");
3969         TEST_ASSERT(res_cnt == newt_cnt,
3970                     "CreateTopics_result_topics returned %"PRIusz" topics, "
3971                     "not the expected %"PRIusz,
3972                     res_cnt, newt_cnt);
3973 
3974         TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]) ||
3975                     rd_kafka_topic_result_error(terr[0]) ==
3976                     RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
3977                     "Topic %s result error: %s",
3978                     rd_kafka_topic_result_name(terr[0]),
3979                     rd_kafka_topic_result_error_string(terr[0]));
3980 
3981         rd_kafka_event_destroy(rkev);
3982 
3983         rd_kafka_queue_destroy(rkqu);
3984 
3985         rd_kafka_AdminOptions_destroy(options);
3986 
3987         rd_kafka_NewTopic_destroy(newt[0]);
3988 
3989         if (!use_rk)
3990                 rd_kafka_destroy(rk);
3991 }
3992 
3993 
3994 
3995 
/**
 * @brief Create topic using kafka-topics.sh --create
 *
 * Passes \p topicname, \p replication_factor and \p partition_cnt as
 * arguments to test_kafka_topics().
 */
static void test_create_topic_sh (const char *topicname, int partition_cnt,
                                  int replication_factor) {
        test_kafka_topics("--create "
                          "--topic \"%s\" "
                          "--replication-factor %d "
                          "--partitions %d",
                          topicname, replication_factor, partition_cnt);
}
4005 
4006 
4007 /**
4008  * @brief Create topic
4009  */
test_create_topic(rd_kafka_t * use_rk,const char * topicname,int partition_cnt,int replication_factor)4010 void test_create_topic (rd_kafka_t *use_rk,
4011                         const char *topicname, int partition_cnt,
4012                         int replication_factor) {
4013         if (test_broker_version < TEST_BRKVER(0,10,2,0))
4014                 test_create_topic_sh(topicname, partition_cnt,
4015                                      replication_factor);
4016         else
4017                 test_admin_create_topic(use_rk, topicname, partition_cnt,
4018                                         replication_factor);
4019 }
4020 
4021 
/**
 * @brief Delete topic using kafka-topics.sh --delete
 *
 * Passes \p topicname as argument to test_kafka_topics().
 */
static void test_delete_topic_sh (const char *topicname) {
        test_kafka_topics("--delete "
                          "--topic \"%s\" ",
                          topicname);
}
4028 
4029 
4030 /**
4031  * @brief Delete topic using Topic Admin API
4032  */
test_admin_delete_topic(rd_kafka_t * use_rk,const char * topicname)4033 static void test_admin_delete_topic (rd_kafka_t *use_rk,
4034                                      const char *topicname) {
4035         rd_kafka_t *rk;
4036         rd_kafka_DeleteTopic_t *delt[1];
4037         const size_t delt_cnt = 1;
4038         rd_kafka_AdminOptions_t *options;
4039         rd_kafka_queue_t *rkqu;
4040         rd_kafka_event_t *rkev;
4041         const rd_kafka_DeleteTopics_result_t *res;
4042         const rd_kafka_topic_result_t **terr;
4043         int timeout_ms = tmout_multip(10000);
4044         size_t res_cnt;
4045         rd_kafka_resp_err_t err;
4046         char errstr[512];
4047         test_timing_t t_create;
4048 
4049         if (!(rk = use_rk))
4050                 rk = test_create_producer();
4051 
4052         rkqu = rd_kafka_queue_new(rk);
4053 
4054         delt[0] = rd_kafka_DeleteTopic_new(topicname);
4055 
4056         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
4057         err = rd_kafka_AdminOptions_set_operation_timeout(options, timeout_ms,
4058                                                           errstr,
4059                                                           sizeof(errstr));
4060         TEST_ASSERT(!err, "%s", errstr);
4061 
4062         TEST_SAY("Deleting topic \"%s\" "
4063                  "(timeout=%d)\n",
4064                  topicname, timeout_ms);
4065 
4066         TIMING_START(&t_create, "DeleteTopics");
4067         rd_kafka_DeleteTopics(rk, delt, delt_cnt, options, rkqu);
4068 
4069         /* Wait for result */
4070         rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
4071         TEST_ASSERT(rkev, "Timed out waiting for DeleteTopics result");
4072 
4073         TIMING_STOP(&t_create);
4074 
4075         res = rd_kafka_event_DeleteTopics_result(rkev);
4076         TEST_ASSERT(res, "Expected DeleteTopics_result, not %s",
4077                     rd_kafka_event_name(rkev));
4078 
4079         terr = rd_kafka_DeleteTopics_result_topics(res, &res_cnt);
4080         TEST_ASSERT(terr, "DeleteTopics_result_topics returned NULL");
4081         TEST_ASSERT(res_cnt == delt_cnt,
4082                     "DeleteTopics_result_topics returned %"PRIusz" topics, "
4083                     "not the expected %"PRIusz,
4084                     res_cnt, delt_cnt);
4085 
4086         TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]),
4087                     "Topic %s result error: %s",
4088                     rd_kafka_topic_result_name(terr[0]),
4089                     rd_kafka_topic_result_error_string(terr[0]));
4090 
4091         rd_kafka_event_destroy(rkev);
4092 
4093         rd_kafka_queue_destroy(rkqu);
4094 
4095         rd_kafka_AdminOptions_destroy(options);
4096 
4097         rd_kafka_DeleteTopic_destroy(delt[0]);
4098 
4099         if (!use_rk)
4100                 rd_kafka_destroy(rk);
4101 }
4102 
4103 
4104 /**
4105  * @brief Delete a topic
4106  */
test_delete_topic(rd_kafka_t * use_rk,const char * topicname)4107 void test_delete_topic (rd_kafka_t *use_rk, const char *topicname) {
4108         if (test_broker_version < TEST_BRKVER(0,10,2,0))
4109                 test_delete_topic_sh(topicname);
4110         else
4111                 test_admin_delete_topic(use_rk, topicname);
4112 }
4113 
4114 
4115 /**
4116  * @brief Create additional partitions for a topic using Admin API
4117  */
test_admin_create_partitions(rd_kafka_t * use_rk,const char * topicname,int new_partition_cnt)4118 static void test_admin_create_partitions (rd_kafka_t *use_rk,
4119                                           const char *topicname,
4120                                           int new_partition_cnt) {
4121         rd_kafka_t *rk;
4122         rd_kafka_NewPartitions_t *newp[1];
4123         const size_t newp_cnt = 1;
4124         rd_kafka_AdminOptions_t *options;
4125         rd_kafka_queue_t *rkqu;
4126         rd_kafka_event_t *rkev;
4127         const rd_kafka_CreatePartitions_result_t *res;
4128         const rd_kafka_topic_result_t **terr;
4129         int timeout_ms = tmout_multip(10000);
4130         size_t res_cnt;
4131         rd_kafka_resp_err_t err;
4132         char errstr[512];
4133         test_timing_t t_create;
4134 
4135         if (!(rk = use_rk))
4136                 rk = test_create_producer();
4137 
4138         rkqu = rd_kafka_queue_new(rk);
4139 
4140         newp[0] = rd_kafka_NewPartitions_new(topicname, new_partition_cnt,
4141                                              errstr, sizeof(errstr));
4142         TEST_ASSERT(newp[0] != NULL, "%s", errstr);
4143 
4144         options = rd_kafka_AdminOptions_new(rk,
4145                                             RD_KAFKA_ADMIN_OP_CREATEPARTITIONS);
4146         err = rd_kafka_AdminOptions_set_operation_timeout(options, timeout_ms,
4147                                                           errstr,
4148                                                           sizeof(errstr));
4149         TEST_ASSERT(!err, "%s", errstr);
4150 
4151         TEST_SAY("Creating %d (total) partitions for topic \"%s\"\n",
4152                  new_partition_cnt, topicname);
4153 
4154         TIMING_START(&t_create, "CreatePartitions");
4155         rd_kafka_CreatePartitions(rk, newp, newp_cnt, options, rkqu);
4156 
4157         /* Wait for result */
4158         rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
4159         TEST_ASSERT(rkev, "Timed out waiting for CreatePartitions result");
4160 
4161         TIMING_STOP(&t_create);
4162 
4163         res = rd_kafka_event_CreatePartitions_result(rkev);
4164         TEST_ASSERT(res, "Expected CreatePartitions_result, not %s",
4165                     rd_kafka_event_name(rkev));
4166 
4167         terr = rd_kafka_CreatePartitions_result_topics(res, &res_cnt);
4168         TEST_ASSERT(terr, "CreatePartitions_result_topics returned NULL");
4169         TEST_ASSERT(res_cnt == newp_cnt,
4170                     "CreatePartitions_result_topics returned %"PRIusz
4171                     " topics, not the expected %"PRIusz,
4172                     res_cnt, newp_cnt);
4173 
4174         TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]),
4175                     "Topic %s result error: %s",
4176                     rd_kafka_topic_result_name(terr[0]),
4177                     rd_kafka_topic_result_error_string(terr[0]));
4178 
4179         rd_kafka_event_destroy(rkev);
4180 
4181         rd_kafka_queue_destroy(rkqu);
4182 
4183         rd_kafka_AdminOptions_destroy(options);
4184 
4185         rd_kafka_NewPartitions_destroy(newp[0]);
4186 
4187         if (!use_rk)
4188                 rd_kafka_destroy(rk);
4189 }
4190 
4191 
4192 /**
4193  * @brief Create partitions for topic
4194  */
test_create_partitions(rd_kafka_t * use_rk,const char * topicname,int new_partition_cnt)4195 void test_create_partitions (rd_kafka_t *use_rk,
4196                              const char *topicname, int new_partition_cnt) {
4197         if (test_broker_version < TEST_BRKVER(0,10,2,0))
4198                 test_kafka_topics("--alter --topic %s --partitions %d",
4199                                   topicname, new_partition_cnt);
4200         else
4201                 test_admin_create_partitions(use_rk, topicname,
4202                                              new_partition_cnt);
4203 }
4204 
4205 
/**
 * @brief Look up the partition count of \p topicname by requesting
 *        metadata until the topic is reported or \p timeout_ms has passed.
 *
 * The topic is accepted as soon as metadata holds exactly one topic that
 * either has no error or has at least one partition.
 *
 * @param rk Optional instance to use; when NULL a temporary producer is
 *           created and destroyed again before returning.
 * @param topicname Topic to query.
 * @param timeout_ms Total time to keep retrying, in milliseconds.
 *
 * @returns the partition count, or -1 if it could not be determined.
 */
int test_get_partition_count (rd_kafka_t *rk, const char *topicname,
                              int timeout_ms) {
        rd_kafka_t *use_rk;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_t *rkt;
        /* Widen before scaling ms to us so large timeouts don't
         * overflow int arithmetic. */
        int64_t abs_timeout = test_clock() + ((int64_t)timeout_ms * 1000);
        int ret = -1;

        use_rk = rk ? rk : test_create_producer();

        rkt = rd_kafka_topic_new(use_rk, topicname, NULL);
        if (!rkt) {
                /* Without a topic object there is nothing to query
                 * (or to destroy later on): give up early. */
                TEST_WARN("Failed to create topic object for %s: %s\n",
                          topicname,
                          rd_kafka_err2str(rd_kafka_last_error()));
                if (!rk)
                        rd_kafka_destroy(use_rk);
                return -1;
        }

        do {
                const struct rd_kafka_metadata *metadata;

                err = rd_kafka_metadata(use_rk, 0, rkt, &metadata,
                                        tmout_multip(15000));
                if (err) {
                        TEST_WARN("metadata() for %s failed: %s\n",
                                  rd_kafka_topic_name(rkt),
                                  rd_kafka_err2str(err));
                        continue; /* re-evaluates the loop condition */
                }

                if (metadata->topic_cnt == 1) {
                        if (metadata->topics[0].err == 0 ||
                            metadata->topics[0].partition_cnt > 0) {
                                ret = (int)metadata->topics[0].partition_cnt;
                                rd_kafka_metadata_destroy(metadata);
                                break;
                        }
                        TEST_SAY("metadata(%s) returned %s: retrying\n",
                                 rd_kafka_topic_name(rkt),
                                 rd_kafka_err2str(metadata->
                                                  topics[0].err));
                }

                rd_kafka_metadata_destroy(metadata);
                rd_sleep(1);
        } while (test_clock() < abs_timeout);

        rd_kafka_topic_destroy(rkt);

        /* Only destroy the client if it was created here. */
        if (!rk)
                rd_kafka_destroy(use_rk);

        return ret;
}
4258 
4259 /**
4260  * @brief Let the broker auto-create the topic for us.
4261  */
test_auto_create_topic_rkt(rd_kafka_t * rk,rd_kafka_topic_t * rkt,int timeout_ms)4262 rd_kafka_resp_err_t test_auto_create_topic_rkt (rd_kafka_t *rk,
4263                                                 rd_kafka_topic_t *rkt,
4264                                                 int timeout_ms) {
4265 	const struct rd_kafka_metadata *metadata;
4266 	rd_kafka_resp_err_t err;
4267 	test_timing_t t;
4268         int64_t abs_timeout = test_clock() + (timeout_ms * 1000);
4269 
4270         do {
4271                 TIMING_START(&t, "auto_create_topic");
4272                 err = rd_kafka_metadata(rk, 0, rkt, &metadata,
4273                                         tmout_multip(15000));
4274                 TIMING_STOP(&t);
4275                 if (err)
4276                         TEST_WARN("metadata() for %s failed: %s\n",
4277                                   rkt ? rd_kafka_topic_name(rkt) :
4278                                   "(all-local)",
4279                                   rd_kafka_err2str(err));
4280                 else {
4281                         if (metadata->topic_cnt == 1) {
4282                                 if (metadata->topics[0].err == 0 ||
4283                                     metadata->topics[0].partition_cnt > 0) {
4284                                         rd_kafka_metadata_destroy(metadata);
4285                                         return 0;
4286                                 }
4287                                 TEST_SAY("metadata(%s) returned %s: retrying\n",
4288                                          rd_kafka_topic_name(rkt),
4289                                          rd_kafka_err2str(metadata->
4290                                                           topics[0].err));
4291                         }
4292                         rd_kafka_metadata_destroy(metadata);
4293                         rd_sleep(1);
4294                 }
4295         } while (test_clock() < abs_timeout);
4296 
4297         return err;
4298 }
4299 
test_auto_create_topic(rd_kafka_t * rk,const char * name,int timeout_ms)4300 rd_kafka_resp_err_t test_auto_create_topic (rd_kafka_t *rk, const char *name,
4301                                             int timeout_ms) {
4302         rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, name, NULL);
4303         rd_kafka_resp_err_t err;
4304         if (!rkt)
4305                 return rd_kafka_last_error();
4306         err = test_auto_create_topic_rkt(rk, rkt, timeout_ms);
4307         rd_kafka_topic_destroy(rkt);
4308         return err;
4309 }
4310 
4311 
4312 /**
4313  * @brief Check if topic auto creation works.
4314  * @returns 1 if it does, else 0.
4315  */
test_check_auto_create_topic(void)4316 int test_check_auto_create_topic (void) {
4317         rd_kafka_t *rk;
4318         rd_kafka_conf_t *conf;
4319         rd_kafka_resp_err_t err;
4320         const char *topic = test_mk_topic_name("autocreatetest", 1);
4321 
4322         test_conf_init(&conf, NULL, 0);
4323         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
4324         err = test_auto_create_topic(rk, topic, tmout_multip(5000));
4325         if (err)
4326                 TEST_SAY("Auto topic creation of \"%s\" failed: %s\n",
4327                          topic, rd_kafka_err2str(err));
4328         rd_kafka_destroy(rk);
4329 
4330         return err ? 0 : 1;
4331 }
4332 
4333 
/**
 * @brief Builds and runs a Java application from the java/ directory.
 *
 *        The application is started in the background, use
 *        test_waitpid() to await its demise.
 *
 * @param cls The app class to run using java/run-class.sh
 * @param argv NULL-terminated list of arguments that are passed to the
 *             application after \p cls.
 *
 * @returns -1 if the application could not be started, else the pid.
 */
int test_run_java (const char *cls, const char **argv) {
#ifdef _MSC_VER
        TEST_WARN("%s(%s) not supported Windows, yet",
                  __FUNCTION__, cls);
        return -1;
#else
        int r;
        const char *kpath;
        pid_t pid;
        const char **full_argv, **p;
        int cnt;
        extern char **environ;

        kpath = test_getenv("KAFKA_PATH", NULL);

        if (!kpath) {
                TEST_WARN("%s(%s): KAFKA_PATH must be set\n",
                          __FUNCTION__, cls);
                return -1;
        }

        /* Build */
        r = system("make -s java");

        if (r == -1 || WIFSIGNALED(r) || WEXITSTATUS(r)) {
                TEST_WARN("%s(%s): failed to build java class (code %d)\n",
                          __FUNCTION__, cls, r);
                return -1;
        }

        /* Fork child process and run cls */
        pid = fork();
        if (pid == -1) {
                TEST_WARN("%s(%s): failed to fork: %s\n",
                          __FUNCTION__, cls, strerror(errno));
                return -1;
        }

        if (pid > 0)
                return (int)pid; /* In parent process */

        /* In child process */

        /* Reconstruct argv to contain run-class.sh and the cls */
        for (cnt = 0 ; argv[cnt] ; cnt++)
                ;

        cnt += 3; /* run-class.sh, cls, .., NULL */
        full_argv = malloc(sizeof(*full_argv) * cnt);
        if (!full_argv) {
                /* Nothing can be executed without an argument vector:
                 * leave the child the same way as on exec failure. */
                TEST_WARN("%s(%s): failed to allocate %d arguments\n",
                          __FUNCTION__, cls, cnt);
                exit(2);
        }
        full_argv[0] = "java/run-class.sh";
        full_argv[1] = (const char *)cls;

        /* Copy arguments */
        for (p = &full_argv[2] ; *argv ; p++, argv++)
                *p = *argv;
        *p = NULL;

        /* Run: execve() only returns if it failed. */
        r = execve(full_argv[0], (char *const*)full_argv, environ);

        TEST_WARN("%s(%s): failed to execute run-class.sh: %s\n",
                  __FUNCTION__, cls, strerror(errno));
        exit(2);

        return -1; /* NOTREACHED */
#endif
}
4411 
4412 
/**
 * @brief Wait for child-process \p pid to exit.
 *
 * @returns 0 if the child process exited with status 0, else -1
 *          (waitpid() failure, termination by signal, or non-zero
 *          exit status). Always -1 on Windows where this is unsupported.
 */
int test_waitpid (int pid) {
#ifdef _MSC_VER
        TEST_WARN("%s() not supported Windows, yet",
                  __FUNCTION__);
        return -1;
#else
        int status = 0;
        pid_t waited = waitpid((pid_t)pid, &status, 0);

        if (waited == -1) {
                TEST_WARN("waitpid(%d) failed: %s\n",
                          pid, strerror(errno));
                return -1;
        }

        if (WIFSIGNALED(status)) {
                TEST_WARN("Process %d terminated by signal %d\n", pid,
                          WTERMSIG(status));
                return -1;
        }

        if (WEXITSTATUS(status)) {
                TEST_WARN("Process %d exited with status %d\n",
                          pid, WEXITSTATUS(status));
                return -1;
        }

        return 0;
#endif
}
4448 
4449 
4450 /**
4451  * @brief Check if \p feature is builtin to librdkafka.
4452  * @returns returns 1 if feature is built in, else 0.
4453  */
test_check_builtin(const char * feature)4454 int test_check_builtin (const char *feature) {
4455 	rd_kafka_conf_t *conf;
4456 	char errstr[128];
4457 	int r;
4458 
4459 	conf = rd_kafka_conf_new();
4460 	if (rd_kafka_conf_set(conf, "builtin.features", feature,
4461 			      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
4462 		TEST_SAY("Feature \"%s\" not built-in: %s\n",
4463 			 feature, errstr);
4464 		r = 0;
4465 	} else {
4466 		TEST_SAY("Feature \"%s\" is built-in\n", feature);
4467 		r = 1;
4468 	}
4469 
4470 	rd_kafka_conf_destroy(conf);
4471 	return r;
4472 }
4473 
4474 
tsprintf(const char * fmt,...)4475 char *tsprintf (const char *fmt, ...) {
4476 	static RD_TLS char ret[8][512];
4477 	static RD_TLS int i;
4478 	va_list ap;
4479 
4480 
4481 	i = (i + 1) % 8;
4482 
4483 	va_start(ap, fmt);
4484 	rd_vsnprintf(ret[i], sizeof(ret[i]), fmt, ap);
4485 	va_end(ap);
4486 
4487 	return ret[i];
4488 }
4489 
4490 
4491 /**
4492  * @brief Add a test report JSON object.
4493  * These will be written as a JSON array to the test report file.
4494  */
test_report_add(struct test * test,const char * fmt,...)4495 void test_report_add (struct test *test, const char *fmt, ...) {
4496 	va_list ap;
4497 	char buf[512];
4498 
4499 	va_start(ap, fmt);
4500 	vsnprintf(buf, sizeof(buf), fmt, ap);
4501 	va_end(ap);
4502 
4503 	if (test->report_cnt == test->report_size) {
4504 		if (test->report_size == 0)
4505 			test->report_size = 8;
4506 		else
4507 			test->report_size *= 2;
4508 
4509 		test->report_arr = realloc(test->report_arr,
4510 					   sizeof(*test->report_arr) *
4511 					   test->report_size);
4512 	}
4513 
4514 	test->report_arr[test->report_cnt++] = rd_strdup(buf);
4515 
4516 	TEST_SAYL(1, "Report #%d: %s\n", test->report_cnt-1, buf);
4517 }
4518 
4519 /**
4520  * Returns 1 if KAFKA_PATH and ZK_ADDRESS is set to se we can use the
4521  * kafka-topics.sh script to manually create topics.
4522  *
4523  * If \p skip is set TEST_SKIP() will be called with a helpful message.
4524  */
test_can_create_topics(int skip)4525 int test_can_create_topics (int skip) {
4526         /* Has AdminAPI */
4527         if (test_broker_version >= TEST_BRKVER(0,10,2,0))
4528                 return 1;
4529 
4530 #ifdef _MSC_VER
4531 	if (skip)
4532 		TEST_SKIP("Cannot create topics on Win32\n");
4533 	return 0;
4534 #else
4535 
4536 	if (!test_getenv("KAFKA_PATH", NULL) ||
4537 	    !test_getenv("ZK_ADDRESS", NULL)) {
4538 		if (skip)
4539 			TEST_SKIP("Cannot create topics "
4540 				  "(set KAFKA_PATH and ZK_ADDRESS)\n");
4541 		return 0;
4542 	}
4543 
4544 
4545 	return 1;
4546 #endif
4547 }
4548 
4549 
4550 /**
4551  * Wait for \p event_type, discarding all other events prior to it.
4552  */
test_wait_event(rd_kafka_queue_t * eventq,rd_kafka_event_type_t event_type,int timeout_ms)4553 rd_kafka_event_t *test_wait_event (rd_kafka_queue_t *eventq,
4554 				   rd_kafka_event_type_t event_type,
4555 				   int timeout_ms) {
4556 	test_timing_t t_w;
4557 	int64_t abs_timeout = test_clock() + (timeout_ms * 1000);
4558 
4559 	TIMING_START(&t_w, "wait_event");
4560 	while (test_clock() < abs_timeout) {
4561 		rd_kafka_event_t *rkev;
4562 
4563 		rkev = rd_kafka_queue_poll(eventq,
4564 					   (int)(abs_timeout - test_clock())/
4565 					   1000);
4566 
4567 		if (rd_kafka_event_type(rkev) == event_type) {
4568 			TIMING_STOP(&t_w);
4569 			return rkev;
4570 		}
4571 
4572 		if (!rkev)
4573 			continue;
4574 
4575 		if (rd_kafka_event_error(rkev))
4576 			TEST_SAY("discarding ignored event %s: %s\n",
4577 				 rd_kafka_event_name(rkev),
4578 				 rd_kafka_event_error_string(rkev));
4579 		else
4580 			TEST_SAY("discarding ignored event %s\n",
4581 				 rd_kafka_event_name(rkev));
4582 		rd_kafka_event_destroy(rkev);
4583 
4584 	}
4585 	TIMING_STOP(&t_w);
4586 
4587 	return NULL;
4588 }
4589 
4590 
/**
 * @brief Print \p str through TEST_SAYL() at level \p level.
 *
 * \p file and \p line are accepted but not used.
 */
void test_SAY (const char *file, int line, int level, const char *str) {
        (void)file;
        (void)line;

        TEST_SAYL(level, "%s", str);
}
4594 
test_SKIP(const char * file,int line,const char * str)4595 void test_SKIP (const char *file, int line, const char *str) {
4596         TEST_WARN("SKIPPING TEST: %s", str);
4597         TEST_LOCK();
4598         test_curr->state = TEST_SKIPPED;
4599         if (!*test_curr->failstr) {
4600                 rd_snprintf(test_curr->failstr,
4601                             sizeof(test_curr->failstr), "%s", str);
4602                 rtrim(test_curr->failstr);
4603         }
4604         TEST_UNLOCK();
4605 }
4606 
test_curr_name(void)4607 const char *test_curr_name (void) {
4608         return test_curr->name;
4609 }
4610 
4611 
4612 /**
4613  * @brief Dump/print message haders
4614  */
test_headers_dump(const char * what,int lvl,const rd_kafka_headers_t * hdrs)4615 void test_headers_dump (const char *what, int lvl,
4616                         const rd_kafka_headers_t *hdrs) {
4617         size_t idx = 0;
4618         const char *name, *value;
4619         size_t size;
4620 
4621         while (!rd_kafka_header_get_all(hdrs, idx++, &name,
4622                                         (const void **)&value, &size))
4623                 TEST_SAYL(lvl, "%s: Header #%"PRIusz": %s='%s'\n",
4624                           what, idx-1, name,
4625                           value ? value : "(NULL)");
4626 }
4627 
4628 
4629 /**
4630  * @brief Retrieve and return the list of broker ids in the cluster.
4631  *
4632  * @param rk Optional instance to use.
4633  * @param cntp Will be updated to the number of brokers returned.
4634  *
4635  * @returns a malloc:ed list of int32_t broker ids.
4636  */
test_get_broker_ids(rd_kafka_t * use_rk,size_t * cntp)4637 int32_t *test_get_broker_ids (rd_kafka_t *use_rk, size_t *cntp) {
4638         int32_t *ids;
4639         rd_kafka_t *rk;
4640         const rd_kafka_metadata_t *md;
4641         rd_kafka_resp_err_t err;
4642         size_t i;
4643 
4644         if (!(rk = use_rk))
4645                 rk = test_create_producer();
4646 
4647         err = rd_kafka_metadata(rk, 0, NULL, &md, tmout_multip(5000));
4648         TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
4649         TEST_ASSERT(md->broker_cnt > 0,
4650                     "%d brokers, expected > 0", md->broker_cnt);
4651 
4652         ids = malloc(sizeof(*ids) * md->broker_cnt);
4653 
4654         for (i = 0 ; i < (size_t)md->broker_cnt ; i++)
4655                 ids[i] = md->brokers[i].id;
4656 
4657         *cntp = md->broker_cnt;
4658 
4659         rd_kafka_metadata_destroy(md);
4660 
4661         if (!use_rk)
4662                 rd_kafka_destroy(rk);
4663 
4664         return ids;
4665 }
4666 
4667 
4668 
4669 /**
4670  * @brief Verify that all topics in \p topics are reported in metadata,
4671  *        and that none of the topics in \p not_topics are reported.
4672  *
4673  * @returns the number of failures (but does not FAIL).
4674  */
verify_topics_in_metadata(rd_kafka_t * rk,rd_kafka_metadata_topic_t * topics,size_t topic_cnt,rd_kafka_metadata_topic_t * not_topics,size_t not_topic_cnt)4675 static int verify_topics_in_metadata (rd_kafka_t *rk,
4676                                       rd_kafka_metadata_topic_t *topics,
4677                                       size_t topic_cnt,
4678                                       rd_kafka_metadata_topic_t *not_topics,
4679                                       size_t not_topic_cnt) {
4680         const rd_kafka_metadata_t *md;
4681         rd_kafka_resp_err_t err;
4682         int ti;
4683         size_t i;
4684         int fails = 0;
4685 
4686         /* Mark topics with dummy error which is overwritten
4687          * when topic is found in metadata, allowing us to check
4688          * for missed topics. */
4689         for (i = 0 ; i < topic_cnt ; i++)
4690                 topics[i].err = 12345;
4691 
4692         err = rd_kafka_metadata(rk, 1/*all_topics*/, NULL, &md,
4693                                 tmout_multip(5000));
4694         TEST_ASSERT(!err, "metadata failed: %s", rd_kafka_err2str(err));
4695 
4696         for (ti = 0 ; ti < md->topic_cnt ; ti++) {
4697                 const rd_kafka_metadata_topic_t *mdt = &md->topics[ti];
4698 
4699                 for (i = 0 ; i < topic_cnt ; i++) {
4700                         int pi;
4701                         rd_kafka_metadata_topic_t *exp_mdt;
4702 
4703                         if (strcmp(topics[i].topic, mdt->topic))
4704                                 continue;
4705 
4706                         exp_mdt = &topics[i];
4707 
4708                         exp_mdt->err = mdt->err; /* indicate found */
4709                         if (mdt->err) {
4710                                 TEST_SAY("metadata: "
4711                                          "Topic %s has error %s\n",
4712                                          mdt->topic,
4713                                          rd_kafka_err2str(mdt->err));
4714                                 fails++;
4715                         }
4716 
4717                         if (exp_mdt->partition_cnt != 0 &&
4718                             mdt->partition_cnt != exp_mdt->partition_cnt) {
4719                                 TEST_SAY("metadata: "
4720                                          "Topic %s, expected %d partitions"
4721                                          ", not %d\n",
4722                                          mdt->topic,
4723                                          exp_mdt->partition_cnt,
4724                                          mdt->partition_cnt);
4725                                 fails++;
4726                                 continue;
4727                         }
4728 
4729                         /* Verify per-partition values */
4730                         for (pi = 0 ; exp_mdt->partitions &&
4731                                      pi < exp_mdt->partition_cnt ; pi++) {
4732                                 const rd_kafka_metadata_partition_t *mdp =
4733                                         &mdt->partitions[pi];
4734                                 const rd_kafka_metadata_partition_t *exp_mdp =
4735                                         &exp_mdt->partitions[pi];
4736 
4737                                 if (mdp->id != exp_mdp->id) {
4738                                         TEST_SAY("metadata: "
4739                                                  "Topic %s, "
4740                                                  "partition %d, "
4741                                                  "partition list out of order,"
4742                                                  " expected %d, not %d\n",
4743                                                  mdt->topic, pi,
4744                                                  exp_mdp->id, mdp->id);
4745                                         fails++;
4746                                         continue;
4747                                 }
4748 
4749                                 if (exp_mdp->replicas) {
4750                                         if (mdp->replica_cnt !=
4751                                             exp_mdp->replica_cnt) {
4752                                                 TEST_SAY("metadata: "
4753                                                          "Topic %s, "
4754                                                          "partition %d, "
4755                                                          "expected %d replicas,"
4756                                                          " not %d\n",
4757                                                          mdt->topic, pi,
4758                                                          exp_mdp->replica_cnt,
4759                                                          mdp->replica_cnt);
4760                                                 fails++;
4761                                         } else if (memcmp(mdp->replicas,
4762                                                           exp_mdp->replicas,
4763                                                           mdp->replica_cnt *
4764                                                           sizeof(*mdp->replicas))) {
4765                                                 int ri;
4766 
4767                                                 TEST_SAY("metadata: "
4768                                                          "Topic %s, "
4769                                                          "partition %d, "
4770                                                          "replica mismatch:\n",
4771                                                          mdt->topic, pi);
4772 
4773                                                 for (ri = 0 ;
4774                                                      ri < mdp->replica_cnt ;
4775                                                      ri++) {
4776                                                         TEST_SAY(" #%d: "
4777                                                                  "expected "
4778                                                                  "replica %d, "
4779                                                                  "not %d\n",
4780                                                                  ri,
4781                                                                  exp_mdp->
4782                                                                  replicas[ri],
4783                                                                  mdp->
4784                                                                  replicas[ri]);
4785                                                 }
4786 
4787                                                 fails++;
4788                                         }
4789 
4790                                 }
4791                         }
4792                 }
4793 
4794                 for (i = 0 ; i < not_topic_cnt ; i++) {
4795                         if (strcmp(not_topics[i].topic, mdt->topic))
4796                                 continue;
4797 
4798                         TEST_SAY("metadata: "
4799                                  "Topic %s found in metadata, unexpected\n",
4800                                  mdt->topic);
4801                         fails++;
4802                 }
4803 
4804         }
4805 
4806         for (i  = 0 ; i < topic_cnt ; i++) {
4807                 if ((int)topics[i].err == 12345) {
4808                         TEST_SAY("metadata: "
4809                                  "Topic %s not seen in metadata\n",
4810                                  topics[i].topic);
4811                         fails++;
4812                 }
4813         }
4814 
4815         if (fails > 0)
4816                 TEST_SAY("Metadata verification for %"PRIusz" topics failed "
4817                          "with %d errors (see above)\n",
4818                          topic_cnt, fails);
4819         else
4820                 TEST_SAY("Metadata verification succeeded: "
4821                          "%"PRIusz" desired topics seen, "
4822                          "%"PRIusz" undesired topics not seen\n",
4823                          topic_cnt, not_topic_cnt);
4824 
4825         rd_kafka_metadata_destroy(md);
4826 
4827         return fails;
4828 }
4829 
4830 
4831 
4832 /**
4833  * @brief Wait for metadata to reflect expected and not expected topics
4834  */
test_wait_metadata_update(rd_kafka_t * rk,rd_kafka_metadata_topic_t * topics,size_t topic_cnt,rd_kafka_metadata_topic_t * not_topics,size_t not_topic_cnt,int tmout)4835 void test_wait_metadata_update (rd_kafka_t *rk,
4836                                 rd_kafka_metadata_topic_t *topics,
4837                                 size_t topic_cnt,
4838                                 rd_kafka_metadata_topic_t *not_topics,
4839                                 size_t not_topic_cnt,
4840                                 int tmout) {
4841         int64_t abs_timeout;
4842         test_timing_t t_md;
4843 
4844         abs_timeout = test_clock() + (tmout * 1000);
4845 
4846         test_timeout_set(10 + (tmout/1000));
4847 
4848         TEST_SAY("Waiting for up to %dms for metadata update\n", tmout);
4849 
4850         TIMING_START(&t_md, "METADATA.WAIT");
4851         do {
4852                 int md_fails;
4853 
4854                 md_fails = verify_topics_in_metadata(
4855                         rk,
4856                         topics, topic_cnt,
4857                         not_topics, not_topic_cnt);
4858 
4859                 if (!md_fails) {
4860                         TEST_SAY("All expected topics (not?) "
4861                                  "seen in metadata\n");
4862                         abs_timeout = 0;
4863                         break;
4864                 }
4865 
4866                 rd_sleep(1);
4867         } while (test_clock() < abs_timeout);
4868         TIMING_STOP(&t_md);
4869 
4870         if (abs_timeout)
4871                 TEST_FAIL("Expected topics not seen in given time.");
4872 }
4873 
4874 
4875 
4876 /**
4877  * @brief Wait for up to \p tmout for any type of admin result.
4878  * @returns the event
4879  */
4880 rd_kafka_event_t *
test_wait_admin_result(rd_kafka_queue_t * q,rd_kafka_event_type_t evtype,int tmout)4881 test_wait_admin_result (rd_kafka_queue_t *q,
4882                         rd_kafka_event_type_t evtype,
4883                         int tmout) {
4884         rd_kafka_event_t *rkev;
4885 
4886         while (1) {
4887                 rkev = rd_kafka_queue_poll(q, tmout);
4888                 if (!rkev)
4889                         TEST_FAIL("Timed out waiting for admin result (%d)\n",
4890                                   evtype);
4891 
4892                 if (rd_kafka_event_type(rkev) == evtype)
4893                         return rkev;
4894 
4895 
4896                 if (rd_kafka_event_type(rkev) == RD_KAFKA_EVENT_ERROR) {
4897                         TEST_WARN("Received error event while waiting for %d: "
4898                                   "%s: ignoring",
4899                                   evtype, rd_kafka_event_error_string(rkev));
4900                         continue;
4901                 }
4902 
4903 
4904                 TEST_ASSERT(rd_kafka_event_type(rkev) == evtype,
4905                             "Expected event type %d, got %d (%s)",
4906                             evtype,
4907                             rd_kafka_event_type(rkev),
4908                             rd_kafka_event_name(rkev));
4909         }
4910 
4911         return NULL;
4912 }
4913 
4914 
4915 
4916 /**
4917  * @brief Wait for up to \p tmout for a
4918  *        CreateTopics/DeleteTopics/CreatePartitions or
4919  *        DescribeConfigs/AlterConfigs result and return the
4920  *        distilled error code.
4921  */
4922 rd_kafka_resp_err_t
test_wait_topic_admin_result(rd_kafka_queue_t * q,rd_kafka_event_type_t evtype,rd_kafka_event_t ** retevent,int tmout)4923 test_wait_topic_admin_result (rd_kafka_queue_t *q,
4924                               rd_kafka_event_type_t evtype,
4925                               rd_kafka_event_t **retevent,
4926                               int tmout) {
4927         rd_kafka_event_t *rkev;
4928         size_t i;
4929         const rd_kafka_topic_result_t **terr = NULL;
4930         size_t terr_cnt = 0;
4931         const rd_kafka_ConfigResource_t **cres = NULL;
4932         size_t cres_cnt = 0;
4933         int errcnt = 0;
4934         rd_kafka_resp_err_t err;
4935 
4936         rkev = test_wait_admin_result(q, evtype, tmout);
4937 
4938         if ((err = rd_kafka_event_error(rkev))) {
4939                 TEST_WARN("%s failed: %s\n",
4940                           rd_kafka_event_name(rkev),
4941                           rd_kafka_event_error_string(rkev));
4942                 rd_kafka_event_destroy(rkev);
4943                 return err;
4944         }
4945 
4946         if (evtype == RD_KAFKA_EVENT_CREATETOPICS_RESULT) {
4947                 const rd_kafka_CreateTopics_result_t *res;
4948                 if (!(res = rd_kafka_event_CreateTopics_result(rkev)))
4949                         TEST_FAIL("Expected a CreateTopics result, not %s",
4950                                   rd_kafka_event_name(rkev));
4951 
4952                 terr = rd_kafka_CreateTopics_result_topics(res, &terr_cnt);
4953 
4954         } else if (evtype == RD_KAFKA_EVENT_DELETETOPICS_RESULT) {
4955                 const rd_kafka_DeleteTopics_result_t *res;
4956                 if (!(res = rd_kafka_event_DeleteTopics_result(rkev)))
4957                         TEST_FAIL("Expected a DeleteTopics result, not %s",
4958                                   rd_kafka_event_name(rkev));
4959 
4960                 terr = rd_kafka_DeleteTopics_result_topics(res, &terr_cnt);
4961 
4962         } else if (evtype == RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT) {
4963                 const rd_kafka_CreatePartitions_result_t *res;
4964                 if (!(res = rd_kafka_event_CreatePartitions_result(rkev)))
4965                         TEST_FAIL("Expected a CreatePartitions result, not %s",
4966                                   rd_kafka_event_name(rkev));
4967 
4968                 terr = rd_kafka_CreatePartitions_result_topics(res, &terr_cnt);
4969 
4970         } else if (evtype == RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT) {
4971                 const rd_kafka_DescribeConfigs_result_t *res;
4972 
4973                 if (!(res = rd_kafka_event_DescribeConfigs_result(rkev)))
4974                         TEST_FAIL("Expected a DescribeConfigs result, not %s",
4975                                   rd_kafka_event_name(rkev));
4976 
4977                 cres = rd_kafka_DescribeConfigs_result_resources(res,
4978                                                                  &cres_cnt);
4979 
4980         } else if (evtype == RD_KAFKA_EVENT_ALTERCONFIGS_RESULT) {
4981                 const rd_kafka_AlterConfigs_result_t *res;
4982 
4983                 if (!(res = rd_kafka_event_AlterConfigs_result(rkev)))
4984                         TEST_FAIL("Expected a AlterConfigs result, not %s",
4985                                   rd_kafka_event_name(rkev));
4986 
4987                 cres = rd_kafka_AlterConfigs_result_resources(res, &cres_cnt);
4988 
4989         } else {
4990                 TEST_FAIL("Bad evtype: %d", evtype);
4991                 RD_NOTREACHED();
4992         }
4993 
4994         /* Check topic errors */
4995         for (i = 0 ; i < terr_cnt ; i++) {
4996                 if (rd_kafka_topic_result_error(terr[i])) {
4997                         TEST_WARN("..Topics result: %s: error: %s\n",
4998                                   rd_kafka_topic_result_name(terr[i]),
4999                                   rd_kafka_topic_result_error_string(terr[i]));
5000                         if (!(errcnt++))
5001                                 err = rd_kafka_topic_result_error(terr[i]);
5002                 }
5003         }
5004 
5005         /* Check resource errors */
5006         for (i = 0 ; i < cres_cnt ; i++) {
5007                 if (rd_kafka_ConfigResource_error(cres[i])) {
5008                         TEST_WARN("ConfigResource result: %d,%s: error: %s\n",
5009                                   rd_kafka_ConfigResource_type(cres[i]),
5010                                   rd_kafka_ConfigResource_name(cres[i]),
5011                                   rd_kafka_ConfigResource_error_string(cres[i]));
5012                         if (!(errcnt++))
5013                                 err = rd_kafka_ConfigResource_error(cres[i]);
5014                 }
5015         }
5016 
5017         if (!err && retevent)
5018                 *retevent = rkev;
5019         else
5020                 rd_kafka_event_destroy(rkev);
5021 
5022         return err;
5023 }
5024 
5025 
5026 
5027 /**
5028  * @brief Topic Admin API helpers
5029  *
5030  * @param useq Makes the call async and posts the response in this queue.
5031  *             If NULL this call will be synchronous and return the error
5032  *             result.
5033  *
5034  * @remark Fails the current test on failure.
5035  */
5036 
5037 rd_kafka_resp_err_t
test_CreateTopics_simple(rd_kafka_t * rk,rd_kafka_queue_t * useq,char ** topics,size_t topic_cnt,int num_partitions,void * opaque)5038 test_CreateTopics_simple (rd_kafka_t *rk,
5039                           rd_kafka_queue_t *useq,
5040                           char **topics, size_t topic_cnt,
5041                           int num_partitions,
5042                           void *opaque) {
5043         rd_kafka_NewTopic_t **new_topics;
5044         rd_kafka_AdminOptions_t *options;
5045         rd_kafka_queue_t *q;
5046         size_t i;
5047         const int tmout = 30 * 1000;
5048         rd_kafka_resp_err_t err;
5049 
5050         new_topics = malloc(sizeof(*new_topics) * topic_cnt);
5051 
5052         for (i = 0 ; i < topic_cnt ; i++) {
5053                 char errstr[512];
5054                 new_topics[i] = rd_kafka_NewTopic_new(topics[i],
5055                                                       num_partitions, 1,
5056                                                       errstr, sizeof(errstr));
5057                 TEST_ASSERT(new_topics[i],
5058                             "Failed to NewTopic(\"%s\", %d) #%"PRIusz": %s",
5059                             topics[i], num_partitions, i, errstr);
5060         }
5061 
5062         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
5063         rd_kafka_AdminOptions_set_opaque(options, opaque);
5064 
5065         if (!useq) {
5066                 char errstr[512];
5067 
5068                 err = rd_kafka_AdminOptions_set_request_timeout(options,
5069                                                                 tmout,
5070                                                                 errstr,
5071                                                                 sizeof(errstr));
5072                 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5073                 err = rd_kafka_AdminOptions_set_operation_timeout(options,
5074                                                                   tmout-5000,
5075                                                                   errstr,
5076                                                                   sizeof(errstr));
5077                 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5078 
5079                 q = rd_kafka_queue_new(rk);
5080         } else {
5081                 q = useq;
5082         }
5083 
5084         TEST_SAY("Creating %"PRIusz" topics\n", topic_cnt);
5085 
5086         rd_kafka_CreateTopics(rk, new_topics, topic_cnt, options, q);
5087 
5088         rd_kafka_AdminOptions_destroy(options);
5089 
5090         rd_kafka_NewTopic_destroy_array(new_topics, topic_cnt);
5091         free(new_topics);
5092 
5093         if (useq)
5094                 return RD_KAFKA_RESP_ERR_NO_ERROR;
5095 
5096 
5097         err = test_wait_topic_admin_result(q,
5098                                            RD_KAFKA_EVENT_CREATETOPICS_RESULT,
5099                                            NULL, tmout+5000);
5100 
5101         rd_kafka_queue_destroy(q);
5102 
5103         if (err)
5104                 TEST_FAIL("Failed to create %d topic(s): %s",
5105                           (int)topic_cnt, rd_kafka_err2str(err));
5106 
5107         return err;
5108 }
5109 
5110 
5111 rd_kafka_resp_err_t
test_CreatePartitions_simple(rd_kafka_t * rk,rd_kafka_queue_t * useq,const char * topic,size_t total_part_cnt,void * opaque)5112 test_CreatePartitions_simple (rd_kafka_t *rk,
5113                               rd_kafka_queue_t *useq,
5114                               const char *topic,
5115                               size_t total_part_cnt,
5116                               void *opaque) {
5117         rd_kafka_NewPartitions_t *newp[1];
5118         rd_kafka_AdminOptions_t *options;
5119         rd_kafka_queue_t *q;
5120         const int tmout = 30 * 1000;
5121         rd_kafka_resp_err_t err;
5122         char errstr[512];
5123 
5124         newp[0] = rd_kafka_NewPartitions_new(topic, total_part_cnt, errstr,
5125                                              sizeof(errstr));
5126         TEST_ASSERT(newp[0],
5127                     "Failed to NewPartitions(\"%s\", %"PRIusz"): %s",
5128                     topic, total_part_cnt, errstr);
5129 
5130         options = rd_kafka_AdminOptions_new(rk,
5131                                             RD_KAFKA_ADMIN_OP_CREATEPARTITIONS);
5132         rd_kafka_AdminOptions_set_opaque(options, opaque);
5133 
5134         if (!useq) {
5135                 char errstr[512];
5136 
5137                 err = rd_kafka_AdminOptions_set_request_timeout(options,
5138                                                                 tmout,
5139                                                                 errstr,
5140                                                                 sizeof(errstr));
5141                 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5142                 err = rd_kafka_AdminOptions_set_operation_timeout(options,
5143                                                                   tmout-5000,
5144                                                                   errstr,
5145                                                                   sizeof(errstr));
5146                 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5147 
5148                 q = rd_kafka_queue_new(rk);
5149         } else {
5150                 q = useq;
5151         }
5152 
5153         TEST_SAY("Creating (up to) %"PRIusz" partitions for topic \"%s\"\n",
5154                  total_part_cnt, topic);
5155 
5156         rd_kafka_CreatePartitions(rk, newp, 1, options, q);
5157 
5158         rd_kafka_AdminOptions_destroy(options);
5159 
5160         rd_kafka_NewPartitions_destroy(newp[0]);
5161 
5162         if (useq)
5163                 return RD_KAFKA_RESP_ERR_NO_ERROR;
5164 
5165 
5166         err = test_wait_topic_admin_result(
5167                 q, RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT, NULL, tmout+5000);
5168 
5169         rd_kafka_queue_destroy(q);
5170 
5171         if (err)
5172                 TEST_FAIL("Failed to create partitions: %s",
5173                           rd_kafka_err2str(err));
5174 
5175         return err;
5176 }
5177 
5178 
5179 rd_kafka_resp_err_t
test_DeleteTopics_simple(rd_kafka_t * rk,rd_kafka_queue_t * useq,char ** topics,size_t topic_cnt,void * opaque)5180 test_DeleteTopics_simple (rd_kafka_t *rk,
5181                           rd_kafka_queue_t *useq,
5182                           char **topics, size_t topic_cnt,
5183                           void *opaque) {
5184         rd_kafka_queue_t *q;
5185         rd_kafka_DeleteTopic_t **del_topics;
5186         rd_kafka_AdminOptions_t *options;
5187         size_t i;
5188         rd_kafka_resp_err_t err;
5189         const int tmout = 30*1000;
5190 
5191         del_topics = malloc(sizeof(*del_topics) * topic_cnt);
5192 
5193         for (i = 0 ; i < topic_cnt ; i++) {
5194                 del_topics[i] = rd_kafka_DeleteTopic_new(topics[i]);
5195                 TEST_ASSERT(del_topics[i]);
5196         }
5197 
5198         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
5199         rd_kafka_AdminOptions_set_opaque(options, opaque);
5200 
5201         if (!useq) {
5202                 char errstr[512];
5203 
5204                 err = rd_kafka_AdminOptions_set_request_timeout(options,
5205                                                                 tmout,
5206                                                                 errstr,
5207                                                                 sizeof(errstr));
5208                 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5209                 err = rd_kafka_AdminOptions_set_operation_timeout(options,
5210                                                                   tmout-5000,
5211                                                                   errstr,
5212                                                                   sizeof(errstr));
5213                 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5214 
5215                 q = rd_kafka_queue_new(rk);
5216         } else {
5217                 q = useq;
5218         }
5219 
5220         TEST_SAY("Deleting %"PRIusz" topics\n", topic_cnt);
5221 
5222         rd_kafka_DeleteTopics(rk, del_topics, topic_cnt, options, useq);
5223 
5224         rd_kafka_AdminOptions_destroy(options);
5225 
5226         rd_kafka_DeleteTopic_destroy_array(del_topics, topic_cnt);
5227 
5228         free(del_topics);
5229 
5230         if (useq)
5231                 return RD_KAFKA_RESP_ERR_NO_ERROR;
5232 
5233         err = test_wait_topic_admin_result(q,
5234                                            RD_KAFKA_EVENT_CREATETOPICS_RESULT,
5235                                            NULL, tmout+5000);
5236 
5237         rd_kafka_queue_destroy(q);
5238 
5239         if (err)
5240                 TEST_FAIL("Failed to delete topics: %s",
5241                           rd_kafka_err2str(err));
5242 
5243         return err;
5244 }
5245 
5246 
5247 /**
5248  * @brief Delta Alter configuration for the given resource,
5249  *        overwriting/setting the configs provided in \p configs.
5250  *        Existing configuration remains intact.
5251  *
5252  * @param configs 'const char *name, const char *value' tuples
5253  * @param config_cnt is the number of tuples in \p configs
5254  */
5255 rd_kafka_resp_err_t
test_AlterConfigs_simple(rd_kafka_t * rk,rd_kafka_ResourceType_t restype,const char * resname,const char ** configs,size_t config_cnt)5256 test_AlterConfigs_simple (rd_kafka_t *rk,
5257                           rd_kafka_ResourceType_t restype,
5258                           const char *resname,
5259                           const char **configs, size_t config_cnt) {
5260         rd_kafka_queue_t *q;
5261         rd_kafka_ConfigResource_t *confres;
5262         rd_kafka_event_t *rkev;
5263         size_t i;
5264         rd_kafka_resp_err_t err;
5265         const rd_kafka_ConfigResource_t **results;
5266         size_t result_cnt;
5267         const rd_kafka_ConfigEntry_t **configents;
5268         size_t configent_cnt;
5269 
5270 
5271         q = rd_kafka_queue_new(rk);
5272 
5273         TEST_SAY("Getting configuration for %d %s\n", restype, resname);
5274 
5275         confres = rd_kafka_ConfigResource_new(restype, resname);
5276         rd_kafka_DescribeConfigs(rk, &confres, 1, NULL, q);
5277 
5278         err = test_wait_topic_admin_result(
5279                 q, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, &rkev, 15*1000);
5280         if (err) {
5281                 rd_kafka_queue_destroy(q);
5282                 rd_kafka_ConfigResource_destroy(confres);
5283                 return err;
5284         }
5285 
5286         results = rd_kafka_DescribeConfigs_result_resources(
5287                 rd_kafka_event_DescribeConfigs_result(rkev), &result_cnt);
5288         TEST_ASSERT(result_cnt == 1,
5289                     "expected 1 DescribeConfigs result, not %"PRIusz,
5290                     result_cnt);
5291 
5292         configents = rd_kafka_ConfigResource_configs(results[0],
5293                                                      &configent_cnt);
5294         TEST_ASSERT(configent_cnt > 0,
5295                     "expected > 0 ConfigEntry:s, not %"PRIusz, configent_cnt);
5296 
5297         TEST_SAY("Altering configuration for %d %s\n", restype, resname);
5298 
5299         /* Apply all existing configuration entries to resource object that
5300          * will later be passed to AlterConfigs. */
5301         for (i = 0 ; i < configent_cnt ; i++) {
5302                 err = rd_kafka_ConfigResource_set_config(
5303                         confres,
5304                         rd_kafka_ConfigEntry_name(configents[i]),
5305                         rd_kafka_ConfigEntry_value(configents[i]));
5306                 TEST_ASSERT(!err, "Failed to set read-back config %s=%s "
5307                             "on local resource object",
5308                             rd_kafka_ConfigEntry_name(configents[i]),
5309                             rd_kafka_ConfigEntry_value(configents[i]));
5310         }
5311 
5312         rd_kafka_event_destroy(rkev);
5313 
5314         /* Then apply the configuration to change. */
5315         for (i = 0 ; i < config_cnt ; i += 2) {
5316                 err = rd_kafka_ConfigResource_set_config(confres,
5317                                                          configs[i],
5318                                                          configs[i+1]);
5319                 TEST_ASSERT(!err, "Failed to set config %s=%s on "
5320                             "local resource object",
5321                             configs[i], configs[i+1]);
5322         }
5323 
5324         rd_kafka_AlterConfigs(rk, &confres, 1, NULL, q);
5325 
5326         rd_kafka_ConfigResource_destroy(confres);
5327 
5328         err = test_wait_topic_admin_result(
5329                 q, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT, NULL, 15*1000);
5330 
5331         rd_kafka_queue_destroy(q);
5332 
5333         return err;
5334 }
5335 
5336 
5337 
/**
 * @brief Free the \p cnt strings held in \p strs and then the array itself.
 */
static void test_free_string_array (char **strs, size_t cnt) {
        size_t left = cnt;

        /* Release the elements back to front, then the container. */
        while (left > 0)
                free(strs[--left]);

        free(strs);
}
5344 
5345 
5346 /**
5347  * @return an array of all topics in the cluster matching our the
5348  *         rdkafka test prefix.
5349  */
5350 static rd_kafka_resp_err_t
test_get_all_test_topics(rd_kafka_t * rk,char *** topicsp,size_t * topic_cntp)5351 test_get_all_test_topics (rd_kafka_t *rk, char ***topicsp, size_t *topic_cntp) {
5352         size_t test_topic_prefix_len = strlen(test_topic_prefix);
5353         const rd_kafka_metadata_t *md;
5354         char **topics = NULL;
5355         size_t topic_cnt = 0;
5356         int i;
5357         rd_kafka_resp_err_t err;
5358 
5359         *topic_cntp = 0;
5360         if (topicsp)
5361                 *topicsp = NULL;
5362 
5363         /* Retrieve list of topics */
5364         err = rd_kafka_metadata(rk, 1/*all topics*/, NULL, &md,
5365                                 tmout_multip(10000));
5366         if (err) {
5367                 TEST_WARN("%s: Failed to acquire metadata: %s: "
5368                           "not deleting any topics\n",
5369                           __FUNCTION__, rd_kafka_err2str(err));
5370                 return err;
5371         }
5372 
5373         if (md->topic_cnt == 0) {
5374                 TEST_WARN("%s: No topics in cluster\n", __FUNCTION__);
5375                 rd_kafka_metadata_destroy(md);
5376                 return RD_KAFKA_RESP_ERR_NO_ERROR;
5377         }
5378 
5379         if (topicsp)
5380                 topics = malloc(sizeof(*topics) * md->topic_cnt);
5381 
5382         for (i = 0 ; i < md->topic_cnt ; i++) {
5383                 if (strlen(md->topics[i].topic) >= test_topic_prefix_len &&
5384                     !strncmp(md->topics[i].topic,
5385                              test_topic_prefix, test_topic_prefix_len)) {
5386                         if (topicsp)
5387                                 topics[topic_cnt++] =
5388                                         rd_strdup(md->topics[i].topic);
5389                         else
5390                                 topic_cnt++;
5391                 }
5392         }
5393 
5394         if (topic_cnt == 0) {
5395                 TEST_SAY("%s: No topics (out of %d) matching our "
5396                          "test prefix (%s)\n",
5397                          __FUNCTION__, md->topic_cnt, test_topic_prefix);
5398                 rd_kafka_metadata_destroy(md);
5399                 if (topics)
5400                         test_free_string_array(topics, topic_cnt);
5401                 return RD_KAFKA_RESP_ERR_NO_ERROR;
5402         }
5403 
5404         rd_kafka_metadata_destroy(md);
5405 
5406         if (topicsp)
5407                 *topicsp = topics;
5408         *topic_cntp = topic_cnt;
5409 
5410         return RD_KAFKA_RESP_ERR_NO_ERROR;
5411 }
5412 
5413 /**
5414  * @brief Delete all test topics using the Kafka Admin API.
5415  */
test_delete_all_test_topics(int timeout_ms)5416 rd_kafka_resp_err_t test_delete_all_test_topics (int timeout_ms) {
5417         rd_kafka_t *rk;
5418         char **topics;
5419         size_t topic_cnt = 0;
5420         rd_kafka_resp_err_t err;
5421         int i;
5422         rd_kafka_AdminOptions_t *options;
5423         rd_kafka_queue_t *q;
5424         char errstr[256];
5425         int64_t abs_timeout = test_clock() + (timeout_ms * 1000);
5426 
5427         rk = test_create_producer();
5428 
5429         err = test_get_all_test_topics(rk, &topics, &topic_cnt);
5430         if (err) {
5431                 /* Error already reported by test_get_all_test_topics() */
5432                 rd_kafka_destroy(rk);
5433                 return err;
5434         }
5435 
5436         if (topic_cnt == 0) {
5437                 rd_kafka_destroy(rk);
5438                 return RD_KAFKA_RESP_ERR_NO_ERROR;
5439         }
5440 
5441         q = rd_kafka_queue_get_main(rk);
5442 
5443         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
5444         if (rd_kafka_AdminOptions_set_operation_timeout(options, 2*60*1000,
5445                                                         errstr,
5446                                                         sizeof(errstr)))
5447                 TEST_SAY(_C_YEL "Failed to set DeleteTopics timeout: %s: "
5448                          "ignoring\n",
5449                          errstr);
5450 
5451         TEST_SAY(_C_MAG "====> Deleting all test topics with <===="
5452                  "a timeout of 2 minutes\n");
5453 
5454         test_DeleteTopics_simple(rk, q, topics, topic_cnt, options);
5455 
5456         rd_kafka_AdminOptions_destroy(options);
5457 
5458         while (1) {
5459                 rd_kafka_event_t *rkev;
5460                 const rd_kafka_DeleteTopics_result_t *res;
5461 
5462                 rkev = rd_kafka_queue_poll(q, -1);
5463 
5464                 res = rd_kafka_event_DeleteTopics_result(rkev);
5465                 if (!res) {
5466                         TEST_SAY("%s: Ignoring event: %s: %s\n",
5467                                  __FUNCTION__, rd_kafka_event_name(rkev),
5468                                  rd_kafka_event_error_string(rkev));
5469                         rd_kafka_event_destroy(rkev);
5470                         continue;
5471                 }
5472 
5473                 if (rd_kafka_event_error(rkev)) {
5474                         TEST_WARN("%s: DeleteTopics for %"PRIusz" topics "
5475                                   "failed: %s\n",
5476                                   __FUNCTION__, topic_cnt,
5477                                   rd_kafka_event_error_string(rkev));
5478                         err = rd_kafka_event_error(rkev);
5479                 } else {
5480                         const rd_kafka_topic_result_t **terr;
5481                         size_t tcnt;
5482                         int okcnt = 0;
5483 
5484                         terr = rd_kafka_DeleteTopics_result_topics(res, &tcnt);
5485 
5486                         for(i = 0 ; i < (int)tcnt ; i++) {
5487                                 if (!rd_kafka_topic_result_error(terr[i])) {
5488                                         okcnt++;
5489                                         continue;
5490                                 }
5491 
5492                                 TEST_WARN("%s: Failed to delete topic %s: %s\n",
5493                                           __FUNCTION__,
5494                                           rd_kafka_topic_result_name(terr[i]),
5495                                           rd_kafka_topic_result_error_string(
5496                                                   terr[i]));
5497                         }
5498 
5499                         TEST_SAY("%s: DeleteTopics "
5500                                  "succeeded for %d/%"PRIusz" topics\n",
5501                                  __FUNCTION__, okcnt, topic_cnt);
5502                         err = RD_KAFKA_RESP_ERR_NO_ERROR;
5503                 }
5504 
5505                 rd_kafka_event_destroy(rkev);
5506                 break;
5507         }
5508 
5509         rd_kafka_queue_destroy(q);
5510 
5511         test_free_string_array(topics, topic_cnt);
5512 
5513         /* Wait for topics to be fully deleted */
5514         while (1) {
5515                 err = test_get_all_test_topics(rk, NULL, &topic_cnt);
5516 
5517                 if (!err && topic_cnt == 0)
5518                         break;
5519 
5520                 if (abs_timeout < test_clock()) {
5521                         TEST_WARN("%s: Timed out waiting for "
5522                                   "remaining %"PRIusz" deleted topics "
5523                                   "to disappear from cluster metadata\n",
5524                                   __FUNCTION__, topic_cnt);
5525                         break;
5526                 }
5527 
5528                 TEST_SAY("Waiting for remaining %"PRIusz" delete topics "
5529                          "to disappear from cluster metadata\n", topic_cnt);
5530 
5531                 rd_sleep(1);
5532         }
5533 
5534         rd_kafka_destroy(rk);
5535 
5536         return err;
5537 }
5538 
5539 
5540 
test_fail0(const char * file,int line,const char * function,int do_lock,int fail_now,const char * fmt,...)5541 void test_fail0 (const char *file, int line, const char *function,
5542                  int do_lock, int fail_now, const char *fmt, ...) {
5543         char buf[512];
5544         int is_thrd = 0;
5545         size_t of;
5546         va_list ap;
5547         char *t;
5548         char timestr[32];
5549         time_t tnow = time(NULL);
5550 
5551 #ifdef _MSC_VER
5552         ctime_s(timestr, sizeof(timestr), &tnow);
5553 #else
5554         ctime_r(&tnow, timestr);
5555 #endif
5556         t = strchr(timestr, '\n');
5557         if (t)
5558                 *t = '\0';
5559 
5560         of = rd_snprintf(buf, sizeof(buf), "%s():%i: ", function, line);
5561         rd_assert(of < sizeof(buf));
5562 
5563         va_start(ap, fmt);
5564         rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap);
5565         va_end(ap);
5566 
5567         /* Remove trailing newline */
5568         if ((t =  strchr(buf, '\n')) && !*(t+1))
5569                 *t = '\0';
5570 
5571         TEST_SAYL(0, "TEST FAILURE\n");
5572         fprintf(stderr, "\033[31m### Test \"%s\" failed at %s:%i:%s() at %s: "
5573                 "###\n"
5574                 "%s\n",
5575                 test_curr->name, file, line, function, timestr, buf+of);
5576         if (do_lock)
5577                 TEST_LOCK();
5578         test_curr->state = TEST_FAILED;
5579         test_curr->failcnt += 1;
5580 
5581         if (!*test_curr->failstr) {
5582                 strncpy(test_curr->failstr, buf, sizeof(test_curr->failstr));
5583                 test_curr->failstr[sizeof(test_curr->failstr)-1] = '\0';
5584         }
5585         if (fail_now && test_curr->mainfunc) {
5586                 tests_running_cnt--;
5587                 is_thrd = 1;
5588         }
5589         if (do_lock)
5590                 TEST_UNLOCK();
5591         if (!fail_now)
5592                 return;
5593         if (test_assert_on_fail || !is_thrd)
5594                 assert(0);
5595         else
5596                 thrd_exit(0);
5597 }
5598 
5599 
5600 /**
5601  * @brief Destroy a mock cluster and its underlying rd_kafka_t handle
5602  */
test_mock_cluster_destroy(rd_kafka_mock_cluster_t * mcluster)5603 void test_mock_cluster_destroy (rd_kafka_mock_cluster_t *mcluster) {
5604         rd_kafka_t *rk = rd_kafka_mock_cluster_handle(mcluster);
5605         rd_kafka_mock_cluster_destroy(mcluster);
5606         rd_kafka_destroy(rk);
5607 }
5608 
5609 
5610 
5611 /**
5612  * @brief Create a standalone mock cluster that can be used by multiple
5613  *        rd_kafka_t instances.
5614  */
test_mock_cluster_new(int broker_cnt,const char ** bootstraps)5615 rd_kafka_mock_cluster_t *test_mock_cluster_new (int broker_cnt,
5616                                                 const char **bootstraps) {
5617         rd_kafka_t *rk;
5618         rd_kafka_conf_t *conf = rd_kafka_conf_new();
5619         rd_kafka_mock_cluster_t *mcluster;
5620         char errstr[256];
5621 
5622         test_conf_common_init(conf, 0);
5623 
5624         test_conf_set(conf, "client.id", "MOCK");
5625 
5626         rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
5627         TEST_ASSERT(rk, "Failed to create mock cluster rd_kafka_t: %s", errstr);
5628 
5629         mcluster = rd_kafka_mock_cluster_new(rk, broker_cnt);
5630         TEST_ASSERT(mcluster, "Failed to acquire mock cluster");
5631 
5632         if (bootstraps)
5633                 *bootstraps = rd_kafka_mock_cluster_bootstraps(mcluster);
5634 
5635         return mcluster;
5636 }
5637