1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2013, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30 #define _CRT_RAND_S // rand_s() on MSVC
31 #include <stdarg.h>
32 #include "test.h"
33 #include <signal.h>
34 #include <stdlib.h>
35 #include <stdio.h>
36
37 #ifdef _MSC_VER
38 #include <direct.h> /* _getcwd */
39 #else
40 #include <sys/wait.h> /* waitpid */
41 #endif
42
43 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
44 * is built from within the librdkafka source tree and thus differs. */
45 #include "rdkafka.h"
46
int test_level = 2;               /* Test verbosity level */
int test_seed = 0;                /* PRNG seed for this run (0 = uninitialized) */

char test_mode[64] = "bare";      /* Run mode (from TEST_MODE env) */
char test_scenario[64] = "default"; /* Broker setup scenario (from TEST_SCENARIO) */
static volatile sig_atomic_t test_exit = 0; /* Set to request test run termination */
static char test_topic_prefix[128] = "rdkafkatest"; /* Prefix for generated topic names */
static int test_topic_random = 0; /* Randomize generated topic names */
int tests_running_cnt = 0;        /* Number of currently running tests */
int test_concurrent_max = 5;      /* Max concurrently running tests */
int test_assert_on_fail = 0;      /* assert() (abort) on test failure */
double test_timeout_multiplier = 1.0; /* Scales all test timeouts */
static char *test_sql_cmd = NULL; /* Command to pipe test results to (SQL) */
int test_session_timeout_ms = 6000; /* Consumer group session timeout */
int test_broker_version;          /* Parsed from test_broker_version_str */
static const char *test_broker_version_str = "2.4.0.0";
int test_flags = 0;               /* Only run tests matching these flags */
int test_neg_flags = TEST_F_KNOWN_ISSUE; /* Skip tests matching these flags */
/* run delete-test-topics.sh between each test (when concurrent_max = 1) */
static int test_delete_topics_between = 0;
static const char *test_git_version = "HEAD"; /* librdkafka git revision under test */
static const char *test_sockem_conf = "";     /* sockem config (from TEST_SOCKEM) */
int test_on_ci = 0; /* Tests are being run on CI, be more forgiving
                     * with regards to timeouts, etc. */
int test_quick = 0; /** Run tests quickly */
int test_idempotent_producer = 0; /* Enable idempotence on all producer configs */
int test_rusage = 0; /**< Check resource usage */
/**< CPU speed calibration for rusage threshold checks.
 *   >1.0: CPU is slower than base line system,
 *   <1.0: CPU is faster than base line system. */
double test_rusage_cpu_calibration = 1.0;
static const char *tests_to_run = NULL; /* all */

static int show_summary = 1;      /* Print summary at end of run */
static int test_summary (int do_lock);

/**
 * Protects shared state, such as tests[]
 */
mtx_t test_mtx;
cnd_t test_cnd;

/* Human-readable names for test states, indexed by test state enum value. */
static const char *test_states[] = {
        "DNS",
        "SKIPPED",
        "RUNNING",
        "PASSED",
        "FAILED",
};
97
98
/* Declare the extern entry point of a test (main_<NAME>). */
#define _TEST_DECL(NAME)                                                \
        extern int main_ ## NAME (int, char **)
/* Produce a struct test initializer for the tests[] array below. */
#define _TEST(NAME,FLAGS,...)                                           \
        { .name = # NAME, .mainfunc = main_ ## NAME, .flags = FLAGS, __VA_ARGS__ }
103
104
105 /**
106 * Declare all tests here
107 */
108 _TEST_DECL(0000_unittests);
109 _TEST_DECL(0001_multiobj);
110 _TEST_DECL(0002_unkpart);
111 _TEST_DECL(0003_msgmaxsize);
112 _TEST_DECL(0004_conf);
113 _TEST_DECL(0005_order);
114 _TEST_DECL(0006_symbols);
115 _TEST_DECL(0007_autotopic);
116 _TEST_DECL(0008_reqacks);
117 _TEST_DECL(0009_mock_cluster);
118 _TEST_DECL(0011_produce_batch);
119 _TEST_DECL(0012_produce_consume);
120 _TEST_DECL(0013_null_msgs);
121 _TEST_DECL(0014_reconsume_191);
122 _TEST_DECL(0015_offsets_seek);
123 _TEST_DECL(0016_client_swname);
124 _TEST_DECL(0017_compression);
125 _TEST_DECL(0018_cgrp_term);
126 _TEST_DECL(0019_list_groups);
127 _TEST_DECL(0020_destroy_hang);
128 _TEST_DECL(0021_rkt_destroy);
129 _TEST_DECL(0022_consume_batch);
130 _TEST_DECL(0025_timers);
131 _TEST_DECL(0026_consume_pause);
132 _TEST_DECL(0028_long_topicnames);
133 _TEST_DECL(0029_assign_offset);
134 _TEST_DECL(0030_offset_commit);
135 _TEST_DECL(0031_get_offsets);
136 _TEST_DECL(0033_regex_subscribe);
137 _TEST_DECL(0033_regex_subscribe_local);
138 _TEST_DECL(0034_offset_reset);
139 _TEST_DECL(0035_api_version);
140 _TEST_DECL(0036_partial_fetch);
141 _TEST_DECL(0037_destroy_hang_local);
142 _TEST_DECL(0038_performance);
143 _TEST_DECL(0039_event_dr);
144 _TEST_DECL(0039_event);
145 _TEST_DECL(0040_io_event);
146 _TEST_DECL(0041_fetch_max_bytes);
147 _TEST_DECL(0042_many_topics);
148 _TEST_DECL(0043_no_connection);
149 _TEST_DECL(0044_partition_cnt);
150 _TEST_DECL(0045_subscribe_update);
151 _TEST_DECL(0045_subscribe_update_topic_remove);
152 _TEST_DECL(0045_subscribe_update_non_exist_and_partchange);
153 _TEST_DECL(0046_rkt_cache);
154 _TEST_DECL(0047_partial_buf_tmout);
155 _TEST_DECL(0048_partitioner);
156 _TEST_DECL(0049_consume_conn_close);
157 _TEST_DECL(0050_subscribe_adds);
158 _TEST_DECL(0051_assign_adds);
159 _TEST_DECL(0052_msg_timestamps);
160 _TEST_DECL(0053_stats_timing);
161 _TEST_DECL(0053_stats);
162 _TEST_DECL(0054_offset_time);
163 _TEST_DECL(0055_producer_latency);
164 _TEST_DECL(0056_balanced_group_mt);
165 _TEST_DECL(0057_invalid_topic);
166 _TEST_DECL(0058_log);
167 _TEST_DECL(0059_bsearch);
168 _TEST_DECL(0060_op_prio);
169 _TEST_DECL(0061_consumer_lag);
170 _TEST_DECL(0062_stats_event);
171 _TEST_DECL(0063_clusterid);
172 _TEST_DECL(0064_interceptors);
173 _TEST_DECL(0065_yield);
174 _TEST_DECL(0066_plugins);
175 _TEST_DECL(0067_empty_topic);
176 _TEST_DECL(0068_produce_timeout);
177 _TEST_DECL(0069_consumer_add_parts);
178 _TEST_DECL(0070_null_empty);
179 _TEST_DECL(0072_headers_ut);
180 _TEST_DECL(0073_headers);
181 _TEST_DECL(0074_producev);
182 _TEST_DECL(0075_retry);
183 _TEST_DECL(0076_produce_retry);
184 _TEST_DECL(0077_compaction);
185 _TEST_DECL(0078_c_from_cpp);
186 _TEST_DECL(0079_fork);
187 _TEST_DECL(0080_admin_ut);
188 _TEST_DECL(0081_admin);
189 _TEST_DECL(0082_fetch_max_bytes);
190 _TEST_DECL(0083_cb_event);
191 _TEST_DECL(0084_destroy_flags_local);
192 _TEST_DECL(0084_destroy_flags);
193 _TEST_DECL(0085_headers);
194 _TEST_DECL(0086_purge_local);
195 _TEST_DECL(0086_purge_remote);
196 _TEST_DECL(0088_produce_metadata_timeout);
197 _TEST_DECL(0089_max_poll_interval);
198 _TEST_DECL(0090_idempotence);
199 _TEST_DECL(0091_max_poll_interval_timeout);
200 _TEST_DECL(0092_mixed_msgver);
201 _TEST_DECL(0093_holb_consumer);
202 _TEST_DECL(0094_idempotence_msg_timeout);
203 _TEST_DECL(0095_all_brokers_down);
204 _TEST_DECL(0097_ssl_verify);
205 _TEST_DECL(0098_consumer_txn);
206 _TEST_DECL(0099_commit_metadata);
207 _TEST_DECL(0100_thread_interceptors);
208 _TEST_DECL(0101_fetch_from_follower);
209 _TEST_DECL(0102_static_group_rebalance);
210 _TEST_DECL(0103_transactions_local);
211 _TEST_DECL(0103_transactions);
212 _TEST_DECL(0104_fetch_from_follower_mock);
213 _TEST_DECL(0105_transactions_mock);
214 _TEST_DECL(0106_cgrp_sess_timeout);
215 _TEST_DECL(0107_topic_recreate);
216
217 /* Manual tests */
218 _TEST_DECL(8000_idle);
219
220
221 /* Define test resource usage thresholds if the default limits
222 * are not tolerable.
223 *
224 * Fields:
225 * .ucpu - Max User CPU percentage (double)
226 * .scpu - Max System/Kernel CPU percentage (double)
227 * .rss - Max RSS (memory) in megabytes (double)
228 * .ctxsw - Max number of voluntary context switches (int)
229 *
230 * Also see test_rusage_check_thresholds() in rusage.c
231 *
232 * Make a comment in the _THRES() below why the extra thresholds are required.
233 *
234 * Usage:
235 * _TEST(00...., ...,
236 * _THRES(.ucpu = 15.0)), <-- Max 15% User CPU usage
237 */
/* Attach rusage thresholds to a _TEST() entry (see comment above). */
#define _THRES(...) .rusage_thres = { __VA_ARGS__ }
239
240 /**
241 * Define all tests here
242 */
243 struct test tests[] = {
244 /* Special MAIN test to hold over-all timings, etc. */
245 { .name = "<MAIN>", .flags = TEST_F_LOCAL },
246 _TEST(0000_unittests, TEST_F_LOCAL,
247 /* The msgq insert order tests are heavy on
248 * user CPU (memory scan), RSS, and
249 * system CPU (lots of allocations -> madvise(2)). */
250 _THRES(.ucpu = 100.0, .scpu = 20.0, .rss = 900.0)),
251 _TEST(0001_multiobj, 0),
252 _TEST(0002_unkpart, 0),
253 _TEST(0003_msgmaxsize, 0),
254 _TEST(0004_conf, TEST_F_LOCAL),
255 _TEST(0005_order, 0),
256 _TEST(0006_symbols, TEST_F_LOCAL),
257 _TEST(0007_autotopic, 0),
258 _TEST(0008_reqacks, 0),
259 _TEST(0009_mock_cluster, TEST_F_LOCAL,
260 /* Mock cluster requires MsgVersion 2 */
261 TEST_BRKVER(0,11,0,0)),
262 _TEST(0011_produce_batch, 0,
263 /* Produces a lot of messages */
264 _THRES(.ucpu = 40.0, .scpu = 8.0)),
265 _TEST(0012_produce_consume, 0),
266 _TEST(0013_null_msgs, 0),
267 _TEST(0014_reconsume_191, 0),
268 _TEST(0015_offsets_seek, 0),
269 _TEST(0016_client_swname, 0),
270 _TEST(0017_compression, 0),
271 _TEST(0018_cgrp_term, 0, TEST_BRKVER(0,9,0,0)),
272 _TEST(0019_list_groups, 0, TEST_BRKVER(0,9,0,0)),
273 _TEST(0020_destroy_hang, 0, TEST_BRKVER(0,9,0,0)),
274 _TEST(0021_rkt_destroy, 0),
275 _TEST(0022_consume_batch, 0),
276 _TEST(0025_timers, TEST_F_LOCAL),
277 _TEST(0026_consume_pause, TEST_F_KNOWN_ISSUE, TEST_BRKVER(0,9,0,0),
278 .extra = "Fragile test due to #2190"),
279 _TEST(0028_long_topicnames, TEST_F_KNOWN_ISSUE, TEST_BRKVER(0,9,0,0),
280 .extra = "https://github.com/edenhill/librdkafka/issues/529"),
281 _TEST(0029_assign_offset, 0),
282 _TEST(0030_offset_commit, 0, TEST_BRKVER(0,9,0,0),
283 /* Loops over committed() until timeout */
284 _THRES(.ucpu = 10.0, .scpu = 5.0)),
285 _TEST(0031_get_offsets, 0),
286 _TEST(0033_regex_subscribe, 0, TEST_BRKVER(0,9,0,0)),
287 _TEST(0033_regex_subscribe_local, TEST_F_LOCAL),
288 _TEST(0034_offset_reset, 0),
289 _TEST(0035_api_version, 0),
290 _TEST(0036_partial_fetch, 0),
291 _TEST(0037_destroy_hang_local, TEST_F_LOCAL),
292 _TEST(0038_performance, 0,
293 /* Produces and consumes a lot of messages */
294 _THRES(.ucpu = 150.0, .scpu = 10)),
295 _TEST(0039_event_dr, 0),
296 _TEST(0039_event, TEST_F_LOCAL),
297 _TEST(0040_io_event, 0, TEST_BRKVER(0,9,0,0)),
298 _TEST(0041_fetch_max_bytes, 0,
299 /* Re-fetches large messages multiple times */
300 _THRES(.ucpu = 20.0, .scpu = 10.0)),
301 _TEST(0042_many_topics, 0),
302 _TEST(0043_no_connection, TEST_F_LOCAL),
303 _TEST(0044_partition_cnt, 0, TEST_BRKVER(1,0,0,0),
304 /* Produces a lot of messages */
305 _THRES(.ucpu = 30.0)),
306 _TEST(0045_subscribe_update, 0, TEST_BRKVER(0,9,0,0)),
307 _TEST(0045_subscribe_update_topic_remove, TEST_F_KNOWN_ISSUE,
308 TEST_BRKVER(0,9,0,0)),
309 _TEST(0045_subscribe_update_non_exist_and_partchange, 0,
310 TEST_BRKVER(0,9,0,0)),
311 _TEST(0046_rkt_cache, TEST_F_LOCAL),
312 _TEST(0047_partial_buf_tmout, TEST_F_KNOWN_ISSUE),
313 _TEST(0048_partitioner, 0,
314 /* Produces many small messages */
315 _THRES(.ucpu = 10.0, .scpu = 5.0)),
316 #if WITH_SOCKEM
317 _TEST(0049_consume_conn_close, TEST_F_SOCKEM, TEST_BRKVER(0,9,0,0)),
318 #endif
319 _TEST(0050_subscribe_adds, 0, TEST_BRKVER(0,9,0,0)),
320 _TEST(0051_assign_adds, 0, TEST_BRKVER(0,9,0,0)),
321 _TEST(0052_msg_timestamps, 0, TEST_BRKVER(0,10,0,0)),
322 _TEST(0053_stats_timing, TEST_F_LOCAL),
323 _TEST(0053_stats, 0),
324 _TEST(0054_offset_time, 0, TEST_BRKVER(0,10,1,0)),
325 _TEST(0055_producer_latency, TEST_F_KNOWN_ISSUE_WIN32),
326 _TEST(0056_balanced_group_mt, 0, TEST_BRKVER(0,9,0,0)),
327 _TEST(0057_invalid_topic, 0, TEST_BRKVER(0,9,0,0)),
328 _TEST(0058_log, TEST_F_LOCAL),
329 _TEST(0059_bsearch, 0, TEST_BRKVER(0,10,0,0)),
330 _TEST(0060_op_prio, 0, TEST_BRKVER(0,9,0,0)),
331 _TEST(0061_consumer_lag, 0),
332 _TEST(0062_stats_event, TEST_F_LOCAL),
333 _TEST(0063_clusterid, 0, TEST_BRKVER(0,10,1,0)),
334 _TEST(0064_interceptors, 0, TEST_BRKVER(0,9,0,0)),
335 _TEST(0065_yield, 0),
336 _TEST(0066_plugins,
337 TEST_F_LOCAL|TEST_F_KNOWN_ISSUE_WIN32|TEST_F_KNOWN_ISSUE_OSX,
338 .extra = "dynamic loading of tests might not be fixed for this platform"),
339 _TEST(0067_empty_topic, 0),
340 #if WITH_SOCKEM
341 _TEST(0068_produce_timeout, TEST_F_SOCKEM),
342 #endif
343 _TEST(0069_consumer_add_parts, TEST_F_KNOWN_ISSUE_WIN32,
344 TEST_BRKVER(1,0,0,0)),
345 _TEST(0070_null_empty, 0),
346 _TEST(0072_headers_ut, TEST_F_LOCAL),
347 _TEST(0073_headers, 0, TEST_BRKVER(0,11,0,0)),
348 _TEST(0074_producev, TEST_F_LOCAL),
349 #if WITH_SOCKEM
350 _TEST(0075_retry, TEST_F_SOCKEM),
351 #endif
352 _TEST(0076_produce_retry, TEST_F_SOCKEM),
353 _TEST(0077_compaction, 0,
354 /* The test itself requires message headers */
355 TEST_BRKVER(0,11,0,0)),
356 _TEST(0078_c_from_cpp, TEST_F_LOCAL),
357 _TEST(0079_fork, TEST_F_LOCAL|TEST_F_KNOWN_ISSUE,
358 .extra = "using a fork():ed rd_kafka_t is not supported and will "
359 "most likely hang"),
360 _TEST(0080_admin_ut, TEST_F_LOCAL),
361 _TEST(0081_admin, 0, TEST_BRKVER(0,10,2,0)),
362 _TEST(0082_fetch_max_bytes, 0, TEST_BRKVER(0,10,1,0)),
363 _TEST(0083_cb_event, 0, TEST_BRKVER(0,9,0,0)),
364 _TEST(0084_destroy_flags_local, TEST_F_LOCAL),
365 _TEST(0084_destroy_flags, 0),
366 _TEST(0085_headers, 0, TEST_BRKVER(0,11,0,0)),
367 _TEST(0086_purge_local, TEST_F_LOCAL),
368 _TEST(0086_purge_remote, 0),
369 #if WITH_SOCKEM
370 _TEST(0088_produce_metadata_timeout, TEST_F_SOCKEM),
371 #endif
372 _TEST(0089_max_poll_interval, 0, TEST_BRKVER(0,10,1,0)),
373 _TEST(0090_idempotence, 0, TEST_BRKVER(0,11,0,0)),
374 _TEST(0091_max_poll_interval_timeout, 0, TEST_BRKVER(0,10,1,0)),
375 _TEST(0092_mixed_msgver, 0, TEST_BRKVER(0,11,0,0)),
376 _TEST(0093_holb_consumer, 0, TEST_BRKVER(0,10,1,0)),
377 #if WITH_SOCKEM
378 _TEST(0094_idempotence_msg_timeout, TEST_F_SOCKEM,
379 TEST_BRKVER(0,11,0,0)),
380 #endif
381 _TEST(0095_all_brokers_down, TEST_F_LOCAL),
382 _TEST(0097_ssl_verify, 0),
383 _TEST(0098_consumer_txn, 0, TEST_BRKVER(0,11,0,0)),
384 _TEST(0099_commit_metadata, 0),
385 _TEST(0100_thread_interceptors, TEST_F_LOCAL),
386 _TEST(0101_fetch_from_follower, 0, TEST_BRKVER(2,4,0,0)),
387 _TEST(0102_static_group_rebalance, 0,
388 TEST_BRKVER(2,3,0,0)),
389 _TEST(0103_transactions_local, TEST_F_LOCAL),
390 _TEST(0103_transactions, 0, TEST_BRKVER(0, 11, 0, 0)),
391 _TEST(0104_fetch_from_follower_mock, TEST_F_LOCAL,
392 TEST_BRKVER(2,4,0,0)),
393 _TEST(0105_transactions_mock, TEST_F_LOCAL, TEST_BRKVER(0,11,0,0)),
394 _TEST(0106_cgrp_sess_timeout, TEST_F_LOCAL, TEST_BRKVER(0,11,0,0)),
395 _TEST(0107_topic_recreate, 0, TEST_BRKVER_TOPIC_ADMINAPI,
396 .scenario = "noautocreate"),
397
398 /* Manual tests */
399 _TEST(8000_idle, TEST_F_MANUAL),
400
401 { NULL }
402 };
403
404
/* Thread-local pointer to the currently executing test;
 * defaults to the special <MAIN> entry. */
RD_TLS struct test *test_curr = &tests[0];
406
407
408
409 #if WITH_SOCKEM
410 /**
411 * Socket network emulation with sockem
412 */
413
/**
 * @brief Register a sockem socket on the test's socket list.
 */
static void test_socket_add (struct test *test, sockem_t *skm) {
        TEST_LOCK();
        rd_list_add(&test->sockets, skm);
        TEST_UNLOCK();
}
419
/**
 * @brief Remove a sockem socket from the test's socket list.
 *
 * @param do_lock If non-zero, acquire/release the test lock; pass 0 if
 *                the caller already holds it.
 */
static void test_socket_del (struct test *test, sockem_t *skm, int do_lock) {
        if (do_lock)
                TEST_LOCK();

        /* Best effort, skm might not have been added if connect_cb failed */
        rd_list_remove(&test->sockets, skm);

        if (do_lock)
                TEST_UNLOCK();
}
428
test_socket_sockem_set_all(const char * key,int val)429 int test_socket_sockem_set_all (const char *key, int val) {
430 int i;
431 sockem_t *skm;
432 int cnt = 0;
433
434 TEST_LOCK();
435
436 cnt = rd_list_cnt(&test_curr->sockets);
437 TEST_SAY("Setting sockem %s=%d on %s%d socket(s)\n", key, val,
438 cnt > 0 ? "" : _C_RED, cnt);
439
440 RD_LIST_FOREACH(skm, &test_curr->sockets, i) {
441 if (sockem_set(skm, key, val, NULL) == -1)
442 TEST_FAIL("sockem_set(%s, %d) failed", key, val);
443 }
444
445 TEST_UNLOCK();
446
447 return cnt;
448 }
449
test_socket_sockem_set(int s,const char * key,int value)450 void test_socket_sockem_set (int s, const char *key, int value) {
451 sockem_t *skm;
452
453 TEST_LOCK();
454 skm = sockem_find(s);
455 if (skm)
456 sockem_set(skm, key, value, NULL);
457 TEST_UNLOCK();
458 }
459
test_socket_close_all(struct test * test,int reinit)460 void test_socket_close_all (struct test *test, int reinit) {
461 TEST_LOCK();
462 rd_list_destroy(&test->sockets);
463 if (reinit)
464 rd_list_init(&test->sockets, 16, (void *)sockem_close);
465 TEST_UNLOCK();
466 }
467
468
test_connect_cb(int s,const struct sockaddr * addr,int addrlen,const char * id,void * opaque)469 static int test_connect_cb (int s, const struct sockaddr *addr,
470 int addrlen, const char *id, void *opaque) {
471 struct test *test = opaque;
472 sockem_t *skm;
473 int r;
474
475 skm = sockem_connect(s, addr, addrlen, test_sockem_conf, 0, NULL);
476 if (!skm)
477 return errno;
478
479 if (test->connect_cb) {
480 r = test->connect_cb(test, skm, id);
481 if (r)
482 return r;
483 }
484
485 test_socket_add(test, skm);
486
487 return 0;
488 }
489
test_closesocket_cb(int s,void * opaque)490 static int test_closesocket_cb (int s, void *opaque) {
491 struct test *test = opaque;
492 sockem_t *skm;
493
494 TEST_LOCK();
495 skm = sockem_find(s);
496 if (skm) {
497 sockem_close(skm);
498 test_socket_del(test, skm, 0/*nolock*/);
499 } else {
500 #ifdef _MSC_VER
501 closesocket(s);
502 #else
503 close(s);
504 #endif
505 }
506 TEST_UNLOCK();
507
508 return 0;
509 }
510
511
/**
 * @brief Enable sockem-wrapped sockets on \p conf by installing the
 *        test connect/closesocket callbacks.
 */
void test_socket_enable (rd_kafka_conf_t *conf) {
        rd_kafka_conf_set_connect_cb(conf, test_connect_cb);
        rd_kafka_conf_set_closesocket_cb(conf, test_closesocket_cb);
        /* Callbacks receive the current test as opaque. */
        rd_kafka_conf_set_opaque(conf, test_curr);
}
517 #endif /* WITH_SOCKEM */
518
519
/**
 * @brief rdkafka error callback used by all test client instances.
 *
 * Errors the current test declares non-fatal (via its is_fatal_cb) are
 * only logged; all other errors fail the test. RD_KAFKA_RESP_ERR__FATAL
 * is unwrapped with rd_kafka_fatal_error() to obtain the underlying
 * error before the is_fatal_cb check is re-applied.
 */
static void test_error_cb (rd_kafka_t *rk, int err,
                           const char *reason, void *opaque) {
        if (test_curr->is_fatal_cb && !test_curr->is_fatal_cb(rk, err, reason)) {
                /* Test deems this error non-fatal: log and continue. */
                TEST_SAY(_C_YEL "%s rdkafka error (non-testfatal): %s: %s\n",
                         rd_kafka_name(rk), rd_kafka_err2str(err), reason);
        } else {
                if (err == RD_KAFKA_RESP_ERR__FATAL) {
                        char errstr[512];
                        TEST_SAY(_C_RED "%s Fatal error: %s\n",
                                 rd_kafka_name(rk), reason);

                        /* Retrieve the underlying fatal error code and
                         * reason string. */
                        err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));

                        /* Give is_fatal_cb a chance to ignore the
                         * unwrapped fatal error as well. */
                        if (test_curr->is_fatal_cb &&
                            !test_curr->is_fatal_cb(rk, err, reason))
                                TEST_SAY(_C_YEL
                                         "%s rdkafka ignored FATAL error: "
                                         "%s: %s\n",
                                         rd_kafka_name(rk),
                                         rd_kafka_err2str(err), errstr);
                        else
                                TEST_FAIL("%s rdkafka FATAL error: %s: %s",
                                          rd_kafka_name(rk),
                                          rd_kafka_err2str(err), errstr);

                } else {
                        TEST_FAIL("%s rdkafka error: %s: %s",
                                  rd_kafka_name(rk),
                                  rd_kafka_err2str(err), reason);
                }
        }
}
552
/**
 * @brief rdkafka stats callback: append the JSON stats blob to the
 *        current test's stats file (if one is open), tagged with the
 *        test name and client instance name.
 */
static int test_stats_cb (rd_kafka_t *rk, char *json, size_t json_len,
                          void *opaque) {
        struct test *test = test_curr;

        if (!test->stats_fp)
                return 0;

        fprintf(test->stats_fp,
                "{\"test\": \"%s\", \"instance\":\"%s\", "
                "\"stats\": %s}\n",
                test->name, rd_kafka_name(rk), json);

        return 0;
}
563
564
565 /**
566 * @brief Limit the test run time (in seconds)
567 */
test_timeout_set(int timeout)568 void test_timeout_set (int timeout) {
569 TEST_LOCK();
570 TEST_SAY("Setting test timeout to %ds * %.1f\n",
571 timeout, test_timeout_multiplier);
572 timeout = (int)((double)timeout * test_timeout_multiplier);
573 test_curr->timeout = test_clock() + (timeout * 1000000);
574 TEST_UNLOCK();
575 }
576
tmout_multip(int msecs)577 int tmout_multip (int msecs) {
578 int r;
579 TEST_LOCK();
580 r = (int)(((double)(msecs)) * test_timeout_multiplier);
581 TEST_UNLOCK();
582 return r;
583 }
584
585
586
#ifdef _MSC_VER
/**
 * @brief Win32 console setup: enable virtual-terminal escape sequence
 *        processing so ANSI-colored test output renders correctly.
 */
static void test_init_win32 (void) {
        /* Enable VT emulation to support colored output. */
        HANDLE hOut = GetStdHandle(STD_OUTPUT_HANDLE);
        DWORD dwMode = 0;

        /* Nothing to do if stdout is not a console. */
        if (hOut == INVALID_HANDLE_VALUE ||
            !GetConsoleMode(hOut, &dwMode))
                return;

#ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING
#define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x4
#endif
        dwMode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING;
        SetConsoleMode(hOut, dwMode);
}
#endif
604
605
test_init(void)606 static void test_init (void) {
607 int seed;
608 const char *tmp;
609
610
611 if (test_seed)
612 return;
613
614 if ((tmp = test_getenv("TEST_LEVEL", NULL)))
615 test_level = atoi(tmp);
616 if ((tmp = test_getenv("TEST_MODE", NULL)))
617 strncpy(test_mode, tmp, sizeof(test_mode)-1);
618 if ((tmp = test_getenv("TEST_SCENARIO", NULL)))
619 strncpy(test_scenario, tmp, sizeof(test_scenario)-1);
620 if ((tmp = test_getenv("TEST_SOCKEM", NULL)))
621 test_sockem_conf = tmp;
622 if ((tmp = test_getenv("TEST_SEED", NULL)))
623 seed = atoi(tmp);
624 else
625 seed = test_clock() & 0xffffffff;
626 if ((tmp = test_getenv("TEST_CPU_CALIBRATION", NULL))) {
627 test_rusage_cpu_calibration = strtod(tmp, NULL);
628 if (test_rusage_cpu_calibration < 0.00001) {
629 fprintf(stderr,
630 "%% Invalid CPU calibration "
631 "value (from TEST_CPU_CALIBRATION env): %s\n",
632 tmp);
633 exit(1);
634 }
635 }
636
637 #ifdef _MSC_VER
638 test_init_win32();
639 {
640 LARGE_INTEGER cycl;
641 QueryPerformanceCounter(&cycl);
642 seed = (int)cycl.QuadPart;
643 }
644 #endif
645 srand(seed);
646 test_seed = seed;
647 }
648
649
test_mk_topic_name(const char * suffix,int randomized)650 const char *test_mk_topic_name (const char *suffix, int randomized) {
651 static RD_TLS char ret[512];
652
653 /* Strip main_ prefix (caller is using __FUNCTION__) */
654 if (!strncmp(suffix, "main_", 5))
655 suffix += 5;
656
657 if (test_topic_random || randomized)
658 rd_snprintf(ret, sizeof(ret), "%s_rnd%"PRIx64"_%s",
659 test_topic_prefix, test_id_generate(), suffix);
660 else
661 rd_snprintf(ret, sizeof(ret), "%s_%s", test_topic_prefix, suffix);
662
663 TEST_SAY("Using topic \"%s\"\n", ret);
664
665 return ret;
666 }
667
668
669 /**
670 * @brief Set special test config property
671 * @returns 1 if property was known, else 0.
672 */
test_set_special_conf(const char * name,const char * val,int * timeoutp)673 int test_set_special_conf (const char *name, const char *val, int *timeoutp) {
674 if (!strcmp(name, "test.timeout.multiplier")) {
675 TEST_LOCK();
676 test_timeout_multiplier = strtod(val, NULL);
677 TEST_UNLOCK();
678 *timeoutp = tmout_multip((*timeoutp)*1000) / 1000;
679 } else if (!strcmp(name, "test.topic.prefix")) {
680 rd_snprintf(test_topic_prefix, sizeof(test_topic_prefix),
681 "%s", val);
682 } else if (!strcmp(name, "test.topic.random")) {
683 if (!strcmp(val, "true") ||
684 !strcmp(val, "1"))
685 test_topic_random = 1;
686 else
687 test_topic_random = 0;
688 } else if (!strcmp(name, "test.concurrent.max")) {
689 TEST_LOCK();
690 test_concurrent_max = (int)strtod(val, NULL);
691 TEST_UNLOCK();
692 } else if (!strcmp(name, "test.sql.command")) {
693 TEST_LOCK();
694 if (test_sql_cmd)
695 rd_free(test_sql_cmd);
696 test_sql_cmd = rd_strdup(val);
697 TEST_UNLOCK();
698 } else
699 return 0;
700
701 return 1;
702 }
703
/**
 * @brief Read a test config file in "name=value" format and apply each
 *        property to \p conf, to \p topic_conf (for "topic."-prefixed
 *        names), or to the test framework itself
 *        (see test_set_special_conf()).
 *
 * A missing file is silently tolerated; any parse or set failure fails
 * the current test.
 */
static void test_read_conf_file (const char *conf_path,
                                 rd_kafka_conf_t *conf,
                                 rd_kafka_topic_conf_t *topic_conf,
                                 int *timeoutp) {
        FILE *fp;
        char buf[1024];
        int line = 0;

#ifndef _MSC_VER
        fp = fopen(conf_path, "r");
#else
        fp = NULL;
        errno = fopen_s(&fp, conf_path, "r");
#endif
        if (!fp) {
                if (errno == ENOENT) {
                        /* Config file is optional. */
                        TEST_SAY("Test config file %s not found\n", conf_path);
                        return;
                } else
                        TEST_FAIL("Failed to read %s: %s",
                                  conf_path, strerror(errno));
        }

        while (fgets(buf, sizeof(buf)-1, fp)) {
                char *t;
                char *b = buf;
                rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;
                char *name, *val;
                char errstr[512];

                line++;
                /* Strip trailing newline. */
                if ((t = strchr(b, '\n')))
                        *t = '\0';

                /* Skip comments and empty lines. */
                if (*b == '#' || !*b)
                        continue;

                if (!(t = strchr(b, '=')))
                        TEST_FAIL("%s:%i: expected name=value format\n",
                                  conf_path, line);

                /* Split into name and value at '='. */
                name = b;
                *t = '\0';
                val = t+1;

                /* Framework-level ("test.*") properties. */
                if (test_set_special_conf(name, val, timeoutp))
                        continue;

                if (!strncmp(name, "topic.", strlen("topic."))) {
                        /* Topic configuration: strip "topic." prefix. */
                        name += strlen("topic.");
                        if (topic_conf)
                                res = rd_kafka_topic_conf_set(topic_conf,
                                                              name, val,
                                                              errstr,
                                                              sizeof(errstr));
                        else
                                res = RD_KAFKA_CONF_OK;
                        /* Restore full name for error reporting below. */
                        name -= strlen("topic.");
                }

                if (res == RD_KAFKA_CONF_UNKNOWN) {
                        /* Global (client instance) configuration. */
                        if (conf)
                                res = rd_kafka_conf_set(conf,
                                                        name, val,
                                                        errstr, sizeof(errstr));
                        else
                                res = RD_KAFKA_CONF_OK;
                }

                if (res != RD_KAFKA_CONF_OK)
                        TEST_FAIL("%s:%i: %s\n",
                                  conf_path, line, errstr);
        }

        fclose(fp);
}
780
781 /**
782 * @brief Get path to test config file
783 */
/**
 * @brief Get path to test config file
 *
 * Overridable through the RDKAFKA_TEST_CONF environment variable;
 * defaults to "test.conf" in the current directory.
 */
const char *test_conf_get_path (void) {
        return test_getenv("RDKAFKA_TEST_CONF", "test.conf");
}
787
/**
 * @brief Look up environment variable \p env, returning \p def if it
 *        is not set (thin wrapper around rd_getenv()).
 */
const char *test_getenv (const char *env, const char *def) {
        return rd_getenv(env, def);
}
791
/**
 * @brief Common final configuration step: apply TEST_DEBUG (if set) to
 *        \p conf and arm the test timeout.
 */
void test_conf_common_init (rd_kafka_conf_t *conf, int timeout) {
        const char *debug;

        if (conf && (debug = test_getenv("TEST_DEBUG", NULL)))
                test_conf_set(conf, "debug", debug);

        if (timeout)
                test_timeout_set(timeout);
}
802
803
804 /**
805 * Creates and sets up kafka configuration objects.
806 * Will read "test.conf" file if it exists.
807 */
/**
 * Creates and sets up kafka configuration objects.
 * Will read "test.conf" file if it exists.
 *
 * @param conf       If non-NULL, receives a new rd_kafka_conf_t with test
 *                   callbacks and test.conf properties applied.
 * @param topic_conf If non-NULL, receives a new topic conf object.
 * @param timeout    Test timeout in seconds (0 = leave unchanged);
 *                   may be rescaled by test.timeout.multiplier in test.conf.
 */
void test_conf_init (rd_kafka_conf_t **conf, rd_kafka_topic_conf_t **topic_conf,
                     int timeout) {
        const char *test_conf = test_conf_get_path();

        if (conf) {
                *conf = rd_kafka_conf_new();
                rd_kafka_conf_set(*conf, "client.id", test_curr->name, NULL, 0);
                if (test_idempotent_producer)
                        test_conf_set(*conf, "enable.idempotence", "true");
                rd_kafka_conf_set_error_cb(*conf, test_error_cb);
                rd_kafka_conf_set_stats_cb(*conf, test_stats_cb);

                /* Allow higher request timeouts on CI */
                if (test_on_ci)
                        test_conf_set(*conf, "request.timeout.ms", "10000");

#ifdef SIGIO
                {
                        char buf[64];

                        /* Quick termination */
                        rd_snprintf(buf, sizeof(buf), "%i", SIGIO);
                        rd_kafka_conf_set(*conf, "internal.termination.signal",
                                          buf, NULL, 0);
                        signal(SIGIO, SIG_IGN);
                }
#endif
        }

#if WITH_SOCKEM
        /* Enable sockem wrapping if TEST_SOCKEM was set. */
        if (*test_sockem_conf && conf)
                test_socket_enable(*conf);
#endif

        if (topic_conf)
                *topic_conf = rd_kafka_topic_conf_new();

        /* Open and read optional local test configuration file, if any. */
        test_read_conf_file(test_conf,
                            conf ? *conf : NULL,
                            topic_conf ? *topic_conf : NULL, &timeout);

        test_conf_common_init(conf ? *conf : NULL, timeout);
}
852
853
test_rand(void)854 static RD_INLINE unsigned int test_rand(void) {
855 unsigned int r;
856 #if _MSC_VER
857 rand_s(&r);
858 #else
859 r = rand();
860 #endif
861 return r;
862 }
863 /**
864 * Generate a "unique" test id.
865 */
/**
 * Generate a "unique" test id.
 */
uint64_t test_id_generate (void) {
        uint64_t hi = (uint64_t)test_rand();
        uint64_t lo = (uint64_t)test_rand();

        return (hi << 32) | lo;
}
869
870
871 /**
872 * Generate a "unique" string id
873 */
/**
 * Generate a "unique" string id
 */
char *test_str_id_generate (char *dest, size_t dest_size) {
        uint64_t id = test_id_generate();

        rd_snprintf(dest, dest_size, "%"PRId64, id);

        return dest;
}
878
879 /**
880 * Same as test_str_id_generate but returns a temporary string.
881 */
test_str_id_generate_tmp(void)882 const char *test_str_id_generate_tmp (void) {
883 static RD_TLS char ret[64];
884 return test_str_id_generate(ret, sizeof(ret));
885 }
886
887 /**
888 * Format a message token.
889 * Pad's to dest_size.
890 */
/**
 * Format a message token.
 * Pads with '!' up to dest_size (always NUL-terminated).
 */
void test_msg_fmt (char *dest, size_t dest_size,
                   uint64_t testid, int32_t partition, int msgid) {
        size_t len;

        len = rd_snprintf(dest, dest_size,
                          "testid=%"PRIu64", partition=%"PRId32", msg=%i\n",
                          testid, partition, msgid);
        if (len < dest_size - 1) {
                /* Fill the remainder with padding, keep NUL terminator. */
                memset(dest+len, '!', dest_size-len);
                dest[dest_size-1] = '\0';
        }
}
903
904 /**
905 * @brief Prepare message value and key for test produce.
906 */
/**
 * @brief Prepare message value and key for test produce.
 *
 * The key is formatted with test_msg_fmt() and the value is filled by
 * repeating the key until \p val_size is reached.
 *
 * Fix: guard against \p key_size == 0, which previously caused an
 * infinite loop (the copy length degenerated to 0 so \p of never
 * advanced).
 */
void test_prepare_msg (uint64_t testid, int32_t partition, int msg_id,
                       char *val, size_t val_size,
                       char *key, size_t key_size) {
        size_t of = 0;

        test_msg_fmt(key, key_size, testid, partition, msg_id);

        if (key_size == 0)
                return; /* nothing to repeat into the value */

        while (of < val_size) {
                /* Copy-repeat key into val until val_size */
                size_t len = RD_MIN(val_size-of, key_size);
                memcpy(val+of, key, len);
                of += len;
        }
}
921
922
923
924 /**
925 * Parse a message token
926 */
/**
 * Parse a message token (as produced by test_msg_fmt()) from \p key.
 *
 * Fails the test (reporting \p func:\p line) if the key is missing,
 * malformed, or does not match \p testid / \p exp_partition
 * (pass exp_partition = -1 to skip the partition check).
 * The parsed msgid is returned in \p *msgidp.
 */
void test_msg_parse00 (const char *func, int line,
                       uint64_t testid, int32_t exp_partition, int *msgidp,
                       const char *topic, int32_t partition, int64_t offset,
                       const char *key, size_t key_size) {
        char buf[128];
        uint64_t in_testid;
        int in_part;

        if (!key)
                TEST_FAIL("%s:%i: Message (%s [%"PRId32"] @ %"PRId64") "
                          "has empty key\n",
                          func, line, topic, partition, offset);

        /* Copy to a NUL-terminated buffer for sscanf(). */
        rd_snprintf(buf, sizeof(buf), "%.*s", (int)key_size, key);

        if (sscanf(buf, "testid=%"SCNu64", partition=%i, msg=%i\n",
                   &in_testid, &in_part, msgidp) != 3)
                TEST_FAIL("%s:%i: Incorrect key format: %s", func, line, buf);


        if (testid != in_testid ||
            (exp_partition != -1 && exp_partition != in_part))
                TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i did "
                          "not match message: \"%s\"\n",
                          func, line, testid, (int)exp_partition, buf);
}
953
/**
 * @brief Convenience wrapper for test_msg_parse00() that extracts the
 *        topic, partition, offset and key from \p rkmessage.
 */
void test_msg_parse0 (const char *func, int line,
                      uint64_t testid, rd_kafka_message_t *rkmessage,
                      int32_t exp_partition, int *msgidp) {
        const char *key = (const char *)rkmessage->key;

        test_msg_parse00(func, line, testid, exp_partition, msgidp,
                         rd_kafka_topic_name(rkmessage->rkt),
                         rkmessage->partition, rkmessage->offset,
                         key, rkmessage->key_len);
}
962
963
/* Arguments handed to a test's main function when run (possibly in a
 * separate thread); heap-allocated by the caller, freed by the runner. */
struct run_args {
        struct test *test;  /* the test to run */
        int argc;           /* passed through to test->mainfunc */
        char **argv;
};
969
/**
 * @brief Run a single test synchronously: set up the stats file and
 *        socket list, invoke the test's main function, record timing,
 *        rusage and final state, and clean up.
 *
 * @returns the test main function's return value (0 = passed).
 */
static int run_test0 (struct run_args *run_args) {
        struct test *test = run_args->test;
        test_timing_t t_run;
        int r;
        char stats_file[256];

        /* Per-run stats file; deleted below if no stats were emitted. */
        rd_snprintf(stats_file, sizeof(stats_file), "stats_%s_%"PRIu64".json",
                    test->name, test_id_generate());
        if (!(test->stats_fp = fopen(stats_file, "w+")))
                TEST_SAY("=== Failed to create stats file %s: %s ===\n",
                         stats_file, strerror(errno));

        test_curr = test;

#if WITH_SOCKEM
        rd_list_init(&test->sockets, 16, (void *)sockem_close);
#endif
        /* Don't check message status by default */
        test->exp_dr_status = (rd_kafka_msg_status_t)-1;

        TEST_SAY("================= Running test %s =================\n",
                 test->name);
        if (test->stats_fp)
                TEST_SAY("==== Stats written to file %s ====\n", stats_file);

        test_rusage_start(test_curr);
        TIMING_START(&t_run, "%s", test->name);
        test->start = t_run.ts_start;

        /* Run test main function */
        r = test->mainfunc(run_args->argc, run_args->argv);

        TIMING_STOP(&t_run);
        test_rusage_stop(test_curr,
                         (double)TIMING_DURATION(&t_run) / 1000000.0);

        TEST_LOCK();
        test->duration = TIMING_DURATION(&t_run);

        /* Determine and report final state (the test may have marked
         * itself SKIPPED during the run). */
        if (test->state == TEST_SKIPPED) {
                TEST_SAY("================= Test %s SKIPPED "
                         "=================\n",
                         run_args->test->name);
        } else if (r) {
                test->state = TEST_FAILED;
                TEST_SAY("\033[31m"
                         "================= Test %s FAILED ================="
                         "\033[0m\n",
                         run_args->test->name);
        } else {
                test->state = TEST_PASSED;
                TEST_SAY("\033[32m"
                         "================= Test %s PASSED ================="
                         "\033[0m\n",
                         run_args->test->name);
        }
        TEST_UNLOCK();

        /* Wake up anyone waiting for a test slot or for completion. */
        cnd_broadcast(&test_cnd);

#if WITH_SOCKEM
        test_socket_close_all(test, 0);
#endif

        if (test->stats_fp) {
                long pos = ftell(test->stats_fp);
                fclose(test->stats_fp);
                test->stats_fp = NULL;
                /* Delete file if nothing was written */
                if (pos == 0) {
#ifndef _MSC_VER
                        unlink(stats_file);
#else
                        _unlink(stats_file);
#endif
                }
        }

        if (test_delete_topics_between && test_concurrent_max == 1)
                test_delete_all_test_topics(60*1000);

        return r;
}
1053
1054
1055
1056
/**
 * @brief Thread trampoline for a single test.
 *
 * Detaches itself, runs the test via run_test0(), decrements the
 * global running-test counter and frees the heap-allocated run_args
 * whose ownership was transferred by run_test().
 */
static int run_test_from_thread (void *arg) {
        struct run_args *run_args = arg;

        /* Detached: nobody joins this thread.
         * NOTE(review): check_test_timeouts() still uses test->thrd
         * after this detach (TerminateThread/pthread_kill) -- verify
         * that is safe on all supported platforms. */
        thrd_detach(thrd_current());

        run_test0(run_args);

        TEST_LOCK();
        tests_running_cnt--;
        TEST_UNLOCK();

        free(run_args);

        return 0;
}
1072
1073
1074 /**
1075 * @brief Check running tests for timeouts.
1076 * @locks TEST_LOCK MUST be held
1077 */
check_test_timeouts(void)1078 static void check_test_timeouts (void) {
1079 int64_t now = test_clock();
1080 struct test *test;
1081
1082 for (test = tests ; test->name ; test++) {
1083 if (test->state != TEST_RUNNING)
1084 continue;
1085
1086 /* Timeout check */
1087 if (now > test->timeout) {
1088 struct test *save_test = test_curr;
1089 test_curr = test;
1090 test->state = TEST_FAILED;
1091 test_summary(0/*no-locks*/);
1092 TEST_FAIL0(__FILE__,__LINE__,0/*nolock*/,
1093 0/*fail-later*/,
1094 "Test %s timed out "
1095 "(timeout set to %d seconds)\n",
1096 test->name,
1097 (int)(test->timeout-
1098 test->start)/
1099 1000000);
1100 test_curr = save_test;
1101 tests_running_cnt--; /* fail-later misses this*/
1102 #ifdef _MSC_VER
1103 TerminateThread(test->thrd, -1);
1104 #else
1105 pthread_kill(test->thrd, SIGKILL);
1106 #endif
1107 }
1108 }
1109 }
1110
1111
run_test(struct test * test,int argc,char ** argv)1112 static int run_test (struct test *test, int argc, char **argv) {
1113 struct run_args *run_args = calloc(1, sizeof(*run_args));
1114 int wait_cnt = 0;
1115
1116 run_args->test = test;
1117 run_args->argc = argc;
1118 run_args->argv = argv;
1119
1120 TEST_LOCK();
1121 while (tests_running_cnt >= test_concurrent_max) {
1122 if (!(wait_cnt++ % 100))
1123 TEST_SAY("Too many tests running (%d >= %d): "
1124 "postponing %s start...\n",
1125 tests_running_cnt, test_concurrent_max,
1126 test->name);
1127 cnd_timedwait_ms(&test_cnd, &test_mtx, 100);
1128
1129 check_test_timeouts();
1130 }
1131 tests_running_cnt++;
1132 test->timeout = test_clock() + (int64_t)(30.0 * 1000000.0 *
1133 test_timeout_multiplier);
1134 test->state = TEST_RUNNING;
1135 TEST_UNLOCK();
1136
1137 if (thrd_create(&test->thrd, run_test_from_thread, run_args) !=
1138 thrd_success) {
1139 TEST_LOCK();
1140 tests_running_cnt--;
1141 test->state = TEST_FAILED;
1142 TEST_UNLOCK();
1143
1144 TEST_FAIL("Failed to start thread for test %s\n",
1145 test->name);
1146 }
1147
1148 return 0;
1149 }
1150
run_tests(int argc,char ** argv)1151 static void run_tests (int argc, char **argv) {
1152 struct test *test;
1153
1154 for (test = tests ; test->name ; test++) {
1155 char testnum[128];
1156 char *t;
1157 const char *skip_reason = NULL;
1158 rd_bool_t skip_silent = rd_false;
1159 char tmp[128];
1160 const char *scenario =
1161 test->scenario ? test->scenario : "default";
1162
1163 if (!test->mainfunc)
1164 continue;
1165
1166 /* Extract test number, as string */
1167 strncpy(testnum, test->name, sizeof(testnum)-1);
1168 testnum[sizeof(testnum)-1] = '\0';
1169 if ((t = strchr(testnum, '_')))
1170 *t = '\0';
1171
1172 if ((test_flags && (test_flags & test->flags) != test_flags)) {
1173 skip_reason = "filtered due to test flags";
1174 skip_silent = rd_true;
1175 } if ((test_neg_flags & ~test_flags) & test->flags)
1176 skip_reason = "Filtered due to negative test flags";
1177 if (test_broker_version &&
1178 (test->minver > test_broker_version ||
1179 (test->maxver && test->maxver < test_broker_version))) {
1180 rd_snprintf(tmp, sizeof(tmp),
1181 "not applicable for broker "
1182 "version %d.%d.%d.%d",
1183 TEST_BRKVER_X(test_broker_version, 0),
1184 TEST_BRKVER_X(test_broker_version, 1),
1185 TEST_BRKVER_X(test_broker_version, 2),
1186 TEST_BRKVER_X(test_broker_version, 3));
1187 skip_reason = tmp;
1188 }
1189
1190 if (strcmp(scenario, test_scenario)) {
1191 rd_snprintf(tmp, sizeof(tmp),
1192 "requires test scenario %s", scenario);
1193 skip_silent = rd_true;
1194 skip_reason = tmp;
1195 }
1196
1197 if (tests_to_run && !strstr(tests_to_run, testnum)) {
1198 skip_reason = "not included in TESTS list";
1199 skip_silent = rd_true;
1200 } else if (!tests_to_run && (test->flags & TEST_F_MANUAL)) {
1201 skip_reason = "manual test";
1202 skip_silent = rd_true;
1203 }
1204
1205 if (!skip_reason) {
1206 run_test(test, argc, argv);
1207 } else {
1208 if (skip_silent) {
1209 TEST_SAYL(3,
1210 "================= Skipping test %s "
1211 "(%s) ================\n",
1212 test->name, skip_reason);
1213 TEST_LOCK();
1214 test->state = TEST_SKIPPED;
1215 TEST_UNLOCK();
1216 } else {
1217 test_curr = test;
1218 TEST_SKIP("%s\n", skip_reason);
1219 test_curr = &tests[0];
1220 }
1221
1222 }
1223 }
1224
1225
1226 }
1227
1228 /**
1229 * @brief Print summary for all tests.
1230 *
1231 * @returns the number of failed tests.
1232 */
test_summary(int do_lock)1233 static int test_summary (int do_lock) {
1234 struct test *test;
1235 FILE *report_fp;
1236 char report_path[128];
1237 time_t t;
1238 struct tm *tm;
1239 char datestr[64];
1240 int64_t total_duration = 0;
1241 int tests_run = 0;
1242 int tests_failed = 0;
1243 int tests_failed_known = 0;
1244 int tests_passed = 0;
1245 FILE *sql_fp = NULL;
1246 const char *tmp;
1247
1248 t = time(NULL);
1249 tm = localtime(&t);
1250 strftime(datestr, sizeof(datestr), "%Y%m%d%H%M%S", tm);
1251
1252 if ((tmp = test_getenv("TEST_REPORT", NULL)))
1253 rd_snprintf(report_path, sizeof(report_path), "%s", tmp);
1254 else
1255 rd_snprintf(report_path, sizeof(report_path),
1256 "test_report_%s.json", datestr);
1257
1258 report_fp = fopen(report_path, "w+");
1259 if (!report_fp)
1260 TEST_WARN("Failed to create report file %s: %s\n",
1261 report_path, strerror(errno));
1262 else
1263 fprintf(report_fp,
1264 "{ \"id\": \"%s_%s\", \"mode\": \"%s\", "
1265 "\"scenario\": \"%s\", "
1266 "\"date\": \"%s\", "
1267 "\"git_version\": \"%s\", "
1268 "\"broker_version\": \"%s\", "
1269 "\"tests\": {",
1270 datestr, test_mode, test_mode, test_scenario, datestr,
1271 test_git_version,
1272 test_broker_version_str);
1273
1274 if (do_lock)
1275 TEST_LOCK();
1276
1277 if (test_sql_cmd) {
1278 #ifdef _MSC_VER
1279 sql_fp = _popen(test_sql_cmd, "w");
1280 #else
1281 sql_fp = popen(test_sql_cmd, "w");
1282 #endif
1283
1284 fprintf(sql_fp,
1285 "CREATE TABLE IF NOT EXISTS "
1286 "runs(runid text PRIMARY KEY, mode text, "
1287 "date datetime, cnt int, passed int, failed int, "
1288 "duration numeric);\n"
1289 "CREATE TABLE IF NOT EXISTS "
1290 "tests(runid text, mode text, name text, state text, "
1291 "extra text, duration numeric);\n");
1292 }
1293
1294 if (show_summary)
1295 printf("TEST %s (%s, scenario %s) SUMMARY\n"
1296 "#==================================================================#\n",
1297 datestr, test_mode, test_scenario);
1298
1299 for (test = tests ; test->name ; test++) {
1300 const char *color;
1301 int64_t duration;
1302 char extra[128] = "";
1303 int do_count = 1;
1304
1305 if (!(duration = test->duration) && test->start > 0)
1306 duration = test_clock() - test->start;
1307
1308 if (test == tests) {
1309 /* <MAIN> test:
1310 * test accounts for total runtime.
1311 * dont include in passed/run/failed counts. */
1312 total_duration = duration;
1313 do_count = 0;
1314 }
1315
1316 switch (test->state)
1317 {
1318 case TEST_PASSED:
1319 color = _C_GRN;
1320 if (do_count) {
1321 tests_passed++;
1322 tests_run++;
1323 }
1324 break;
1325 case TEST_FAILED:
1326 if (test->flags & TEST_F_KNOWN_ISSUE) {
1327 rd_snprintf(extra, sizeof(extra),
1328 " <-- known issue%s%s",
1329 test->extra ? ": " : "",
1330 test->extra ? test->extra : "");
1331 if (do_count)
1332 tests_failed_known++;
1333 }
1334 color = _C_RED;
1335 if (do_count) {
1336 tests_failed++;
1337 tests_run++;
1338 }
1339 break;
1340 case TEST_RUNNING:
1341 color = _C_MAG;
1342 if (do_count) {
1343 tests_failed++; /* All tests should be finished */
1344 tests_run++;
1345 }
1346 break;
1347 case TEST_NOT_STARTED:
1348 color = _C_YEL;
1349 if (test->extra)
1350 rd_snprintf(extra, sizeof(extra), " %s",
1351 test->extra);
1352 break;
1353 default:
1354 color = _C_CYA;
1355 break;
1356 }
1357
1358 if (show_summary &&
1359 (test->state != TEST_SKIPPED || *test->failstr ||
1360 (tests_to_run &&
1361 !strncmp(tests_to_run, test->name,
1362 strlen(tests_to_run))))) {
1363 printf("|%s %-40s | %10s | %7.3fs %s|",
1364 color,
1365 test->name, test_states[test->state],
1366 (double)duration/1000000.0, _C_CLR);
1367 if (test->state == TEST_FAILED)
1368 printf(_C_RED " %s" _C_CLR, test->failstr);
1369 else if (test->state == TEST_SKIPPED)
1370 printf(_C_CYA " %s" _C_CLR, test->failstr);
1371 printf("%s\n", extra);
1372 }
1373
1374 if (report_fp) {
1375 int i;
1376 fprintf(report_fp,
1377 "%s\"%s\": {"
1378 "\"name\": \"%s\", "
1379 "\"state\": \"%s\", "
1380 "\"known_issue\": %s, "
1381 "\"extra\": \"%s\", "
1382 "\"duration\": %.3f, "
1383 "\"report\": [ ",
1384 test == tests ? "": ", ",
1385 test->name,
1386 test->name, test_states[test->state],
1387 test->flags & TEST_F_KNOWN_ISSUE ? "true":"false",
1388 test->extra ? test->extra : "",
1389 (double)duration/1000000.0);
1390
1391 for (i = 0 ; i < test->report_cnt ; i++) {
1392 fprintf(report_fp, "%s%s ",
1393 i == 0 ? "":",",
1394 test->report_arr[i]);
1395 }
1396
1397 fprintf(report_fp, "] }");
1398 }
1399
1400 if (sql_fp)
1401 fprintf(sql_fp,
1402 "INSERT INTO tests VALUES("
1403 "'%s_%s', '%s', '%s', '%s', '%s', %f);\n",
1404 datestr, test_mode, test_mode,
1405 test->name, test_states[test->state],
1406 test->extra ? test->extra : "",
1407 (double)duration/1000000.0);
1408 }
1409 if (do_lock)
1410 TEST_UNLOCK();
1411
1412 if (show_summary)
1413 printf("#==================================================================#\n");
1414
1415 if (report_fp) {
1416 fprintf(report_fp,
1417 "}, "
1418 "\"tests_run\": %d, "
1419 "\"tests_passed\": %d, "
1420 "\"tests_failed\": %d, "
1421 "\"duration\": %.3f"
1422 "}\n",
1423 tests_run, tests_passed, tests_failed,
1424 (double)total_duration/1000000.0);
1425
1426 fclose(report_fp);
1427 TEST_SAY("# Test report written to %s\n", report_path);
1428 }
1429
1430 if (sql_fp) {
1431 fprintf(sql_fp,
1432 "INSERT INTO runs VALUES('%s_%s', '%s', datetime(), "
1433 "%d, %d, %d, %f);\n",
1434 datestr, test_mode, test_mode,
1435 tests_run, tests_passed, tests_failed,
1436 (double)total_duration/1000000.0);
1437 fclose(sql_fp);
1438 }
1439
1440 return tests_failed - tests_failed_known;
1441 }
1442
1443 #ifndef _MSC_VER
/**
 * @brief SIGINT handler: request an orderly shutdown on the first
 *        signal, exit immediately on the second.
 *
 * NOTE(review): fprintf() is not async-signal-safe; tolerable for a
 * test harness but not strictly correct -- confirm acceptable.
 */
static void test_sig_term (int sig) {
        if (test_exit)
                exit(1);
        fprintf(stderr, "Exiting tests, waiting for running tests to finish.\n");
        test_exit = 1;
}
1450 #endif
1451
1452 /**
1453 * Wait 'timeout' seconds for rdkafka to kill all its threads and clean up.
1454 */
test_wait_exit(int timeout)1455 static void test_wait_exit (int timeout) {
1456 int r;
1457 time_t start = time(NULL);
1458
1459 while ((r = rd_kafka_thread_cnt()) && timeout-- >= 0) {
1460 TEST_SAY("%i thread(s) in use by librdkafka, waiting...\n", r);
1461 rd_sleep(1);
1462 }
1463
1464 TEST_SAY("%i thread(s) in use by librdkafka\n", r);
1465
1466 if (r > 0)
1467 TEST_FAIL("%i thread(s) still active in librdkafka", r);
1468
1469 timeout -= (int)(time(NULL) - start);
1470 if (timeout > 0) {
1471 TEST_SAY("Waiting %d seconds for all librdkafka memory "
1472 "to be released\n", timeout);
1473 if (rd_kafka_wait_destroyed(timeout * 1000) == -1)
1474 TEST_FAIL("Not all internal librdkafka "
1475 "objects destroyed\n");
1476 }
1477 }
1478
1479
1480
1481
1482 /**
1483 * @brief Test framework cleanup before termination.
1484 */
test_cleanup(void)1485 static void test_cleanup (void) {
1486 struct test *test;
1487
1488 /* Free report arrays */
1489 for (test = tests ; test->name ; test++) {
1490 int i;
1491 if (!test->report_arr)
1492 continue;
1493 for (i = 0 ; i < test->report_cnt ; i++)
1494 rd_free(test->report_arr[i]);
1495 rd_free(test->report_arr);
1496 test->report_arr = NULL;
1497 }
1498
1499 if (test_sql_cmd)
1500 rd_free(test_sql_cmd);
1501 }
1502
1503
main(int argc,char ** argv)1504 int main(int argc, char **argv) {
1505 int i, r;
1506 test_timing_t t_all;
1507 int a,b,c,d;
1508 const char *tmpver;
1509
1510 mtx_init(&test_mtx, mtx_plain);
1511 cnd_init(&test_cnd);
1512
1513 test_init();
1514
1515 #ifndef _MSC_VER
1516 signal(SIGINT, test_sig_term);
1517 #endif
1518 tests_to_run = test_getenv("TESTS", NULL);
1519 tmpver = test_getenv("TEST_KAFKA_VERSION", NULL);
1520 if (!tmpver)
1521 tmpver = test_getenv("KAFKA_VERSION", test_broker_version_str);
1522 test_broker_version_str = tmpver;
1523
1524 test_git_version = test_getenv("RDKAFKA_GITVER", "HEAD");
1525
1526 /* Are we running on CI? */
1527 if (test_getenv("CI", NULL)) {
1528 test_on_ci = 1;
1529 test_concurrent_max = 3;
1530 }
1531
1532 test_conf_init(NULL, NULL, 10);
1533
1534 for (i = 1 ; i < argc ; i++) {
1535 if (!strncmp(argv[i], "-p", 2) && strlen(argv[i]) > 2) {
1536 if (test_rusage) {
1537 fprintf(stderr,
1538 "%% %s ignored: -R takes preceedence\n",
1539 argv[i]);
1540 continue;
1541 }
1542 test_concurrent_max = (int)strtod(argv[i]+2, NULL);
1543 } else if (!strcmp(argv[i], "-l"))
1544 test_flags |= TEST_F_LOCAL;
1545 else if (!strcmp(argv[i], "-L"))
1546 test_neg_flags |= TEST_F_LOCAL;
1547 else if (!strcmp(argv[i], "-a"))
1548 test_assert_on_fail = 1;
1549 else if (!strcmp(argv[i], "-k"))
1550 test_flags |= TEST_F_KNOWN_ISSUE;
1551 else if (!strcmp(argv[i], "-K"))
1552 test_neg_flags |= TEST_F_KNOWN_ISSUE;
1553 else if (!strcmp(argv[i], "-E"))
1554 test_neg_flags |= TEST_F_SOCKEM;
1555 else if (!strcmp(argv[i], "-V") && i+1 < argc)
1556 test_broker_version_str = argv[++i];
1557 else if (!strcmp(argv[i], "-s") && i+1 < argc)
1558 strncpy(test_scenario, argv[i],
1559 sizeof(test_scenario)-1);
1560 else if (!strcmp(argv[i], "-S"))
1561 show_summary = 0;
1562 else if (!strcmp(argv[i], "-D"))
1563 test_delete_topics_between = 1;
1564 else if (!strcmp(argv[i], "-P"))
1565 test_idempotent_producer = 1;
1566 else if (!strcmp(argv[i], "-Q"))
1567 test_quick = 1;
1568 else if (!strncmp(argv[i], "-R", 2)) {
1569 test_rusage = 1;
1570 test_concurrent_max = 1;
1571 if (strlen(argv[i]) > strlen("-R")) {
1572 test_rusage_cpu_calibration =
1573 strtod(argv[i]+2, NULL);
1574 if (test_rusage_cpu_calibration < 0.00001) {
1575 fprintf(stderr,
1576 "%% Invalid CPU calibration "
1577 "value: %s\n", argv[i]+2);
1578 exit(1);
1579 }
1580 }
1581 } else if (*argv[i] != '-')
1582 tests_to_run = argv[i];
1583 else {
1584 printf("Unknown option: %s\n"
1585 "\n"
1586 "Usage: %s [options] [<test-match-substr>]\n"
1587 "Options:\n"
1588 " -p<N> Run N tests in parallel\n"
1589 " -l/-L Only/dont run local tests (no broker needed)\n"
1590 " -k/-K Only/dont run tests with known issues\n"
1591 " -E Don't run sockem tests\n"
1592 " -a Assert on failures\n"
1593 " -S Dont show test summary\n"
1594 " -s <scenario> Test scenario.\n"
1595 " -V <N.N.N.N> Broker version.\n"
1596 " -D Delete all test topics between each test (-p1) or after all tests\n"
1597 " -P Run all tests with `enable.idempotency=true`\n"
1598 " -Q Run tests in quick mode: faster tests, fewer iterations, less data.\n"
1599 " -R Check resource usage thresholds.\n"
1600 " -R<C> Check resource usage thresholds but adjust CPU thresholds by C (float):\n"
1601 " C < 1.0: CPU is faster than base line system.\n"
1602 " C > 1.0: CPU is slower than base line system.\n"
1603 " E.g. -R2.5 = CPU is 2.5x slower than base line system.\n"
1604 "\n"
1605 "Environment variables:\n"
1606 " TESTS - substring matched test to run (e.g., 0033)\n"
1607 " TEST_KAFKA_VERSION - broker version (e.g., 0.9.0.1)\n"
1608 " TEST_SCENARIO - Test scenario\n"
1609 " TEST_LEVEL - Test verbosity level\n"
1610 " TEST_MODE - bare, helgrind, valgrind\n"
1611 " TEST_SEED - random seed\n"
1612 " RDKAFKA_TEST_CONF - test config file (test.conf)\n"
1613 " KAFKA_PATH - Path to kafka source dir\n"
1614 " ZK_ADDRESS - Zookeeper address\n"
1615 "\n",
1616 argv[i], argv[0]);
1617 exit(1);
1618 }
1619 }
1620
1621 TEST_SAY("Git version: %s\n", test_git_version);
1622
1623 if (!strcmp(test_broker_version_str, "trunk"))
1624 test_broker_version_str = "9.9.9.9"; /* for now */
1625
1626 d = 0;
1627 if (sscanf(test_broker_version_str, "%d.%d.%d.%d",
1628 &a, &b, &c, &d) < 3) {
1629 printf("%% Expected broker version to be in format "
1630 "N.N.N (N=int), not %s\n",
1631 test_broker_version_str);
1632 exit(1);
1633 }
1634 test_broker_version = TEST_BRKVER(a, b, c, d);
1635 TEST_SAY("Broker version: %s (%d.%d.%d.%d)\n",
1636 test_broker_version_str,
1637 TEST_BRKVER_X(test_broker_version, 0),
1638 TEST_BRKVER_X(test_broker_version, 1),
1639 TEST_BRKVER_X(test_broker_version, 2),
1640 TEST_BRKVER_X(test_broker_version, 3));
1641
1642 /* Set up fake "<MAIN>" test for all operations performed in
1643 * the main thread rather than the per-test threads.
1644 * Nice side effect is that we get timing and status for main as well.*/
1645 test_curr = &tests[0];
1646 test_curr->state = TEST_PASSED;
1647 test_curr->start = test_clock();
1648
1649 if (test_on_ci) {
1650 TEST_LOCK();
1651 test_timeout_multiplier += 2;
1652 TEST_UNLOCK();
1653 }
1654
1655 if (!strcmp(test_mode, "helgrind") ||
1656 !strcmp(test_mode, "drd")) {
1657 TEST_LOCK();
1658 test_timeout_multiplier += 5;
1659 TEST_UNLOCK();
1660 } else if (!strcmp(test_mode, "valgrind")) {
1661 TEST_LOCK();
1662 test_timeout_multiplier += 3;
1663 TEST_UNLOCK();
1664 }
1665
1666 /* Broker version 0.9 and api.version.request=true (which is default)
1667 * will cause a 10s stall per connection. Instead of fixing
1668 * that for each affected API in every test we increase the timeout
1669 * multiplier accordingly instead. The typical consume timeout is 5
1670 * seconds, so a multiplier of 3 should be good. */
1671 if ((test_broker_version & 0xffff0000) == 0x00090000)
1672 test_timeout_multiplier += 3;
1673
1674 if (test_concurrent_max > 1)
1675 test_timeout_multiplier += (double)test_concurrent_max / 3;
1676
1677 TEST_SAY("Tests to run : %s\n", tests_to_run ? tests_to_run : "all");
1678 TEST_SAY("Test mode : %s%s\n", test_quick ? "quick, ":"", test_mode);
1679 TEST_SAY("Test scenario: %s\n", test_scenario);
1680 TEST_SAY("Test filter : %s\n",
1681 (test_flags & TEST_F_LOCAL) ? "local tests only" : "no filter");
1682 TEST_SAY("Test timeout multiplier: %.1f\n", test_timeout_multiplier);
1683 TEST_SAY("Action on test failure: %s\n",
1684 test_assert_on_fail ? "assert crash" : "continue other tests");
1685 if (test_rusage)
1686 TEST_SAY("Test rusage : yes (%.2fx CPU calibration)\n",
1687 test_rusage_cpu_calibration);
1688 if (test_idempotent_producer)
1689 TEST_SAY("Test Idempotent Producer: enabled\n");
1690
1691 {
1692 char cwd[512], *pcwd;
1693 #ifdef _MSC_VER
1694 pcwd = _getcwd(cwd, sizeof(cwd) - 1);
1695 #else
1696 pcwd = getcwd(cwd, sizeof(cwd) - 1);
1697 #endif
1698 if (pcwd)
1699 TEST_SAY("Current directory: %s\n", cwd);
1700 }
1701
1702 test_timeout_set(30);
1703
1704 TIMING_START(&t_all, "ALL-TESTS");
1705
1706 /* Run tests */
1707 run_tests(argc, argv);
1708
1709 TEST_LOCK();
1710 while (tests_running_cnt > 0 && !test_exit) {
1711 struct test *test;
1712
1713 if (!test_quick && test_level >= 2) {
1714 TEST_SAY("%d test(s) running:", tests_running_cnt);
1715
1716 for (test = tests ; test->name ; test++) {
1717 if (test->state != TEST_RUNNING)
1718 continue;
1719
1720 TEST_SAY0(" %s", test->name);
1721 }
1722
1723 TEST_SAY0("\n");
1724 }
1725
1726 check_test_timeouts();
1727
1728 TEST_UNLOCK();
1729
1730 if (test_quick)
1731 rd_usleep(200*1000, NULL);
1732 else
1733 rd_sleep(1);
1734 TEST_LOCK();
1735 }
1736
1737 TIMING_STOP(&t_all);
1738
1739 test_curr = &tests[0];
1740 test_curr->duration = test_clock() - test_curr->start;
1741
1742 TEST_UNLOCK();
1743
1744 if (test_delete_topics_between)
1745 test_delete_all_test_topics(60*1000);
1746
1747 r = test_summary(1/*lock*/) ? 1 : 0;
1748
1749 /* Wait for everything to be cleaned up since broker destroys are
1750 * handled in its own thread. */
1751 test_wait_exit(0);
1752
1753 /* If we havent failed at this point then
1754 * there were no threads leaked */
1755 if (r == 0)
1756 TEST_SAY("\n============== ALL TESTS PASSED ==============\n");
1757
1758 test_cleanup();
1759
1760 if (r > 0)
1761 TEST_FAIL("%d test(s) failed, see previous errors", r);
1762
1763 return r;
1764 }
1765
1766
1767
1768
1769
1770 /******************************************************************************
1771 *
1772 * Helpers
1773 *
1774 ******************************************************************************/
1775
/**
 * @brief Default delivery report callback used by the test framework.
 *
 * Verifies the delivery error against test_curr->exp_dr_err (unless
 * ignore_dr_err is set), optionally verifies the message persistence
 * status (exp_dr_status), and decrements the remaining-message counter
 * that was passed as the per-message opaque (may be NULL).
 */
void test_dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
                     void *opaque) {
        /* Per-message opaque: optional remaining-message counter. */
        int *remainsp = rkmessage->_private;
        static const char *status_names[] = {
                [RD_KAFKA_MSG_STATUS_NOT_PERSISTED] = "NotPersisted",
                [RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED] = "PossiblyPersisted",
                [RD_KAFKA_MSG_STATUS_PERSISTED] = "Persisted"
        };

        TEST_SAYL(4, "Delivery report: %s (%s) to %s [%"PRId32"]\n",
                  rd_kafka_err2str(rkmessage->err),
                  status_names[rd_kafka_message_status(rkmessage)],
                  rd_kafka_topic_name(rkmessage->rkt),
                  rkmessage->partition);

        /* In sync-produce mode the error is just recorded (below) for
         * the producing code to inspect; otherwise verify it here. */
        if (!test_curr->produce_sync) {
                if (!test_curr->ignore_dr_err &&
                    rkmessage->err != test_curr->exp_dr_err)
                        TEST_FAIL("Message delivery (to %s [%"PRId32"]) "
                                  "failed: expected %s, got %s",
                                  rd_kafka_topic_name(rkmessage->rkt),
                                  rkmessage->partition,
                                  rd_kafka_err2str(test_curr->exp_dr_err),
                                  rd_kafka_err2str(rkmessage->err));

                /* exp_dr_status == -1 means "don't check status" (default) */
                if ((int)test_curr->exp_dr_status != -1) {
                        rd_kafka_msg_status_t status =
                                rd_kafka_message_status(rkmessage);

                        TEST_ASSERT(status == test_curr->exp_dr_status,
                                    "Expected message status %s, not %s",
                                    status_names[test_curr->exp_dr_status],
                                    status_names[status]);
                }
        }

        if (remainsp) {
                TEST_ASSERT(*remainsp > 0,
                            "Too many messages delivered (remains %i)",
                            *remainsp);

                (*remainsp)--;
        }

        if (test_curr->produce_sync)
                test_curr->produce_sync_err = rkmessage->err;
}
1823
1824
/**
 * @brief Create an rd_kafka_t instance of type \p mode
 *        (e.g. RD_KAFKA_PRODUCER or RD_KAFKA_CONSUMER).
 *
 * @param conf Configuration to use; if NULL a default test configuration
 *             is created (with sockem enabled if configured).
 *             Ownership of \p conf passes to rd_kafka_new() on success.
 *
 * Fails the current test if instance creation fails.
 */
rd_kafka_t *test_create_handle (int mode, rd_kafka_conf_t *conf) {
        rd_kafka_t *rk;
        char errstr[512];

        if (!conf) {
                test_conf_init(&conf, NULL, 0);
#if WITH_SOCKEM
                if (*test_sockem_conf)
                        test_socket_enable(conf);
#endif
        } else {
                /* Caller-supplied conf: tag it with the test's name. */
                test_conf_set(conf, "client.id", test_curr->name);
        }



        /* Create kafka instance */
        rk = rd_kafka_new(mode, conf, errstr, sizeof(errstr));
        if (!rk)
                TEST_FAIL("Failed to create rdkafka instance: %s\n", errstr);

        TEST_SAY("Created    kafka instance %s\n", rd_kafka_name(rk));

        return rk;
}
1850
1851
test_create_producer(void)1852 rd_kafka_t *test_create_producer (void) {
1853 rd_kafka_conf_t *conf;
1854
1855 test_conf_init(&conf, NULL, 0);
1856 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
1857
1858 return test_create_handle(RD_KAFKA_PRODUCER, conf);
1859 }
1860
1861
1862 /**
1863 * Create topic_t object with va-arg list as key-value config pairs
1864 * terminated by NULL.
1865 */
test_create_topic_object(rd_kafka_t * rk,const char * topic,...)1866 rd_kafka_topic_t *test_create_topic_object (rd_kafka_t *rk,
1867 const char *topic, ...) {
1868 rd_kafka_topic_t *rkt;
1869 rd_kafka_topic_conf_t *topic_conf;
1870 va_list ap;
1871 const char *name, *val;
1872
1873 test_conf_init(NULL, &topic_conf, 0);
1874
1875 va_start(ap, topic);
1876 while ((name = va_arg(ap, const char *)) &&
1877 (val = va_arg(ap, const char *))) {
1878 test_topic_conf_set(topic_conf, name, val);
1879 }
1880 va_end(ap);
1881
1882 rkt = rd_kafka_topic_new(rk, topic, topic_conf);
1883 if (!rkt)
1884 TEST_FAIL("Failed to create topic: %s\n",
1885 rd_kafka_err2str(rd_kafka_last_error()));
1886
1887 return rkt;
1888
1889 }
1890
1891
/**
 * @brief Create a topic object suitable for producing, with va-arg list
 *        of key-value config pairs terminated by NULL.
 *
 * Forces request.required.acks=-1 so consume tests see fully
 * replicated messages. Fails the current test on error.
 */
rd_kafka_topic_t *test_create_producer_topic (rd_kafka_t *rk,
	const char *topic, ...) {
        rd_kafka_topic_t *rkt;
        rd_kafka_topic_conf_t *topic_conf;
        char errstr[512];
        va_list ap;
        const char *name, *val;

        test_conf_init(NULL, &topic_conf, 0);

        /* Apply caller-supplied topic configuration pairs. */
        va_start(ap, topic);
        while ((name = va_arg(ap, const char *)) &&
               (val = va_arg(ap, const char *))) {
                if (rd_kafka_topic_conf_set(topic_conf, name, val,
                                            errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
                        TEST_FAIL("Conf failed: %s\n", errstr);
        }
        va_end(ap);

        /* Make sure all replicas are in-sync after producing
         * so that consume test wont fail. */
        rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1",
                                errstr, sizeof(errstr));


        rkt = rd_kafka_topic_new(rk, topic, topic_conf);
        if (!rkt)
                TEST_FAIL("Failed to create topic: %s\n",
                          rd_kafka_err2str(rd_kafka_last_error()));

        return rkt;

}
1925
1926
1927
1928 /**
1929 * Produces \p cnt messages and returns immediately.
1930 * Does not wait for delivery.
1931 * \p msgcounterp is incremented for each produced messages and passed
1932 * as \p msg_opaque which is later used in test_dr_msg_cb to decrement
1933 * the counter on delivery.
1934 *
1935 * If \p payload is NULL the message key and payload will be formatted
1936 * according to standard test format, otherwise the key will be NULL and
1937 * payload send as message payload.
1938 *
1939 * Default message size is 128 bytes, if \p size is non-zero and \p payload
1940 * is NULL the message size of \p size will be used.
1941 */
test_produce_msgs_nowait(rd_kafka_t * rk,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size,int msgrate,int * msgcounterp)1942 void test_produce_msgs_nowait (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
1943 uint64_t testid, int32_t partition,
1944 int msg_base, int cnt,
1945 const char *payload, size_t size, int msgrate,
1946 int *msgcounterp) {
1947 int msg_id;
1948 test_timing_t t_all, t_poll;
1949 char key[128];
1950 void *buf;
1951 int64_t tot_bytes = 0;
1952 int64_t tot_time_poll = 0;
1953 int64_t per_msg_wait = 0;
1954
1955 if (msgrate > 0)
1956 per_msg_wait = 1000000 / (int64_t)msgrate;
1957
1958
1959 if (payload)
1960 buf = (void *)payload;
1961 else {
1962 if (size == 0)
1963 size = 128;
1964 buf = calloc(1, size);
1965 }
1966
1967 TEST_SAY("Produce to %s [%"PRId32"]: messages #%d..%d\n",
1968 rd_kafka_topic_name(rkt), partition, msg_base, msg_base+cnt);
1969
1970 TIMING_START(&t_all, "PRODUCE");
1971 TIMING_START(&t_poll, "SUM(POLL)");
1972
1973 for (msg_id = msg_base ; msg_id < msg_base + cnt ; msg_id++) {
1974 int wait_time = 0;
1975
1976 if (!payload)
1977 test_prepare_msg(testid, partition, msg_id,
1978 buf, size, key, sizeof(key));
1979
1980
1981 if (rd_kafka_produce(rkt, partition,
1982 RD_KAFKA_MSG_F_COPY,
1983 buf, size,
1984 !payload ? key : NULL,
1985 !payload ? strlen(key) : 0,
1986 msgcounterp) == -1)
1987 TEST_FAIL("Failed to produce message %i "
1988 "to partition %i: %s",
1989 msg_id, (int)partition,
1990 rd_kafka_err2str(rd_kafka_last_error()));
1991
1992 (*msgcounterp)++;
1993 tot_bytes += size;
1994
1995 TIMING_RESTART(&t_poll);
1996 do {
1997 if (per_msg_wait) {
1998 wait_time = (int)(per_msg_wait -
1999 TIMING_DURATION(&t_poll)) /
2000 1000;
2001 if (wait_time < 0)
2002 wait_time = 0;
2003 }
2004 rd_kafka_poll(rk, wait_time);
2005 } while (wait_time > 0);
2006
2007 tot_time_poll = TIMING_DURATION(&t_poll);
2008
2009 if (TIMING_EVERY(&t_all, 3*1000000))
2010 TEST_SAY("produced %3d%%: %d/%d messages "
2011 "(%d msgs/s, %d bytes/s)\n",
2012 ((msg_id - msg_base) * 100) / cnt,
2013 msg_id - msg_base, cnt,
2014 (int)((msg_id - msg_base) /
2015 (TIMING_DURATION(&t_all) / 1000000)),
2016 (int)((tot_bytes) /
2017 (TIMING_DURATION(&t_all) / 1000000)));
2018 }
2019
2020 if (!payload)
2021 free(buf);
2022
2023 t_poll.duration = tot_time_poll;
2024 TIMING_STOP(&t_poll);
2025 TIMING_STOP(&t_all);
2026 }
2027
2028 /**
2029 * Waits for the messages tracked by counter \p msgcounterp to be delivered.
2030 */
test_wait_delivery(rd_kafka_t * rk,int * msgcounterp)2031 void test_wait_delivery (rd_kafka_t *rk, int *msgcounterp) {
2032 test_timing_t t_all;
2033 int start_cnt = *msgcounterp;
2034
2035 TIMING_START(&t_all, "PRODUCE.DELIVERY.WAIT");
2036
2037 /* Wait for messages to be delivered */
2038 while (*msgcounterp > 0 && rd_kafka_outq_len(rk) > 0) {
2039 rd_kafka_poll(rk, 10);
2040 if (TIMING_EVERY(&t_all, 3*1000000)) {
2041 int delivered = start_cnt - *msgcounterp;
2042 TEST_SAY("wait_delivery: "
2043 "%d/%d messages delivered: %d msgs/s\n",
2044 delivered, start_cnt,
2045 (int)(delivered /
2046 (TIMING_DURATION(&t_all) / 1000000)));
2047 }
2048 }
2049
2050 TIMING_STOP(&t_all);
2051
2052 TEST_ASSERT(*msgcounterp == 0,
2053 "Not all messages delivered: msgcounter still at %d, "
2054 "outq_len %d",
2055 *msgcounterp, rd_kafka_outq_len(rk));
2056 }
2057
2058 /**
2059 * Produces \p cnt messages and waits for succesful delivery
2060 */
test_produce_msgs(rd_kafka_t * rk,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size)2061 void test_produce_msgs (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2062 uint64_t testid, int32_t partition,
2063 int msg_base, int cnt,
2064 const char *payload, size_t size) {
2065 int remains = 0;
2066
2067 test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2068 payload, size, 0, &remains);
2069
2070 test_wait_delivery(rk, &remains);
2071 }
2072
2073
2074 /**
2075 * @brief Produces \p cnt messages and waits for succesful delivery
2076 */
test_produce_msgs2(rd_kafka_t * rk,const char * topic,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size)2077 void test_produce_msgs2 (rd_kafka_t *rk, const char *topic,
2078 uint64_t testid, int32_t partition,
2079 int msg_base, int cnt,
2080 const char *payload, size_t size) {
2081 int remains = 0;
2082 rd_kafka_topic_t *rkt = test_create_topic_object(rk, topic, NULL);
2083
2084 test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2085 payload, size, 0, &remains);
2086
2087 test_wait_delivery(rk, &remains);
2088
2089 rd_kafka_topic_destroy(rkt);
2090 }
2091
2092 /**
2093 * @brief Produces \p cnt messages without waiting for delivery.
2094 */
test_produce_msgs2_nowait(rd_kafka_t * rk,const char * topic,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size,int * remainsp)2095 void test_produce_msgs2_nowait (rd_kafka_t *rk, const char *topic,
2096 uint64_t testid, int32_t partition,
2097 int msg_base, int cnt,
2098 const char *payload, size_t size,
2099 int *remainsp) {
2100 rd_kafka_topic_t *rkt = test_create_topic_object(rk, topic, NULL);
2101
2102 test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2103 payload, size, 0, remainsp);
2104
2105 rd_kafka_topic_destroy(rkt);
2106 }
2107
2108
2109 /**
2110 * Produces \p cnt messages at \p msgs/s, and waits for succesful delivery
2111 */
test_produce_msgs_rate(rd_kafka_t * rk,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size,int msgrate)2112 void test_produce_msgs_rate (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2113 uint64_t testid, int32_t partition,
2114 int msg_base, int cnt,
2115 const char *payload, size_t size, int msgrate) {
2116 int remains = 0;
2117
2118 test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2119 payload, size, msgrate, &remains);
2120
2121 test_wait_delivery(rk, &remains);
2122 }
2123
2124
2125
2126 /**
2127 * Create producer, produce \p msgcnt messages to \p topic \p partition,
2128 * destroy consumer, and returns the used testid.
2129 */
2130 uint64_t
test_produce_msgs_easy_size(const char * topic,uint64_t testid,int32_t partition,int msgcnt,size_t size)2131 test_produce_msgs_easy_size (const char *topic, uint64_t testid,
2132 int32_t partition, int msgcnt, size_t size) {
2133 rd_kafka_t *rk;
2134 rd_kafka_topic_t *rkt;
2135 test_timing_t t_produce;
2136
2137 if (!testid)
2138 testid = test_id_generate();
2139 rk = test_create_producer();
2140 rkt = test_create_producer_topic(rk, topic, NULL);
2141
2142 TIMING_START(&t_produce, "PRODUCE");
2143 test_produce_msgs(rk, rkt, testid, partition, 0, msgcnt, NULL, size);
2144 TIMING_STOP(&t_produce);
2145 rd_kafka_topic_destroy(rkt);
2146 rd_kafka_destroy(rk);
2147
2148 return testid;
2149 }
2150
test_produce_sync(rd_kafka_t * rk,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition)2151 rd_kafka_resp_err_t test_produce_sync (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2152 uint64_t testid, int32_t partition) {
2153 test_curr->produce_sync = 1;
2154 test_produce_msgs(rk, rkt, testid, partition, 0, 1, NULL, 0);
2155 test_curr->produce_sync = 0;
2156 return test_curr->produce_sync_err;
2157 }
2158
2159
2160 /**
2161 * @brief Easy produce function.
2162 *
2163 * @param ... is a NULL-terminated list of key, value config property pairs.
2164 */
test_produce_msgs_easy_v(const char * topic,int32_t partition,uint64_t testid,int msg_base,int cnt,size_t size,...)2165 void test_produce_msgs_easy_v (const char *topic,
2166 int32_t partition, uint64_t testid,
2167 int msg_base, int cnt, size_t size, ...) {
2168 rd_kafka_conf_t *conf;
2169 rd_kafka_t *p;
2170 rd_kafka_topic_t *rkt;
2171 va_list ap;
2172 const char *key, *val;
2173
2174 test_conf_init(&conf, NULL, 0);
2175
2176 va_start(ap, size);
2177 while ((key = va_arg(ap, const char *)) &&
2178 (val = va_arg(ap, const char *)))
2179 test_conf_set(conf, key, val);
2180 va_end(ap);
2181
2182 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
2183
2184 p = test_create_handle(RD_KAFKA_PRODUCER, conf);
2185
2186 rkt = test_create_producer_topic(p, topic, NULL);
2187
2188 test_produce_msgs(p, rkt, testid, partition, msg_base, cnt, NULL, size);
2189
2190 rd_kafka_topic_destroy(rkt);
2191 rd_kafka_destroy(p);
2192 }
2193
2194
2195
test_create_consumer(const char * group_id,void (* rebalance_cb)(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * partitions,void * opaque),rd_kafka_conf_t * conf,rd_kafka_topic_conf_t * default_topic_conf)2196 rd_kafka_t *test_create_consumer (const char *group_id,
2197 void (*rebalance_cb) (
2198 rd_kafka_t *rk,
2199 rd_kafka_resp_err_t err,
2200 rd_kafka_topic_partition_list_t
2201 *partitions,
2202 void *opaque),
2203 rd_kafka_conf_t *conf,
2204 rd_kafka_topic_conf_t *default_topic_conf) {
2205 rd_kafka_t *rk;
2206 char tmp[64];
2207
2208 if (!conf)
2209 test_conf_init(&conf, NULL, 0);
2210
2211 if (group_id) {
2212 test_conf_set(conf, "group.id", group_id);
2213
2214 rd_snprintf(tmp, sizeof(tmp), "%d", test_session_timeout_ms);
2215 test_conf_set(conf, "session.timeout.ms", tmp);
2216
2217 if (rebalance_cb)
2218 rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
2219 } else {
2220 TEST_ASSERT(!rebalance_cb);
2221 }
2222
2223 if (default_topic_conf)
2224 rd_kafka_conf_set_default_topic_conf(conf, default_topic_conf);
2225
2226 /* Create kafka instance */
2227 rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
2228
2229 if (group_id)
2230 rd_kafka_poll_set_consumer(rk);
2231
2232 return rk;
2233 }
2234
test_create_consumer_topic(rd_kafka_t * rk,const char * topic)2235 rd_kafka_topic_t *test_create_consumer_topic (rd_kafka_t *rk,
2236 const char *topic) {
2237 rd_kafka_topic_t *rkt;
2238 rd_kafka_topic_conf_t *topic_conf;
2239
2240 test_conf_init(NULL, &topic_conf, 0);
2241
2242 rkt = rd_kafka_topic_new(rk, topic, topic_conf);
2243 if (!rkt)
2244 TEST_FAIL("Failed to create topic: %s\n",
2245 rd_kafka_err2str(rd_kafka_last_error()));
2246
2247 return rkt;
2248 }
2249
2250
test_consumer_start(const char * what,rd_kafka_topic_t * rkt,int32_t partition,int64_t start_offset)2251 void test_consumer_start (const char *what,
2252 rd_kafka_topic_t *rkt, int32_t partition,
2253 int64_t start_offset) {
2254
2255 TEST_SAY("%s: consumer_start: %s [%"PRId32"] at offset %"PRId64"\n",
2256 what, rd_kafka_topic_name(rkt), partition, start_offset);
2257
2258 if (rd_kafka_consume_start(rkt, partition, start_offset) == -1)
2259 TEST_FAIL("%s: consume_start failed: %s\n",
2260 what, rd_kafka_err2str(rd_kafka_last_error()));
2261 }
2262
test_consumer_stop(const char * what,rd_kafka_topic_t * rkt,int32_t partition)2263 void test_consumer_stop (const char *what,
2264 rd_kafka_topic_t *rkt, int32_t partition) {
2265
2266 TEST_SAY("%s: consumer_stop: %s [%"PRId32"]\n",
2267 what, rd_kafka_topic_name(rkt), partition);
2268
2269 if (rd_kafka_consume_stop(rkt, partition) == -1)
2270 TEST_FAIL("%s: consume_stop failed: %s\n",
2271 what, rd_kafka_err2str(rd_kafka_last_error()));
2272 }
2273
test_consumer_seek(const char * what,rd_kafka_topic_t * rkt,int32_t partition,int64_t offset)2274 void test_consumer_seek (const char *what, rd_kafka_topic_t *rkt,
2275 int32_t partition, int64_t offset) {
2276 int err;
2277
2278 TEST_SAY("%s: consumer_seek: %s [%"PRId32"] to offset %"PRId64"\n",
2279 what, rd_kafka_topic_name(rkt), partition, offset);
2280
2281 if ((err = rd_kafka_seek(rkt, partition, offset, 2000)))
2282 TEST_FAIL("%s: consume_seek(%s, %"PRId32", %"PRId64") "
2283 "failed: %s\n",
2284 what,
2285 rd_kafka_topic_name(rkt), partition, offset,
2286 rd_kafka_err2str(err));
2287 }
2288
2289
2290
2291 /**
2292 * Returns offset of the last message consumed
2293 */
test_consume_msgs(const char * what,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition,int64_t offset,int exp_msg_base,int exp_cnt,int parse_fmt)2294 int64_t test_consume_msgs (const char *what, rd_kafka_topic_t *rkt,
2295 uint64_t testid, int32_t partition, int64_t offset,
2296 int exp_msg_base, int exp_cnt, int parse_fmt) {
2297 int cnt = 0;
2298 int msg_next = exp_msg_base;
2299 int fails = 0;
2300 int64_t offset_last = -1;
2301 int64_t tot_bytes = 0;
2302 test_timing_t t_first, t_all;
2303
2304 TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: expect msg #%d..%d "
2305 "at offset %"PRId64"\n",
2306 what, rd_kafka_topic_name(rkt), partition,
2307 exp_msg_base, exp_msg_base+exp_cnt, offset);
2308
2309 if (offset != TEST_NO_SEEK) {
2310 rd_kafka_resp_err_t err;
2311 test_timing_t t_seek;
2312
2313 TIMING_START(&t_seek, "SEEK");
2314 if ((err = rd_kafka_seek(rkt, partition, offset, 5000)))
2315 TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: "
2316 "seek to %"PRId64" failed: %s\n",
2317 what, rd_kafka_topic_name(rkt), partition,
2318 offset, rd_kafka_err2str(err));
2319 TIMING_STOP(&t_seek);
2320 TEST_SAY("%s: seeked to offset %"PRId64"\n", what, offset);
2321 }
2322
2323 TIMING_START(&t_first, "FIRST MSG");
2324 TIMING_START(&t_all, "ALL MSGS");
2325
2326 while (cnt < exp_cnt) {
2327 rd_kafka_message_t *rkmessage;
2328 int msg_id;
2329
2330 rkmessage = rd_kafka_consume(rkt, partition,
2331 tmout_multip(5000));
2332
2333 if (TIMING_EVERY(&t_all, 3*1000000))
2334 TEST_SAY("%s: "
2335 "consumed %3d%%: %d/%d messages "
2336 "(%d msgs/s, %d bytes/s)\n",
2337 what, cnt * 100 / exp_cnt, cnt, exp_cnt,
2338 (int)(cnt /
2339 (TIMING_DURATION(&t_all) / 1000000)),
2340 (int)(tot_bytes /
2341 (TIMING_DURATION(&t_all) / 1000000)));
2342
2343 if (!rkmessage)
2344 TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: "
2345 "expected msg #%d (%d/%d): timed out\n",
2346 what, rd_kafka_topic_name(rkt), partition,
2347 msg_next, cnt, exp_cnt);
2348
2349 if (rkmessage->err)
2350 TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: "
2351 "expected msg #%d (%d/%d): got error: %s\n",
2352 what, rd_kafka_topic_name(rkt), partition,
2353 msg_next, cnt, exp_cnt,
2354 rd_kafka_err2str(rkmessage->err));
2355
2356 if (cnt == 0)
2357 TIMING_STOP(&t_first);
2358
2359 if (parse_fmt)
2360 test_msg_parse(testid, rkmessage, partition, &msg_id);
2361 else
2362 msg_id = 0;
2363
2364 if (test_level >= 3)
2365 TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: "
2366 "got msg #%d at offset %"PRId64
2367 " (expect #%d at offset %"PRId64")\n",
2368 what, rd_kafka_topic_name(rkt), partition,
2369 msg_id, rkmessage->offset,
2370 msg_next,
2371 offset >= 0 ? offset + cnt : -1);
2372
2373 if (parse_fmt && msg_id != msg_next) {
2374 TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: "
2375 "expected msg #%d (%d/%d): got msg #%d\n",
2376 what, rd_kafka_topic_name(rkt), partition,
2377 msg_next, cnt, exp_cnt, msg_id);
2378 fails++;
2379 }
2380
2381 cnt++;
2382 tot_bytes += rkmessage->len;
2383 msg_next++;
2384 offset_last = rkmessage->offset;
2385
2386 rd_kafka_message_destroy(rkmessage);
2387 }
2388
2389 TIMING_STOP(&t_all);
2390
2391 if (fails)
2392 TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: %d failures\n",
2393 what, rd_kafka_topic_name(rkt), partition, fails);
2394
2395 TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: "
2396 "%d/%d messages consumed succesfully\n",
2397 what, rd_kafka_topic_name(rkt), partition,
2398 cnt, exp_cnt);
2399 return offset_last;
2400 }
2401
2402
2403 /**
2404 * Create high-level consumer subscribing to \p topic from BEGINNING
2405 * and expects \d exp_msgcnt with matching \p testid
2406 * Destroys consumer when done.
2407 *
2408 * @param txn If true, isolation.level is set to read_committed.
2409 * @param partition If -1 the topic will be subscribed to, otherwise the
2410 * single partition will be assigned immediately.
2411 *
2412 * If \p group_id is NULL a new unique group is generated
2413 */
2414 void
test_consume_msgs_easy_mv0(const char * group_id,const char * topic,rd_bool_t txn,int32_t partition,uint64_t testid,int exp_eofcnt,int exp_msgcnt,rd_kafka_topic_conf_t * tconf,test_msgver_t * mv)2415 test_consume_msgs_easy_mv0 (const char *group_id, const char *topic,
2416 rd_bool_t txn,
2417 int32_t partition,
2418 uint64_t testid, int exp_eofcnt, int exp_msgcnt,
2419 rd_kafka_topic_conf_t *tconf,
2420 test_msgver_t *mv) {
2421 rd_kafka_t *rk;
2422 char grpid0[64];
2423 rd_kafka_conf_t *conf;
2424
2425 test_conf_init(&conf, tconf ? NULL : &tconf, 0);
2426
2427 if (!group_id)
2428 group_id = test_str_id_generate(grpid0, sizeof(grpid0));
2429
2430 if (txn)
2431 test_conf_set(conf, "isolation.level", "read_committed");
2432
2433 test_topic_conf_set(tconf, "auto.offset.reset", "smallest");
2434 if (exp_eofcnt != -1)
2435 test_conf_set(conf, "enable.partition.eof", "true");
2436 rk = test_create_consumer(group_id, NULL, conf, tconf);
2437
2438 rd_kafka_poll_set_consumer(rk);
2439
2440 if (partition == -1) {
2441 TEST_SAY("Subscribing to topic %s in group %s "
2442 "(expecting %d msgs with testid %"PRIu64")\n",
2443 topic, group_id, exp_msgcnt, testid);
2444
2445 test_consumer_subscribe(rk, topic);
2446 } else {
2447 rd_kafka_topic_partition_list_t *plist;
2448
2449 TEST_SAY("Assign topic %s [%"PRId32"] in group %s "
2450 "(expecting %d msgs with testid %"PRIu64")\n",
2451 topic, partition, group_id, exp_msgcnt, testid);
2452
2453 plist = rd_kafka_topic_partition_list_new(1);
2454 rd_kafka_topic_partition_list_add(plist, topic, partition);
2455 test_consumer_assign("consume_easy_mv", rk, plist);
2456 rd_kafka_topic_partition_list_destroy(plist);
2457 }
2458
2459 /* Consume messages */
2460 test_consumer_poll("consume.easy", rk, testid, exp_eofcnt,
2461 -1, exp_msgcnt, mv);
2462
2463 test_consumer_close(rk);
2464
2465 rd_kafka_destroy(rk);
2466 }
2467
2468 void
test_consume_msgs_easy(const char * group_id,const char * topic,uint64_t testid,int exp_eofcnt,int exp_msgcnt,rd_kafka_topic_conf_t * tconf)2469 test_consume_msgs_easy (const char *group_id, const char *topic,
2470 uint64_t testid, int exp_eofcnt, int exp_msgcnt,
2471 rd_kafka_topic_conf_t *tconf) {
2472 test_msgver_t mv;
2473
2474 test_msgver_init(&mv, testid);
2475
2476 test_consume_msgs_easy_mv(group_id, topic, -1, testid, exp_eofcnt,
2477 exp_msgcnt, tconf, &mv);
2478
2479 test_msgver_clear(&mv);
2480 }
2481
2482
2483 void
test_consume_txn_msgs_easy(const char * group_id,const char * topic,uint64_t testid,int exp_eofcnt,int exp_msgcnt,rd_kafka_topic_conf_t * tconf)2484 test_consume_txn_msgs_easy (const char *group_id, const char *topic,
2485 uint64_t testid, int exp_eofcnt, int exp_msgcnt,
2486 rd_kafka_topic_conf_t *tconf) {
2487 test_msgver_t mv;
2488
2489 test_msgver_init(&mv, testid);
2490
2491 test_consume_msgs_easy_mv0(group_id, topic, rd_true/*txn*/,
2492 -1, testid, exp_eofcnt,
2493 exp_msgcnt, tconf, &mv);
2494
2495 test_msgver_clear(&mv);
2496 }
2497
2498
2499 /**
2500 * @brief Waits for up to \p timeout_ms for consumer to receive assignment.
2501 * If no assignment received without the timeout the test fails.
2502 */
test_consumer_wait_assignment(rd_kafka_t * rk)2503 void test_consumer_wait_assignment (rd_kafka_t *rk) {
2504 rd_kafka_topic_partition_list_t *assignment = NULL;
2505 int i;
2506
2507 while (1) {
2508 rd_kafka_resp_err_t err;
2509
2510 err = rd_kafka_assignment(rk, &assignment);
2511 TEST_ASSERT(!err, "rd_kafka_assignment() failed: %s",
2512 rd_kafka_err2str(err));
2513
2514 if (assignment->cnt > 0)
2515 break;
2516
2517 rd_kafka_topic_partition_list_destroy(assignment);
2518
2519 test_consumer_poll_once(rk, NULL, 1000);
2520 }
2521
2522 TEST_SAY("Assignment (%d partition(s)): ", assignment->cnt);
2523 for (i = 0 ; i < assignment->cnt ; i++)
2524 TEST_SAY0("%s%s[%"PRId32"]",
2525 i == 0 ? "" : ", ",
2526 assignment->elems[i].topic,
2527 assignment->elems[i].partition);
2528 TEST_SAY0("\n");
2529
2530 rd_kafka_topic_partition_list_destroy(assignment);
2531 }
2532
2533
2534 /**
2535 * @brief Start subscribing for 'topic'
2536 */
test_consumer_subscribe(rd_kafka_t * rk,const char * topic)2537 void test_consumer_subscribe (rd_kafka_t *rk, const char *topic) {
2538 rd_kafka_topic_partition_list_t *topics;
2539 rd_kafka_resp_err_t err;
2540
2541 topics = rd_kafka_topic_partition_list_new(1);
2542 rd_kafka_topic_partition_list_add(topics, topic,
2543 RD_KAFKA_PARTITION_UA);
2544
2545 err = rd_kafka_subscribe(rk, topics);
2546 if (err)
2547 TEST_FAIL("%s: Failed to subscribe to %s: %s\n",
2548 rd_kafka_name(rk), topic, rd_kafka_err2str(err));
2549
2550 rd_kafka_topic_partition_list_destroy(topics);
2551 }
2552
2553
test_consumer_assign(const char * what,rd_kafka_t * rk,rd_kafka_topic_partition_list_t * partitions)2554 void test_consumer_assign (const char *what, rd_kafka_t *rk,
2555 rd_kafka_topic_partition_list_t *partitions) {
2556 rd_kafka_resp_err_t err;
2557 test_timing_t timing;
2558
2559 TIMING_START(&timing, "ASSIGN.PARTITIONS");
2560 err = rd_kafka_assign(rk, partitions);
2561 TIMING_STOP(&timing);
2562 if (err)
2563 TEST_FAIL("%s: failed to assign %d partition(s): %s\n",
2564 what, partitions->cnt, rd_kafka_err2str(err));
2565 else
2566 TEST_SAY("%s: assigned %d partition(s)\n",
2567 what, partitions->cnt);
2568 }
2569
2570
test_consumer_unassign(const char * what,rd_kafka_t * rk)2571 void test_consumer_unassign (const char *what, rd_kafka_t *rk) {
2572 rd_kafka_resp_err_t err;
2573 test_timing_t timing;
2574
2575 TIMING_START(&timing, "UNASSIGN.PARTITIONS");
2576 err = rd_kafka_assign(rk, NULL);
2577 TIMING_STOP(&timing);
2578 if (err)
2579 TEST_FAIL("%s: failed to unassign current partitions: %s\n",
2580 what, rd_kafka_err2str(err));
2581 else
2582 TEST_SAY("%s: unassigned current partitions\n", what);
2583 }
2584
2585
2586 /**
2587 * @brief Assign a single partition with an optional starting offset
2588 */
test_consumer_assign_partition(const char * what,rd_kafka_t * rk,const char * topic,int32_t partition,int64_t offset)2589 void test_consumer_assign_partition (const char *what, rd_kafka_t *rk,
2590 const char *topic, int32_t partition,
2591 int64_t offset) {
2592 rd_kafka_topic_partition_list_t *part;
2593
2594 part = rd_kafka_topic_partition_list_new(1);
2595 rd_kafka_topic_partition_list_add(part, topic, partition)->offset =
2596 offset;
2597
2598 test_consumer_assign(what, rk, part);
2599
2600 rd_kafka_topic_partition_list_destroy(part);
2601 }
2602
2603
2604 /**
2605 * Message verification services
2606 *
2607 */
2608
test_msgver_init(test_msgver_t * mv,uint64_t testid)2609 void test_msgver_init (test_msgver_t *mv, uint64_t testid) {
2610 memset(mv, 0, sizeof(*mv));
2611 mv->testid = testid;
2612 /* Max warning logs before suppressing. */
2613 mv->log_max = (test_level + 1) * 100;
2614 }
2615
/* Emit a msgver verification warning, suppressing (but counting) any
 * warnings beyond the log_max cap to avoid flooding the test output. */
#define TEST_MV_WARN(mv,...) do {                                       \
                if ((mv)->log_cnt++ > (mv)->log_max)                    \
                        (mv)->log_suppr_cnt++;                          \
                else                                                    \
                        TEST_WARN(__VA_ARGS__);                        \
        } while (0)
2622
2623
2624
test_mv_mvec_grow(struct test_mv_mvec * mvec,int tot_size)2625 static void test_mv_mvec_grow (struct test_mv_mvec *mvec, int tot_size) {
2626 if (tot_size <= mvec->size)
2627 return;
2628 mvec->size = tot_size;
2629 mvec->m = realloc(mvec->m, sizeof(*mvec->m) * mvec->size);
2630 }
2631
2632 /**
2633 * Make sure there is room for at least \p cnt messages, else grow mvec.
2634 */
test_mv_mvec_reserve(struct test_mv_mvec * mvec,int cnt)2635 static void test_mv_mvec_reserve (struct test_mv_mvec *mvec, int cnt) {
2636 test_mv_mvec_grow(mvec, mvec->cnt + cnt);
2637 }
2638
test_mv_mvec_init(struct test_mv_mvec * mvec,int exp_cnt)2639 void test_mv_mvec_init (struct test_mv_mvec *mvec, int exp_cnt) {
2640 TEST_ASSERT(mvec->m == NULL, "mvec not cleared");
2641
2642 if (!exp_cnt)
2643 return;
2644
2645 test_mv_mvec_grow(mvec, exp_cnt);
2646 }
2647
2648
test_mv_mvec_clear(struct test_mv_mvec * mvec)2649 void test_mv_mvec_clear (struct test_mv_mvec *mvec) {
2650 if (mvec->m)
2651 free(mvec->m);
2652 }
2653
test_msgver_clear(test_msgver_t * mv)2654 void test_msgver_clear (test_msgver_t *mv) {
2655 int i;
2656 for (i = 0 ; i < mv->p_cnt ; i++) {
2657 struct test_mv_p *p = mv->p[i];
2658 free(p->topic);
2659 test_mv_mvec_clear(&p->mvec);
2660 free(p);
2661 }
2662
2663 free(mv->p);
2664
2665 test_msgver_init(mv, mv->testid);
2666 }
2667
test_msgver_p_get(test_msgver_t * mv,const char * topic,int32_t partition,int do_create)2668 struct test_mv_p *test_msgver_p_get (test_msgver_t *mv, const char *topic,
2669 int32_t partition, int do_create) {
2670 int i;
2671 struct test_mv_p *p;
2672
2673 for (i = 0 ; i < mv->p_cnt ; i++) {
2674 p = mv->p[i];
2675 if (p->partition == partition && !strcmp(p->topic, topic))
2676 return p;
2677 }
2678
2679 if (!do_create)
2680 TEST_FAIL("Topic %s [%d] not found in msgver", topic, partition);
2681
2682 if (mv->p_cnt == mv->p_size) {
2683 mv->p_size = (mv->p_size + 4) * 2;
2684 mv->p = realloc(mv->p, sizeof(*mv->p) * mv->p_size);
2685 }
2686
2687 mv->p[mv->p_cnt++] = p = calloc(1, sizeof(*p));
2688
2689 p->topic = rd_strdup(topic);
2690 p->partition = partition;
2691 p->eof_offset = RD_KAFKA_OFFSET_INVALID;
2692
2693 return p;
2694 }
2695
2696
2697 /**
2698 * Add (room for) message to message vector.
2699 * Resizes the vector as needed.
2700 */
test_mv_mvec_add(struct test_mv_mvec * mvec)2701 static struct test_mv_m *test_mv_mvec_add (struct test_mv_mvec *mvec) {
2702 if (mvec->cnt == mvec->size) {
2703 test_mv_mvec_grow(mvec, (mvec->size ? mvec->size * 2 : 10000));
2704 }
2705
2706 mvec->cnt++;
2707
2708 return &mvec->m[mvec->cnt-1];
2709 }
2710
2711 /**
2712 * Returns message at index \p mi
2713 */
test_mv_mvec_get(struct test_mv_mvec * mvec,int mi)2714 static RD_INLINE struct test_mv_m *test_mv_mvec_get (struct test_mv_mvec *mvec,
2715 int mi) {
2716 if (mi >= mvec->cnt)
2717 return NULL;
2718 return &mvec->m[mi];
2719 }
2720
2721 /**
2722 * @returns the message with msgid \p msgid, or NULL.
2723 */
test_mv_mvec_find_by_msgid(struct test_mv_mvec * mvec,int msgid)2724 static struct test_mv_m *test_mv_mvec_find_by_msgid (struct test_mv_mvec *mvec,
2725 int msgid) {
2726 int mi;
2727
2728 for (mi = 0 ; mi < mvec->cnt ; mi++)
2729 if (mvec->m[mi].msgid == msgid)
2730 return &mvec->m[mi];
2731
2732 return NULL;
2733 }
2734
2735
2736 /**
2737 * Print message list to \p fp
2738 */
2739 static RD_UNUSED
test_mv_mvec_dump(FILE * fp,const struct test_mv_mvec * mvec)2740 void test_mv_mvec_dump (FILE *fp, const struct test_mv_mvec *mvec) {
2741 int mi;
2742
2743 fprintf(fp, "*** Dump mvec with %d messages (capacity %d): ***\n",
2744 mvec->cnt, mvec->size);
2745 for (mi = 0 ; mi < mvec->cnt ; mi++)
2746 fprintf(fp, " msgid %d, offset %"PRId64"\n",
2747 mvec->m[mi].msgid, mvec->m[mi].offset);
2748 fprintf(fp, "*** Done ***\n");
2749
2750 }
2751
test_mv_mvec_sort(struct test_mv_mvec * mvec,int (* cmp)(const void *,const void *))2752 static void test_mv_mvec_sort (struct test_mv_mvec *mvec,
2753 int (*cmp) (const void *, const void *)) {
2754 qsort(mvec->m, mvec->cnt, sizeof(*mvec->m), cmp);
2755 }
2756
2757
2758 /**
2759 * @brief Adds a message to the msgver service.
2760 *
2761 * @returns 1 if message is from the expected testid, else 0 (not added)
2762 */
test_msgver_add_msg00(const char * func,int line,const char * clientname,test_msgver_t * mv,uint64_t testid,const char * topic,int32_t partition,int64_t offset,int64_t timestamp,rd_kafka_resp_err_t err,int msgnum)2763 int test_msgver_add_msg00 (const char *func, int line, const char *clientname,
2764 test_msgver_t *mv,
2765 uint64_t testid,
2766 const char *topic, int32_t partition,
2767 int64_t offset, int64_t timestamp,
2768 rd_kafka_resp_err_t err, int msgnum) {
2769 struct test_mv_p *p;
2770 struct test_mv_m *m;
2771
2772 if (testid != mv->testid) {
2773 TEST_SAYL(3, "%s:%d: %s: mismatching testid %"PRIu64" != %"PRIu64"\n",
2774 func, line, clientname, testid, mv->testid);
2775 return 0; /* Ignore message */
2776 }
2777
2778 p = test_msgver_p_get(mv, topic, partition, 1);
2779
2780 if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
2781 p->eof_offset = offset;
2782 return 1;
2783 }
2784
2785 m = test_mv_mvec_add(&p->mvec);
2786
2787 m->offset = offset;
2788 m->msgid = msgnum;
2789 m->timestamp = timestamp;
2790
2791 if (test_level > 2) {
2792 TEST_SAY("%s:%d: %s: "
2793 "Recv msg %s [%"PRId32"] offset %"PRId64" msgid %d "
2794 "timestamp %"PRId64"\n",
2795 func, line, clientname,
2796 p->topic, p->partition, m->offset, m->msgid,
2797 m->timestamp);
2798 }
2799
2800 mv->msgcnt++;
2801
2802 return 1;
2803 }
2804
2805 /**
2806 * Adds a message to the msgver service.
2807 *
2808 * Message must be a proper message or PARTITION_EOF.
2809 *
2810 * @param override_topic if non-NULL, overrides the rkmessage's topic
2811 * with this one.
2812 *
2813 * @returns 1 if message is from the expected testid, else 0 (not added).
2814 */
test_msgver_add_msg0(const char * func,int line,const char * clientname,test_msgver_t * mv,rd_kafka_message_t * rkmessage,const char * override_topic)2815 int test_msgver_add_msg0 (const char *func, int line, const char *clientname,
2816 test_msgver_t *mv, rd_kafka_message_t *rkmessage,
2817 const char *override_topic) {
2818 uint64_t in_testid;
2819 int in_part;
2820 int in_msgnum = -1;
2821 char buf[128];
2822 const void *val;
2823 size_t valsize;
2824
2825 if (mv->fwd)
2826 test_msgver_add_msg0(func, line, clientname,
2827 mv->fwd, rkmessage, override_topic);
2828
2829 if (rkmessage->err) {
2830 if (rkmessage->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
2831 return 0; /* Ignore error */
2832
2833 in_testid = mv->testid;
2834
2835 } else {
2836
2837 if (!mv->msgid_hdr) {
2838 rd_snprintf(buf, sizeof(buf), "%.*s",
2839 (int)rkmessage->len,
2840 (char *)rkmessage->payload);
2841 val = buf;
2842 } else {
2843 /* msgid is in message header */
2844 rd_kafka_headers_t *hdrs;
2845
2846 if (rd_kafka_message_headers(rkmessage, &hdrs) ||
2847 rd_kafka_header_get_last(hdrs, mv->msgid_hdr,
2848 &val, &valsize)) {
2849 TEST_SAYL(3,
2850 "%s:%d: msgid expected in header %s "
2851 "but %s exists for "
2852 "message at offset %"PRId64
2853 " has no headers\n",
2854 func, line, mv->msgid_hdr,
2855 hdrs ? "no such header" : "no headers",
2856 rkmessage->offset);
2857
2858 return 0;
2859 }
2860 }
2861
2862 if (sscanf(val, "testid=%"SCNu64", partition=%i, msg=%i\n",
2863 &in_testid, &in_part, &in_msgnum) != 3)
2864 TEST_FAIL("%s:%d: Incorrect format at offset %"PRId64
2865 ": %s",
2866 func, line, rkmessage->offset,
2867 (const char *)val);
2868 }
2869
2870 return test_msgver_add_msg00(func, line, clientname, mv, in_testid,
2871 override_topic ?
2872 override_topic :
2873 rd_kafka_topic_name(rkmessage->rkt),
2874 rkmessage->partition,
2875 rkmessage->offset,
2876 rd_kafka_message_timestamp(rkmessage, NULL),
2877 rkmessage->err,
2878 in_msgnum);
2879 return 1;
2880 }
2881
2882
2883
2884 /**
2885 * Verify that all messages were received in order.
2886 *
2887 * - Offsets need to occur without gaps
2888 * - msgids need to be increasing: but may have gaps, e.g., using partitioner)
2889 */
test_mv_mvec_verify_order(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs)2890 static int test_mv_mvec_verify_order (test_msgver_t *mv, int flags,
2891 struct test_mv_p *p,
2892 struct test_mv_mvec *mvec,
2893 struct test_mv_vs *vs) {
2894 int mi;
2895 int fails = 0;
2896
2897 for (mi = 1/*skip first*/ ; mi < mvec->cnt ; mi++) {
2898 struct test_mv_m *prev = test_mv_mvec_get(mvec, mi-1);
2899 struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
2900
2901 if (((flags & TEST_MSGVER_BY_OFFSET) &&
2902 prev->offset + 1 != this->offset) ||
2903 ((flags & TEST_MSGVER_BY_MSGID) &&
2904 prev->msgid > this->msgid)) {
2905 TEST_MV_WARN(
2906 mv,
2907 " %s [%"PRId32"] msg rcvidx #%d/%d: "
2908 "out of order (prev vs this): "
2909 "offset %"PRId64" vs %"PRId64", "
2910 "msgid %d vs %d\n",
2911 p ? p->topic : "*",
2912 p ? p->partition : -1,
2913 mi, mvec->cnt,
2914 prev->offset, this->offset,
2915 prev->msgid, this->msgid);
2916 fails++;
2917 }
2918 }
2919
2920 return fails;
2921 }
2922
2923
2924 /**
2925 * @brief Verify that messages correspond to 'correct' msgver.
2926 */
test_mv_mvec_verify_corr(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs)2927 static int test_mv_mvec_verify_corr (test_msgver_t *mv, int flags,
2928 struct test_mv_p *p,
2929 struct test_mv_mvec *mvec,
2930 struct test_mv_vs *vs) {
2931 int mi;
2932 int fails = 0;
2933 struct test_mv_p *corr_p = NULL;
2934 struct test_mv_mvec *corr_mvec;
2935 int verifycnt = 0;
2936
2937 TEST_ASSERT(vs->corr);
2938
2939 /* Get correct mvec for comparison. */
2940 if (p)
2941 corr_p = test_msgver_p_get(vs->corr, p->topic, p->partition, 0);
2942 if (!corr_p) {
2943 TEST_MV_WARN(mv,
2944 " %s [%"PRId32"]: "
2945 "no corresponding correct partition found\n",
2946 p ? p->topic : "*",
2947 p ? p->partition : -1);
2948 return 1;
2949 }
2950
2951 corr_mvec = &corr_p->mvec;
2952
2953 for (mi = 0 ; mi < mvec->cnt ; mi++) {
2954 struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
2955 const struct test_mv_m *corr;
2956
2957
2958 if (flags & TEST_MSGVER_SUBSET)
2959 corr = test_mv_mvec_find_by_msgid(corr_mvec,
2960 this->msgid);
2961 else
2962 corr = test_mv_mvec_get(corr_mvec, mi);
2963
2964 if (0)
2965 TEST_MV_WARN(mv,
2966 "msg #%d: msgid %d, offset %"PRId64"\n",
2967 mi, this->msgid, this->offset);
2968 if (!corr) {
2969 if (!(flags & TEST_MSGVER_SUBSET)) {
2970 TEST_MV_WARN(
2971 mv,
2972 " %s [%"PRId32"] msg rcvidx #%d/%d: "
2973 "out of range: correct mvec has "
2974 "%d messages: "
2975 "message offset %"PRId64", msgid %d\n",
2976 p ? p->topic : "*",
2977 p ? p->partition : -1,
2978 mi, mvec->cnt, corr_mvec->cnt,
2979 this->offset, this->msgid);
2980 fails++;
2981 }
2982 continue;
2983 }
2984
2985 if (((flags & TEST_MSGVER_BY_OFFSET) &&
2986 this->offset != corr->offset) ||
2987 ((flags & TEST_MSGVER_BY_MSGID) &&
2988 this->msgid != corr->msgid) ||
2989 ((flags & TEST_MSGVER_BY_TIMESTAMP) &&
2990 this->timestamp != corr->timestamp)) {
2991 TEST_MV_WARN(
2992 mv,
2993 " %s [%"PRId32"] msg rcvidx #%d/%d: "
2994 "did not match correct msg: "
2995 "offset %"PRId64" vs %"PRId64", "
2996 "msgid %d vs %d, "
2997 "timestamp %"PRId64" vs %"PRId64" (fl 0x%x)\n",
2998 p ? p->topic : "*",
2999 p ? p->partition : -1,
3000 mi, mvec->cnt,
3001 this->offset, corr->offset,
3002 this->msgid, corr->msgid,
3003 this->timestamp, corr->timestamp,
3004 flags);
3005 fails++;
3006 } else {
3007 verifycnt++;
3008 }
3009 }
3010
3011 if (verifycnt != corr_mvec->cnt &&
3012 !(flags & TEST_MSGVER_SUBSET)) {
3013 TEST_MV_WARN(
3014 mv,
3015 " %s [%"PRId32"]: of %d input messages, "
3016 "only %d/%d matched correct messages\n",
3017 p ? p->topic : "*",
3018 p ? p->partition : -1,
3019 mvec->cnt, verifycnt, corr_mvec->cnt);
3020 fails++;
3021 }
3022
3023 return fails;
3024 }
3025
3026
3027
test_mv_m_cmp_offset(const void * _a,const void * _b)3028 static int test_mv_m_cmp_offset (const void *_a, const void *_b) {
3029 const struct test_mv_m *a = _a, *b = _b;
3030
3031 return RD_CMP(a->offset, b->offset);
3032 }
3033
test_mv_m_cmp_msgid(const void * _a,const void * _b)3034 static int test_mv_m_cmp_msgid (const void *_a, const void *_b) {
3035 const struct test_mv_m *a = _a, *b = _b;
3036
3037 return RD_CMP(a->msgid, b->msgid);
3038 }
3039
3040
3041 /**
3042 * Verify that there are no duplicate message.
3043 *
3044 * - Offsets are checked
3045 * - msgids are checked
3046 *
3047 * * NOTE: This sorts the message (.m) array, first by offset, then by msgid
3048 * and leaves the message array sorted (by msgid)
3049 */
test_mv_mvec_verify_dup(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs)3050 static int test_mv_mvec_verify_dup (test_msgver_t *mv, int flags,
3051 struct test_mv_p *p,
3052 struct test_mv_mvec *mvec,
3053 struct test_mv_vs *vs) {
3054 int mi;
3055 int fails = 0;
3056 enum {
3057 _P_OFFSET,
3058 _P_MSGID
3059 } pass;
3060
3061 for (pass = _P_OFFSET ; pass <= _P_MSGID ; pass++) {
3062
3063 if (pass == _P_OFFSET) {
3064 if (!(flags & TEST_MSGVER_BY_OFFSET))
3065 continue;
3066 test_mv_mvec_sort(mvec, test_mv_m_cmp_offset);
3067 } else if (pass == _P_MSGID) {
3068 if (!(flags & TEST_MSGVER_BY_MSGID))
3069 continue;
3070 test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid);
3071 }
3072
3073 for (mi = 1/*skip first*/ ; mi < mvec->cnt ; mi++) {
3074 struct test_mv_m *prev = test_mv_mvec_get(mvec, mi-1);
3075 struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
3076 int is_dup = 0;
3077
3078 if (pass == _P_OFFSET)
3079 is_dup = prev->offset == this->offset;
3080 else if (pass == _P_MSGID)
3081 is_dup = prev->msgid == this->msgid;
3082
3083 if (!is_dup)
3084 continue;
3085
3086 TEST_MV_WARN(mv,
3087 " %s [%"PRId32"] "
3088 "duplicate msg (prev vs this): "
3089 "offset %"PRId64" vs %"PRId64", "
3090 "msgid %d vs %d\n",
3091 p ? p->topic : "*",
3092 p ? p->partition : -1,
3093 prev->offset, this->offset,
3094 prev->msgid, this->msgid);
3095 fails++;
3096 }
3097 }
3098
3099 return fails;
3100 }
3101
3102
3103
/**
 * Verify that \p mvec contains the expected range:
 * - TEST_MSGVER_BY_MSGID: msgid within \p vs->msgid_min .. \p vs->msgid_max
 * - TEST_MSGVER_BY_TIMESTAMP: timestamp with \p vs->timestamp_min .. _max
 *
 * * NOTE: TEST_MSGVER_BY_MSGID is required
 *
 * * NOTE: This sorts the message (.m) array by msgid
 *         and leaves the message array sorted (by msgid)
 *
 * @returns the number of failures detected (0 on success).
 */
static int test_mv_mvec_verify_range (test_msgver_t *mv, int flags,
                                      struct test_mv_p *p,
                                      struct test_mv_mvec *mvec,
                                      struct test_mv_vs *vs) {
        int mi;
        int fails = 0;
        int cnt = 0;      /* in-range messages seen */
        int exp_cnt = vs->msgid_max - vs->msgid_min + 1;
        int skip_cnt = 0; /* messages not checked against their predecessor */

        /* msgid-based verification is mandatory for range checks. */
        if (!(flags & TEST_MSGVER_BY_MSGID))
                return 0;

        test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid);

        //test_mv_mvec_dump(stdout, mvec);

        for (mi = 0 ; mi < mvec->cnt ; mi++) {
                struct test_mv_m *prev = mi ? test_mv_mvec_get(mvec, mi-1):NULL;
                struct test_mv_m *this = test_mv_mvec_get(mvec, mi);

                /* Array is sorted: skip messages below the range, stop at
                 * the first message above it. */
                if (this->msgid < vs->msgid_min) {
                        skip_cnt++;
                        continue;
                } else if (this->msgid > vs->msgid_max)
                        break;

                /* Optional timestamp-window check for in-range messages. */
                if (flags & TEST_MSGVER_BY_TIMESTAMP) {
                        if (this->timestamp < vs->timestamp_min ||
                            this->timestamp > vs->timestamp_max) {
                                TEST_MV_WARN(
                                        mv,
                                        " %s [%"PRId32"] range check: "
                                        "msgid #%d (at mi %d): "
                                        "timestamp %"PRId64" outside "
                                        "expected range %"PRId64"..%"PRId64"\n",
                                        p ? p->topic : "*",
                                        p ? p->partition : -1,
                                        this->msgid, mi,
                                        this->timestamp,
                                        vs->timestamp_min, vs->timestamp_max);
                                fails++;
                        }
                }

                /* First in-range message must be exactly msgid_min;
                 * and never accept more than exp_cnt messages. */
                if (cnt++ == 0) {
                        if (this->msgid != vs->msgid_min) {
                                TEST_MV_WARN(mv,
                                             " %s [%"PRId32"] range check: "
                                             "first message #%d (at mi %d) "
                                             "is not first in "
                                             "expected range %d..%d\n",
                                             p ? p->topic : "*",
                                             p ? p->partition : -1,
                                             this->msgid, mi,
                                             vs->msgid_min, vs->msgid_max);
                                fails++;
                        }
                } else if (cnt > exp_cnt) {
                        TEST_MV_WARN(mv,
                                     " %s [%"PRId32"] range check: "
                                     "too many messages received (%d/%d) at "
                                     "msgid %d for expected range %d..%d\n",
                                     p ? p->topic : "*",
                                     p ? p->partition : -1,
                                     cnt, exp_cnt, this->msgid,
                                     vs->msgid_min, vs->msgid_max);
                        fails++;
                }

                /* No predecessor (mi == 0): nothing to gap-check against.
                 * NOTE(review): this also increments skip_cnt when the very
                 * first element is inside the range, slightly inflating the
                 * "skipped" figure reported below — confirm if intended. */
                if (!prev) {
                        skip_cnt++;
                        continue;
                }

                /* Detect holes (missing msgids) between adjacent messages. */
                if (prev->msgid + 1 != this->msgid) {
                        TEST_MV_WARN(mv, " %s [%"PRId32"] range check: "
                                     " %d message(s) missing between "
                                     "msgid %d..%d in expected range %d..%d\n",
                                     p ? p->topic : "*",
                                     p ? p->partition : -1,
                                     this->msgid - prev->msgid - 1,
                                     prev->msgid+1, this->msgid-1,
                                     vs->msgid_min, vs->msgid_max);
                        fails++;
                }
        }

        /* Exactly exp_cnt in-range messages must have been seen. */
        if (cnt != exp_cnt) {
                TEST_MV_WARN(mv,
                             " %s [%"PRId32"] range check: "
                             " wrong number of messages seen, wanted %d got %d "
                             "in expected range %d..%d (%d messages skipped)\n",
                             p ? p->topic : "*",
                             p ? p->partition : -1,
                             exp_cnt, cnt, vs->msgid_min, vs->msgid_max,
                             skip_cnt);
                fails++;
        }

        return fails;
}
3216
3217
3218
3219 /**
3220 * Run verifier \p f for all partitions.
3221 */
3222 #define test_mv_p_verify_f(mv,flags,f,vs) \
3223 test_mv_p_verify_f0(mv,flags,f, # f, vs)
test_mv_p_verify_f0(test_msgver_t * mv,int flags,int (* f)(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs),const char * f_name,struct test_mv_vs * vs)3224 static int test_mv_p_verify_f0 (test_msgver_t *mv, int flags,
3225 int (*f) (test_msgver_t *mv,
3226 int flags,
3227 struct test_mv_p *p,
3228 struct test_mv_mvec *mvec,
3229 struct test_mv_vs *vs),
3230 const char *f_name,
3231 struct test_mv_vs *vs) {
3232 int i;
3233 int fails = 0;
3234
3235 for (i = 0 ; i < mv->p_cnt ; i++) {
3236 TEST_SAY("Verifying %s [%"PRId32"] %d msgs with %s\n",
3237 mv->p[i]->topic, mv->p[i]->partition,
3238 mv->p[i]->mvec.cnt, f_name);
3239 fails += f(mv, flags, mv->p[i], &mv->p[i]->mvec, vs);
3240 }
3241
3242 return fails;
3243 }
3244
3245
3246 /**
3247 * Collect all messages from all topics and partitions into vs->mvec
3248 */
test_mv_collect_all_msgs(test_msgver_t * mv,struct test_mv_vs * vs)3249 static void test_mv_collect_all_msgs (test_msgver_t *mv,
3250 struct test_mv_vs *vs) {
3251 int i;
3252
3253 for (i = 0 ; i < mv->p_cnt ; i++) {
3254 struct test_mv_p *p = mv->p[i];
3255 int mi;
3256
3257 test_mv_mvec_reserve(&vs->mvec, p->mvec.cnt);
3258 for (mi = 0 ; mi < p->mvec.cnt ; mi++) {
3259 struct test_mv_m *m = test_mv_mvec_get(&p->mvec, mi);
3260 struct test_mv_m *m_new = test_mv_mvec_add(&vs->mvec);
3261 *m_new = *m;
3262 }
3263 }
3264 }
3265
3266
/**
 * Verify that all messages (by msgid) in range msg_base+exp_cnt were received
 * and received only once.
 * This works across all partitions.
 *
 * @returns the number of failures (0 on success).
 */
static int test_msgver_verify_range (test_msgver_t *mv, int flags,
                                     struct test_mv_vs *vs) {
        int fails = 0;

        /**
         * Create temporary array to hold expected message set,
         * then traverse all topics and partitions and move matching messages
         * to that set. Then verify the message set.
         */

        test_mv_mvec_init(&vs->mvec, vs->exp_cnt);

        /* Collect all msgs into vs mvec */
        test_mv_collect_all_msgs(mv, vs);

        /* Cross-partition range and duplicate checks require msgids,
         * so force TEST_MSGVER_BY_MSGID on. */
        fails += test_mv_mvec_verify_range(mv, TEST_MSGVER_BY_MSGID|flags,
                                           NULL, &vs->mvec, vs);
        fails += test_mv_mvec_verify_dup(mv, TEST_MSGVER_BY_MSGID|flags,
                                         NULL, &vs->mvec, vs);

        test_mv_mvec_clear(&vs->mvec);

        return fails;
}
3296
3297
3298 /**
3299 * Verify that \p exp_cnt messages were received for \p topic and \p partition
3300 * starting at msgid base \p msg_base.
3301 */
test_msgver_verify_part0(const char * func,int line,const char * what,test_msgver_t * mv,int flags,const char * topic,int partition,int msg_base,int exp_cnt)3302 int test_msgver_verify_part0 (const char *func, int line, const char *what,
3303 test_msgver_t *mv, int flags,
3304 const char *topic, int partition,
3305 int msg_base, int exp_cnt) {
3306 int fails = 0;
3307 struct test_mv_vs vs = { .msg_base = msg_base, .exp_cnt = exp_cnt };
3308 struct test_mv_p *p;
3309
3310 TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x) "
3311 "in %s [%d]: expecting msgids %d..%d (%d)\n",
3312 func, line, what, mv->msgcnt, flags, topic, partition,
3313 msg_base, msg_base+exp_cnt, exp_cnt);
3314
3315 p = test_msgver_p_get(mv, topic, partition, 0);
3316
3317 /* Per-partition checks */
3318 if (flags & TEST_MSGVER_ORDER)
3319 fails += test_mv_mvec_verify_order(mv, flags, p, &p->mvec, &vs);
3320 if (flags & TEST_MSGVER_DUP)
3321 fails += test_mv_mvec_verify_dup(mv, flags, p, &p->mvec, &vs);
3322
3323 if (mv->msgcnt < vs.exp_cnt) {
3324 TEST_MV_WARN(mv,
3325 "%s:%d: "
3326 "%s [%"PRId32"] expected %d messages but only "
3327 "%d received\n",
3328 func, line,
3329 p ? p->topic : "*",
3330 p ? p->partition : -1,
3331 vs.exp_cnt, mv->msgcnt);
3332 fails++;
3333 }
3334
3335
3336 if (mv->log_suppr_cnt > 0)
3337 TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
3338 func, line, what, mv->log_suppr_cnt);
3339
3340 if (fails)
3341 TEST_FAIL("%s:%d: %s: Verification of %d received messages "
3342 "failed: "
3343 "expected msgids %d..%d (%d): see previous errors\n",
3344 func, line, what,
3345 mv->msgcnt, msg_base, msg_base+exp_cnt, exp_cnt);
3346 else
3347 TEST_SAY("%s:%d: %s: Verification of %d received messages "
3348 "succeeded: "
3349 "expected msgids %d..%d (%d)\n",
3350 func, line, what,
3351 mv->msgcnt, msg_base, msg_base+exp_cnt, exp_cnt);
3352
3353 return fails;
3354
3355 }
3356
/**
 * Verify that \p exp_cnt messages were received starting at
 * msgid base \p msg_base.
 *
 * Runs per-partition (ORDER, DUP) and cross-partition (RANGE) checks
 * according to \p flags. TEST_FAIL()s if any check fails.
 *
 * @param vs verification state: msg_base, exp_cnt and (optionally)
 *           timestamp_min/max are read; msgid_min/max are derived here.
 *
 * @returns the number of failures.
 */
int test_msgver_verify0 (const char *func, int line, const char *what,
                         test_msgver_t *mv,
                         int flags, struct test_mv_vs vs) {
        int fails = 0;

        TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x): "
                 "expecting msgids %d..%d (%d)\n",
                 func, line, what, mv->msgcnt, flags,
                 vs.msg_base, vs.msg_base+vs.exp_cnt, vs.exp_cnt);
        if (flags & TEST_MSGVER_BY_TIMESTAMP) {
                assert((flags & TEST_MSGVER_BY_MSGID)); /* Required */
                TEST_SAY("%s:%d: %s: "
                         " and expecting timestamps %"PRId64"..%"PRId64"\n",
                         func, line, what,
                         vs.timestamp_min, vs.timestamp_max);
        }

        /* Per-partition checks */
        if (flags & TEST_MSGVER_ORDER)
                fails += test_mv_p_verify_f(mv, flags,
                                            test_mv_mvec_verify_order, &vs);
        if (flags & TEST_MSGVER_DUP)
                fails += test_mv_p_verify_f(mv, flags,
                                            test_mv_mvec_verify_dup, &vs);

        /* Checks across all partitions */
        if ((flags & TEST_MSGVER_RANGE) && vs.exp_cnt > 0) {
                /* Derive the inclusive msgid window from base and count. */
                vs.msgid_min = vs.msg_base;
                vs.msgid_max = vs.msgid_min + vs.exp_cnt - 1;
                fails += test_msgver_verify_range(mv, flags, &vs);
        }

        if (mv->log_suppr_cnt > 0)
                TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
                          func, line, what, mv->log_suppr_cnt);

        /* Unless a subset is allowed, the total count must match exactly. */
        if (vs.exp_cnt != mv->msgcnt) {
                if (!(flags & TEST_MSGVER_SUBSET)) {
                        TEST_WARN("%s:%d: %s: expected %d messages, got %d\n",
                                  func, line, what, vs.exp_cnt, mv->msgcnt);
                        fails++;
                }
        }

        if (fails)
                TEST_FAIL("%s:%d: %s: Verification of %d received messages "
                          "failed: "
                          "expected msgids %d..%d (%d): see previous errors\n",
                          func, line, what,
                          mv->msgcnt, vs.msg_base, vs.msg_base+vs.exp_cnt,
                          vs.exp_cnt);
        else
                TEST_SAY("%s:%d: %s: Verification of %d received messages "
                         "succeeded: "
                         "expected msgids %d..%d (%d)\n",
                         func, line, what,
                         mv->msgcnt, vs.msg_base, vs.msg_base+vs.exp_cnt,
                         vs.exp_cnt);

        return fails;
}
3422
3423
3424
3425
/**
 * Verify that \p rkmessage's payload parses as the
 * "testid=.., partition=.., msg=.." format and matches \p testid,
 * \p partition and \p msgnum (-1 partition/msgnum acts as a wildcard).
 * TEST_FAIL()s on format or value mismatch.
 */
void test_verify_rkmessage0 (const char *func, int line,
                             rd_kafka_message_t *rkmessage, uint64_t testid,
                             int32_t partition, int msgnum) {
        uint64_t in_testid;
        int in_part;
        int in_msgnum;
        char buf[128];  /* bounded payload copy; longer payloads truncate */

        /* Payload is not necessarily nul-terminated: copy with length. */
        rd_snprintf(buf, sizeof(buf), "%.*s",
                    (int)rkmessage->len, (char *)rkmessage->payload);

        if (sscanf(buf, "testid=%"SCNu64", partition=%i, msg=%i\n",
                   &in_testid, &in_part, &in_msgnum) != 3)
                TEST_FAIL("Incorrect format: %s", buf);

        if (testid != in_testid ||
            (partition != -1 && partition != in_part) ||
            (msgnum != -1 && msgnum != in_msgnum) ||
            in_msgnum < 0)
                goto fail_match;

        if (test_level > 2) {
                TEST_SAY("%s:%i: Our testid %"PRIu64", part %i (%i), msg %i\n",
                         func, line,
                         testid, (int)partition, (int)rkmessage->partition,
                         msgnum);
        }


        return;

 fail_match:
        TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i, msg %i did "
                  "not match message: \"%s\"\n",
                  func, line,
                  testid, (int)partition, msgnum, buf);
}
3463
3464
/**
 * @brief Verify that \p mv is identical to \p corr according to flags.
 *
 * The correct msgver \p corr is attached to the verification state and
 * each partition of \p mv is checked with test_mv_mvec_verify_corr().
 * TEST_FAIL()s on mismatch, unless TEST_MSGVER_SUBSET permits \p mv to
 * contain fewer messages than \p corr.
 */
void test_msgver_verify_compare0 (const char *func, int line,
                                  const char *what, test_msgver_t *mv,
                                  test_msgver_t *corr, int flags) {
        struct test_mv_vs vs;
        int fails = 0;

        memset(&vs, 0, sizeof(vs));

        TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x) by "
                 "comparison to correct msgver (%d messages)\n",
                 func, line, what, mv->msgcnt, flags, corr->msgcnt);

        vs.corr = corr;

        /* Per-partition checks */
        fails += test_mv_p_verify_f(mv, flags,
                                    test_mv_mvec_verify_corr, &vs);

        if (mv->log_suppr_cnt > 0)
                TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
                          func, line, what, mv->log_suppr_cnt);

        /* Unless a subset is allowed, the message counts must be equal. */
        if (corr->msgcnt != mv->msgcnt) {
                if (!(flags & TEST_MSGVER_SUBSET)) {
                        TEST_WARN("%s:%d: %s: expected %d messages, got %d\n",
                                  func, line, what, corr->msgcnt, mv->msgcnt);
                        fails++;
                }
        }

        if (fails)
                TEST_FAIL("%s:%d: %s: Verification of %d received messages "
                          "failed: expected %d messages: see previous errors\n",
                          func, line, what,
                          mv->msgcnt, corr->msgcnt);
        else
                TEST_SAY("%s:%d: %s: Verification of %d received messages "
                         "succeeded: matching %d messages from correct msgver\n",
                         func, line, what,
                         mv->msgcnt, corr->msgcnt);

}
3510
3511
/**
 * Consumer poll but dont expect any proper messages for \p timeout_ms.
 * EOFs are tolerated (and recorded); any proper message belonging to
 * this testid fails the test at the end.
 */
void test_consumer_poll_no_msgs (const char *what, rd_kafka_t *rk,
                                 uint64_t testid, int timeout_ms) {
        int64_t tmout = test_clock() + timeout_ms * 1000; /* deadline (us) */
        int cnt = 0;  /* unexpected proper messages seen */
        test_timing_t t_cons;
        test_msgver_t mv;

        test_msgver_init(&mv, testid);

        TEST_SAY("%s: not expecting any messages for %dms\n",
                 what, timeout_ms);

        TIMING_START(&t_cons, "CONSUME");

        do {
                rd_kafka_message_t *rkmessage;

                rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
                if (!rkmessage)
                        continue;

                if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        TEST_SAY("%s [%"PRId32"] reached EOF at "
                                 "offset %"PRId64"\n",
                                 rd_kafka_topic_name(rkmessage->rkt),
                                 rkmessage->partition,
                                 rkmessage->offset);
                        /* EOFs are benign: record in msgver only. */
                        test_msgver_add_msg(rk, &mv, rkmessage);

                } else if (rkmessage->err) {
                        /* Any other consumer error is fatal. */
                        TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64"): %s",
                                  rkmessage->rkt ?
                                  rd_kafka_topic_name(rkmessage->rkt) :
                                  "(no-topic)",
                                  rkmessage->partition,
                                  rkmessage->offset,
                                  rd_kafka_message_errstr(rkmessage));

                } else {
                        /* Proper message: count it only when add_msg accepts
                         * it (presumably when it matches this testid —
                         * confirm against test_msgver_add_msg()). */
                        if (test_msgver_add_msg(rk, &mv, rkmessage)) {
                                TEST_MV_WARN(&mv,
                                             "Received unexpected message on "
                                             "%s [%"PRId32"] at offset "
                                             "%"PRId64"\n",
                                             rd_kafka_topic_name(rkmessage->
                                                                 rkt),
                                             rkmessage->partition,
                                             rkmessage->offset);
                                cnt++;
                        }
                }

                rd_kafka_message_destroy(rkmessage);
        } while (test_clock() <= tmout);

        TIMING_STOP(&t_cons);

        /* Expect zero messages (exp_cnt 0) over all partitions. */
        test_msgver_verify(what, &mv, TEST_MSGVER_ALL, 0, 0);
        test_msgver_clear(&mv);

        TEST_ASSERT(cnt == 0, "Expected 0 messages, got %d", cnt);
}
3577
/**
 * @brief Consumer poll with expectation that a \p err will be reached
 *        within \p timeout_ms.
 *
 * TEST_FAIL()s on any other consumer error, or if \p err was not seen
 * before the deadline.
 * NOTE(review): \p testid is currently unused by this function.
 */
void test_consumer_poll_expect_err (rd_kafka_t *rk, uint64_t testid,
                                    int timeout_ms, rd_kafka_resp_err_t err) {
        int64_t tmout = test_clock() + timeout_ms * 1000; /* deadline (us) */

        TEST_SAY("%s: expecting error %s within %dms\n",
                 rd_kafka_name(rk), rd_kafka_err2name(err), timeout_ms);

        do {
                rd_kafka_message_t *rkmessage;
                rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
                if (!rkmessage)
                        continue;

                if (rkmessage->err == err) {
                        TEST_SAY("Got expected error: %s: %s\n",
                                 rd_kafka_err2name(rkmessage->err),
                                 rd_kafka_message_errstr(rkmessage));
                        rd_kafka_message_destroy(rkmessage);

                        return;
                } else if (rkmessage->err) {
                        TEST_FAIL("%s [%"PRId32"] unexpected error "
                                  "(offset %"PRId64"): %s",
                                  rkmessage->rkt ?
                                  rd_kafka_topic_name(rkmessage->rkt) :
                                  "(no-topic)",
                                  rkmessage->partition,
                                  rkmessage->offset,
                                  rd_kafka_err2name(rkmessage->err));
                }

                /* Proper messages are discarded while waiting. */
                rd_kafka_message_destroy(rkmessage);
        } while (test_clock() <= tmout);
        TEST_FAIL("Expected error %s not seen in %dms",
                  rd_kafka_err2name(err), timeout_ms);
}
3618
/**
 * Call consumer poll once and then return.
 * Messages are handled.
 *
 * \p mv is optional
 *
 * @returns 0 on timeout, 1 if a message was received or .._PARTITION_EOF
 *          if EOF was reached.
 *          TEST_FAIL()s on all errors.
 */
int test_consumer_poll_once (rd_kafka_t *rk, test_msgver_t *mv, int timeout_ms){
        rd_kafka_message_t *rkmessage;

        rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
        if (!rkmessage)
                return 0;  /* poll timed out: no message */

        if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                TEST_SAY("%s [%"PRId32"] reached EOF at "
                         "offset %"PRId64"\n",
                         rd_kafka_topic_name(rkmessage->rkt),
                         rkmessage->partition,
                         rkmessage->offset);
                if (mv)
                        test_msgver_add_msg(rk, mv, rkmessage);
                rd_kafka_message_destroy(rkmessage);
                return RD_KAFKA_RESP_ERR__PARTITION_EOF;

        } else if (rkmessage->err) {
                /* Any other consumer error is fatal to the test. */
                TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64"): %s",
                          rkmessage->rkt ?
                          rd_kafka_topic_name(rkmessage->rkt) :
                          "(no-topic)",
                          rkmessage->partition,
                          rkmessage->offset,
                          rd_kafka_message_errstr(rkmessage));

        } else {
                /* Proper message: record in the optional msgver. */
                if (mv)
                        test_msgver_add_msg(rk, mv, rkmessage);
        }

        rd_kafka_message_destroy(rkmessage);
        return 1;
}
3664
3665
/**
 * Consume until \p exp_eof_cnt EOFs and \p exp_cnt messages have been
 * received (a limit <= 0 disables that termination condition).
 *
 * @param exp_eof_cnt if 0, any EOF seen fails the test; if < 0, EOFs
 *                    are tolerated but not waited for.
 * @param exp_msg_base unused here (message verification goes via \p mv).
 * @param mv optional msgver; when NULL every proper message is counted,
 *           otherwise only those accepted by test_msgver_add_msg().
 *
 * @returns the number of messages counted.
 *          TEST_FAIL()s on consumer errors or a single-poll timeout.
 */
int test_consumer_poll (const char *what, rd_kafka_t *rk, uint64_t testid,
                        int exp_eof_cnt, int exp_msg_base, int exp_cnt,
                        test_msgver_t *mv) {
        int eof_cnt = 0;
        int cnt = 0;
        test_timing_t t_cons;

        TEST_SAY("%s: consume %d messages\n", what, exp_cnt);

        TIMING_START(&t_cons, "CONSUME");

        while ((exp_eof_cnt <= 0 || eof_cnt < exp_eof_cnt) &&
               (exp_cnt <= 0 || cnt < exp_cnt)) {
                rd_kafka_message_t *rkmessage;

                rkmessage = rd_kafka_consumer_poll(rk, tmout_multip(10*1000));
                if (!rkmessage) /* Shouldn't take this long to get a msg */
                        TEST_FAIL("%s: consumer_poll() timeout "
                                  "(%d/%d eof, %d/%d msgs)\n", what,
                                  eof_cnt, exp_eof_cnt, cnt, exp_cnt);


                if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        TEST_SAY("%s [%"PRId32"] reached EOF at "
                                 "offset %"PRId64"\n",
                                 rd_kafka_topic_name(rkmessage->rkt),
                                 rkmessage->partition,
                                 rkmessage->offset);
                        /* exp_eof_cnt == 0 means EOFs must not occur. */
                        TEST_ASSERT(exp_eof_cnt != 0, "expected no EOFs");
                        if (mv)
                                test_msgver_add_msg(rk, mv, rkmessage);
                        eof_cnt++;

                } else if (rkmessage->err) {
                        TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64
                                  "): %s",
                                  rkmessage->rkt ?
                                  rd_kafka_topic_name(rkmessage->rkt) :
                                  "(no-topic)",
                                  rkmessage->partition,
                                  rkmessage->offset,
                                  rd_kafka_message_errstr(rkmessage));

                } else {
                        /* Count the message unless the msgver rejects it. */
                        if (!mv || test_msgver_add_msg(rk, mv, rkmessage))
                                cnt++;
                }

                rd_kafka_message_destroy(rkmessage);
        }

        TIMING_STOP(&t_cons);

        TEST_SAY("%s: consumed %d/%d messages (%d/%d EOFs)\n",
                 what, cnt, exp_cnt, eof_cnt, exp_eof_cnt);

        /* When no messages are expected, the exact EOF count must match. */
        if (exp_cnt == 0)
                TEST_ASSERT(cnt == 0 && eof_cnt == exp_eof_cnt,
                            "%s: expected no messages and %d EOFs: "
                            "got %d messages and %d EOFs",
                            what, exp_eof_cnt, cnt, eof_cnt);
        return cnt;
}
3729
/**
 * @brief Close the consumer, timing the operation and failing the
 *        current test if the close returns an error.
 */
void test_consumer_close (rd_kafka_t *rk) {
        rd_kafka_resp_err_t err;
        test_timing_t timing;

        TEST_SAY("Closing consumer\n");

        TIMING_START(&timing, "CONSUMER.CLOSE");
        err = rd_kafka_consumer_close(rk);
        TIMING_STOP(&timing);

        if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
                TEST_FAIL("Failed to close consumer: %s\n",
                          rd_kafka_err2str(err));
}
3743
3744
/**
 * @brief Flush outstanding messages (up to \p timeout_ms), timing the
 *        operation and failing the test on error.
 */
void test_flush (rd_kafka_t *rk, int timeout_ms) {
        rd_kafka_resp_err_t err;
        test_timing_t timing;

        TEST_SAY("%s: Flushing %d messages\n",
                 rd_kafka_name(rk), rd_kafka_outq_len(rk));

        TIMING_START(&timing, "FLUSH");
        err = rd_kafka_flush(rk, timeout_ms);
        TIMING_STOP(&timing);

        if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
                TEST_FAIL("Failed to flush(%s, %d): %s: len() = %d\n",
                          rd_kafka_name(rk), timeout_ms,
                          rd_kafka_err2str(err),
                          rd_kafka_outq_len(rk));
}
3760
3761
/**
 * @brief Set global configuration property \p name to \p val,
 *        failing the test on error.
 */
void test_conf_set (rd_kafka_conf_t *conf, const char *name, const char *val) {
        char errstr[512];
        rd_kafka_conf_res_t res;

        res = rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr));
        if (res != RD_KAFKA_CONF_OK)
                TEST_FAIL("Failed to set config \"%s\"=\"%s\": %s\n",
                          name, val, errstr);
}
3769
/**
 * @brief Get configuration property \p name from \p conf.
 *
 * @returns a pointer to a thread-local static buffer which is overwritten
 *          by the next call from the same thread — copy the value if it
 *          must outlive the next call.
 *          TEST_FAIL()s if the property cannot be retrieved.
 */
char *test_conf_get (const rd_kafka_conf_t *conf, const char *name) {
        static RD_TLS char ret[256];
        size_t ret_sz = sizeof(ret);
        if (rd_kafka_conf_get(conf, name, ret, &ret_sz) != RD_KAFKA_CONF_OK)
                TEST_FAIL("Failed to get config \"%s\": %s\n", name,
                          "unknown property");
        return ret;
}
3778
3779
3780 /**
3781 * @brief Check if property \name matches \p val in \p conf.
3782 * If \p conf is NULL the test config will be used. */
test_conf_match(rd_kafka_conf_t * conf,const char * name,const char * val)3783 int test_conf_match (rd_kafka_conf_t *conf, const char *name, const char *val) {
3784 char *real;
3785 int free_conf = 0;
3786
3787 if (!conf) {
3788 test_conf_init(&conf, NULL, 0);
3789 free_conf = 1;
3790 }
3791
3792 real = test_conf_get(conf, name);
3793
3794 if (free_conf)
3795 rd_kafka_conf_destroy(conf);
3796
3797 return !strcmp(real, val);
3798 }
3799
3800
/**
 * @brief Set topic configuration property \p name to \p val,
 *        failing the test on error.
 */
void test_topic_conf_set (rd_kafka_topic_conf_t *tconf,
                          const char *name, const char *val) {
        char errstr[512];
        rd_kafka_conf_res_t res;

        res = rd_kafka_topic_conf_set(tconf, name, val,
                                      errstr, sizeof(errstr));
        if (res != RD_KAFKA_CONF_OK)
                TEST_FAIL("Failed to set topic config \"%s\"=\"%s\": %s\n",
                          name, val, errstr);
}
3809
/**
 * @brief First attempt to set topic level property, then global.
 * TEST_FAIL()s if neither config accepted the property.
 */
void test_any_conf_set (rd_kafka_conf_t *conf,
                        rd_kafka_topic_conf_t *tconf,
                        const char *name, const char *val) {
        rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;
        /* Default error text: reported when both confs are NULL. */
        char errstr[512] = {"Missing conf_t"};

        if (tconf)
                res = rd_kafka_topic_conf_set(tconf, name, val,
                                              errstr, sizeof(errstr));
        /* Fall through to the global config only when the property was
         * unknown at topic level (not on other errors). */
        if (res == RD_KAFKA_CONF_UNKNOWN && conf)
                res = rd_kafka_conf_set(conf, name, val,
                                        errstr, sizeof(errstr));

        if (res != RD_KAFKA_CONF_OK)
                TEST_FAIL("Failed to set any config \"%s\"=\"%s\": %s\n",
                          name, val, errstr);
}
3830
3831
3832 /**
3833 * @returns true if test clients need to be configured for authentication
3834 * or other security measures (SSL), else false for unauthed plaintext.
3835 */
test_needs_auth(void)3836 int test_needs_auth (void) {
3837 rd_kafka_conf_t *conf;
3838 const char *sec;
3839
3840 test_conf_init(&conf, NULL, 0);
3841
3842 sec = test_conf_get(conf, "security.protocol");
3843
3844 rd_kafka_conf_destroy(conf);
3845
3846 return strcmp(sec, "plaintext");
3847 }
3848
3849
test_print_partition_list(const rd_kafka_topic_partition_list_t * partitions)3850 void test_print_partition_list (const rd_kafka_topic_partition_list_t
3851 *partitions) {
3852 int i;
3853 for (i = 0 ; i < partitions->cnt ; i++) {
3854 TEST_SAY(" %s [%"PRId32"] offset %"PRId64"%s%s\n",
3855 partitions->elems[i].topic,
3856 partitions->elems[i].partition,
3857 partitions->elems[i].offset,
3858 partitions->elems[i].err ? ": " : "",
3859 partitions->elems[i].err ?
3860 rd_kafka_err2str(partitions->elems[i].err) : "");
3861 }
3862 }
3863
/**
 * @brief Execute kafka-topics.sh from the Kafka distribution.
 *
 * Requires the KAFKA_PATH and ZK_ADDRESS environment variables to be set.
 * \p fmt and the variadic arguments are appended to the command line.
 * TEST_FAIL()s if the command could not be run or exited with a signal
 * or non-zero status. Not supported on Windows.
 */
void test_kafka_topics (const char *fmt, ...) {
#ifdef _MSC_VER
        TEST_FAIL("%s not supported on Windows, yet", __FUNCTION__);
#else
        char cmd[512];
        int r;
        va_list ap;
        test_timing_t t_cmd;
        const char *kpath, *zk;

        kpath = test_getenv("KAFKA_PATH", NULL);
        zk = test_getenv("ZK_ADDRESS", NULL);

        if (!kpath || !zk)
                TEST_FAIL("%s: KAFKA_PATH and ZK_ADDRESS must be set",
                          __FUNCTION__);

        r = rd_snprintf(cmd, sizeof(cmd),
                        "%s/bin/kafka-topics.sh --zookeeper %s ", kpath, zk);
        TEST_ASSERT(r < (int)sizeof(cmd));

        /* Append the caller-supplied arguments.
         * NOTE(review): truncation of this appended part is not checked. */
        va_start(ap, fmt);
        rd_vsnprintf(cmd+r, sizeof(cmd)-r, fmt, ap);
        va_end(ap);

        TEST_SAY("Executing: %s\n", cmd);
        TIMING_START(&t_cmd, "exec");
        r = system(cmd);
        TIMING_STOP(&t_cmd);

        /* Distinguish exec failure, death-by-signal and exit status. */
        if (r == -1)
                TEST_FAIL("system(\"%s\") failed: %s", cmd, strerror(errno));
        else if (WIFSIGNALED(r))
                TEST_FAIL("system(\"%s\") terminated by signal %d\n", cmd,
                          WTERMSIG(r));
        else if (WEXITSTATUS(r))
                TEST_FAIL("system(\"%s\") failed with exit status %d\n",
                          cmd, WEXITSTATUS(r));
#endif
}
3907
3908
3909
/**
 * @brief Create topic using Topic Admin API
 *
 * Uses \p use_rk if non-NULL, otherwise a temporary producer instance is
 * created (and destroyed) for the operation.
 * An already-existing topic is not treated as an error; any other error
 * fails the test via TEST_ASSERT().
 */
static void test_admin_create_topic (rd_kafka_t *use_rk,
                                     const char *topicname, int partition_cnt,
                                     int replication_factor) {
        rd_kafka_t *rk;
        rd_kafka_NewTopic_t *newt[1];
        const size_t newt_cnt = 1;
        rd_kafka_AdminOptions_t *options;
        rd_kafka_queue_t *rkqu;
        rd_kafka_event_t *rkev;
        const rd_kafka_CreateTopics_result_t *res;
        const rd_kafka_topic_result_t **terr;
        int timeout_ms = tmout_multip(10000);
        size_t res_cnt;
        rd_kafka_resp_err_t err;
        char errstr[512];
        test_timing_t t_create;

        if (!(rk = use_rk))
                rk = test_create_producer();

        /* Dedicated queue to receive the admin operation result on. */
        rkqu = rd_kafka_queue_new(rk);

        newt[0] = rd_kafka_NewTopic_new(topicname, partition_cnt,
                                        replication_factor,
                                        errstr, sizeof(errstr));
        TEST_ASSERT(newt[0] != NULL, "%s", errstr);

        options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
        err = rd_kafka_AdminOptions_set_operation_timeout(options, timeout_ms,
                                                          errstr,
                                                          sizeof(errstr));
        TEST_ASSERT(!err, "%s", errstr);

        TEST_SAY("Creating topic \"%s\" "
                 "(partitions=%d, replication_factor=%d, timeout=%d)\n",
                 topicname, partition_cnt, replication_factor, timeout_ms);

        TIMING_START(&t_create, "CreateTopics");
        rd_kafka_CreateTopics(rk, newt, newt_cnt, options, rkqu);

        /* Wait for result */
        rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
        TEST_ASSERT(rkev, "Timed out waiting for CreateTopics result");

        TIMING_STOP(&t_create);

        /* Check event-level (request) error before extracting the result. */
        TEST_ASSERT(!rd_kafka_event_error(rkev),
                    "CreateTopics failed: %s",
                    rd_kafka_event_error_string(rkev));

        res = rd_kafka_event_CreateTopics_result(rkev);
        TEST_ASSERT(res, "Expected CreateTopics_result, not %s",
                    rd_kafka_event_name(rkev));

        terr = rd_kafka_CreateTopics_result_topics(res, &res_cnt);
        TEST_ASSERT(terr, "CreateTopics_result_topics returned NULL");
        TEST_ASSERT(res_cnt == newt_cnt,
                    "CreateTopics_result_topics returned %"PRIusz" topics, "
                    "not the expected %"PRIusz,
                    res_cnt, newt_cnt);

        /* Per-topic result: tolerate TOPIC_ALREADY_EXISTS. */
        TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]) ||
                    rd_kafka_topic_result_error(terr[0]) ==
                    RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
                    "Topic %s result error: %s",
                    rd_kafka_topic_result_name(terr[0]),
                    rd_kafka_topic_result_error_string(terr[0]));

        rd_kafka_event_destroy(rkev);

        rd_kafka_queue_destroy(rkqu);

        rd_kafka_AdminOptions_destroy(options);

        rd_kafka_NewTopic_destroy(newt[0]);

        if (!use_rk)
                rd_kafka_destroy(rk);
}
3992
3993
3994
3995
/**
 * @brief Create topic using kafka-topics.sh --create
 *
 * Fallback for brokers without the CreateTopics Admin API;
 * see test_kafka_topics() for environment requirements.
 */
static void test_create_topic_sh (const char *topicname, int partition_cnt,
                                  int replication_factor) {
        test_kafka_topics("--create --topic \"%s\" "
                          "--replication-factor %d --partitions %d",
                          topicname, replication_factor, partition_cnt);
}
4005
4006
4007 /**
4008 * @brief Create topic
4009 */
test_create_topic(rd_kafka_t * use_rk,const char * topicname,int partition_cnt,int replication_factor)4010 void test_create_topic (rd_kafka_t *use_rk,
4011 const char *topicname, int partition_cnt,
4012 int replication_factor) {
4013 if (test_broker_version < TEST_BRKVER(0,10,2,0))
4014 test_create_topic_sh(topicname, partition_cnt,
4015 replication_factor);
4016 else
4017 test_admin_create_topic(use_rk, topicname, partition_cnt,
4018 replication_factor);
4019 }
4020
4021
/**
 * @brief Delete topic using kafka-topics.sh --delete
 *
 * Fallback for brokers without the DeleteTopics Admin API;
 * see test_kafka_topics() for environment requirements.
 */
static void test_delete_topic_sh (const char *topicname) {
        test_kafka_topics("--delete --topic \"%s\" ", topicname);
}
4028
4029
4030 /**
4031 * @brief Delete topic using Topic Admin API
4032 */
test_admin_delete_topic(rd_kafka_t * use_rk,const char * topicname)4033 static void test_admin_delete_topic (rd_kafka_t *use_rk,
4034 const char *topicname) {
4035 rd_kafka_t *rk;
4036 rd_kafka_DeleteTopic_t *delt[1];
4037 const size_t delt_cnt = 1;
4038 rd_kafka_AdminOptions_t *options;
4039 rd_kafka_queue_t *rkqu;
4040 rd_kafka_event_t *rkev;
4041 const rd_kafka_DeleteTopics_result_t *res;
4042 const rd_kafka_topic_result_t **terr;
4043 int timeout_ms = tmout_multip(10000);
4044 size_t res_cnt;
4045 rd_kafka_resp_err_t err;
4046 char errstr[512];
4047 test_timing_t t_create;
4048
4049 if (!(rk = use_rk))
4050 rk = test_create_producer();
4051
4052 rkqu = rd_kafka_queue_new(rk);
4053
4054 delt[0] = rd_kafka_DeleteTopic_new(topicname);
4055
4056 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
4057 err = rd_kafka_AdminOptions_set_operation_timeout(options, timeout_ms,
4058 errstr,
4059 sizeof(errstr));
4060 TEST_ASSERT(!err, "%s", errstr);
4061
4062 TEST_SAY("Deleting topic \"%s\" "
4063 "(timeout=%d)\n",
4064 topicname, timeout_ms);
4065
4066 TIMING_START(&t_create, "DeleteTopics");
4067 rd_kafka_DeleteTopics(rk, delt, delt_cnt, options, rkqu);
4068
4069 /* Wait for result */
4070 rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
4071 TEST_ASSERT(rkev, "Timed out waiting for DeleteTopics result");
4072
4073 TIMING_STOP(&t_create);
4074
4075 res = rd_kafka_event_DeleteTopics_result(rkev);
4076 TEST_ASSERT(res, "Expected DeleteTopics_result, not %s",
4077 rd_kafka_event_name(rkev));
4078
4079 terr = rd_kafka_DeleteTopics_result_topics(res, &res_cnt);
4080 TEST_ASSERT(terr, "DeleteTopics_result_topics returned NULL");
4081 TEST_ASSERT(res_cnt == delt_cnt,
4082 "DeleteTopics_result_topics returned %"PRIusz" topics, "
4083 "not the expected %"PRIusz,
4084 res_cnt, delt_cnt);
4085
4086 TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]),
4087 "Topic %s result error: %s",
4088 rd_kafka_topic_result_name(terr[0]),
4089 rd_kafka_topic_result_error_string(terr[0]));
4090
4091 rd_kafka_event_destroy(rkev);
4092
4093 rd_kafka_queue_destroy(rkqu);
4094
4095 rd_kafka_AdminOptions_destroy(options);
4096
4097 rd_kafka_DeleteTopic_destroy(delt[0]);
4098
4099 if (!use_rk)
4100 rd_kafka_destroy(rk);
4101 }
4102
4103
4104 /**
4105 * @brief Delete a topic
4106 */
void test_delete_topic (rd_kafka_t *use_rk, const char *topicname) {
        /* Brokers with Admin API support (>= 0.10.2.0) are handled via
         * the Admin API, older brokers via the kafka-topics.sh script. */
        if (test_broker_version >= TEST_BRKVER(0,10,2,0))
                test_admin_delete_topic(use_rk, topicname);
        else
                test_delete_topic_sh(topicname);
}
4113
4114
4115 /**
4116 * @brief Create additional partitions for a topic using Admin API
4117 */
static void test_admin_create_partitions (rd_kafka_t *use_rk,
                                          const char *topicname,
                                          int new_partition_cnt) {
        rd_kafka_t *rk;
        rd_kafka_NewPartitions_t *newp[1];
        const size_t newp_cnt = 1;
        rd_kafka_AdminOptions_t *options;
        rd_kafka_queue_t *rkqu;
        rd_kafka_event_t *rkev;
        const rd_kafka_CreatePartitions_result_t *res;
        const rd_kafka_topic_result_t **terr;
        int timeout_ms = tmout_multip(10000);
        size_t res_cnt;
        rd_kafka_resp_err_t err;
        char errstr[512];
        test_timing_t t_create;

        /* Use the caller's instance if provided, else create a
         * temporary producer (destroyed again at the bottom). */
        if (!(rk = use_rk))
                rk = test_create_producer();

        rkqu = rd_kafka_queue_new(rk);

        /* new_partition_cnt is the new TOTAL partition count. */
        newp[0] = rd_kafka_NewPartitions_new(topicname, new_partition_cnt,
                                             errstr, sizeof(errstr));
        TEST_ASSERT(newp[0] != NULL, "%s", errstr);

        options = rd_kafka_AdminOptions_new(rk,
                                            RD_KAFKA_ADMIN_OP_CREATEPARTITIONS);
        err = rd_kafka_AdminOptions_set_operation_timeout(options, timeout_ms,
                                                          errstr,
                                                          sizeof(errstr));
        TEST_ASSERT(!err, "%s", errstr);

        TEST_SAY("Creating %d (total) partitions for topic \"%s\"\n",
                 new_partition_cnt, topicname);

        TIMING_START(&t_create, "CreatePartitions");
        rd_kafka_CreatePartitions(rk, newp, newp_cnt, options, rkqu);

        /* Wait for result (extra 2s grace on top of the op timeout). */
        rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
        TEST_ASSERT(rkev, "Timed out waiting for CreatePartitions result");

        TIMING_STOP(&t_create);

        res = rd_kafka_event_CreatePartitions_result(rkev);
        TEST_ASSERT(res, "Expected CreatePartitions_result, not %s",
                    rd_kafka_event_name(rkev));

        /* Exactly one per-topic result is expected (one request). */
        terr = rd_kafka_CreatePartitions_result_topics(res, &res_cnt);
        TEST_ASSERT(terr, "CreatePartitions_result_topics returned NULL");
        TEST_ASSERT(res_cnt == newp_cnt,
                    "CreatePartitions_result_topics returned %"PRIusz
                    " topics, not the expected %"PRIusz,
                    res_cnt, newp_cnt);

        TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]),
                    "Topic %s result error: %s",
                    rd_kafka_topic_result_name(terr[0]),
                    rd_kafka_topic_result_error_string(terr[0]));

        /* terr[] is owned by rkev: destroy the event after use. */
        rd_kafka_event_destroy(rkev);

        rd_kafka_queue_destroy(rkqu);

        rd_kafka_AdminOptions_destroy(options);

        rd_kafka_NewPartitions_destroy(newp[0]);

        if (!use_rk)
                rd_kafka_destroy(rk);
}
4190
4191
4192 /**
4193 * @brief Create partitions for topic
4194 */
void test_create_partitions (rd_kafka_t *use_rk,
                             const char *topicname, int new_partition_cnt) {
        /* Prefer the Admin API when the broker supports it (>= 0.10.2.0),
         * otherwise fall back to the kafka-topics.sh script. */
        if (test_broker_version >= TEST_BRKVER(0,10,2,0))
                test_admin_create_partitions(use_rk, topicname,
                                             new_partition_cnt);
        else
                test_kafka_topics("--alter --topic %s --partitions %d",
                                  topicname, new_partition_cnt);
}
4204
4205
/**
 * @brief Get the current partition count of \p topicname by polling
 *        broker metadata until the topic is reported without error (or
 *        with at least one partition), or \p timeout_ms expires.
 *
 * @param rk Optional instance to use; if NULL a temporary producer is
 *           created (and destroyed again on return).
 *
 * @returns the partition count, or -1 on timeout.
 */
int test_get_partition_count (rd_kafka_t *rk, const char *topicname,
                              int timeout_ms) {
        rd_kafka_t *use_rk;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_t *rkt;
        int64_t abs_timeout = test_clock() + (timeout_ms * 1000);
        int ret = -1;

        if (!rk)
                use_rk = test_create_producer();
        else
                use_rk = rk;

        rkt = rd_kafka_topic_new(use_rk, topicname, NULL);

        do {
                const struct rd_kafka_metadata *metadata;

                /* Request metadata for this topic only. */
                err = rd_kafka_metadata(use_rk, 0, rkt, &metadata,
                                        tmout_multip(15000));
                if (err)
                        TEST_WARN("metadata() for %s failed: %s\n",
                                  rkt ? rd_kafka_topic_name(rkt) :
                                  "(all-local)",
                                  rd_kafka_err2str(err));
                else {
                        if (metadata->topic_cnt == 1) {
                                /* Accept the topic when it has no error,
                                 * or at least one known partition. */
                                if (metadata->topics[0].err == 0 ||
                                    metadata->topics[0].partition_cnt > 0) {
                                        int32_t cnt;
                                        cnt = metadata->topics[0].partition_cnt;
                                        rd_kafka_metadata_destroy(metadata);
                                        ret = (int)cnt;
                                        break;
                                }
                                TEST_SAY("metadata(%s) returned %s: retrying\n",
                                         rd_kafka_topic_name(rkt),
                                         rd_kafka_err2str(metadata->
                                                          topics[0].err));
                        }
                        rd_kafka_metadata_destroy(metadata);
                        rd_sleep(1);
                }
        } while (test_clock() < abs_timeout);

        rd_kafka_topic_destroy(rkt);

        if (!rk)
                rd_kafka_destroy(use_rk);

        return ret;
}
4258
4259 /**
4260 * @brief Let the broker auto-create the topic for us.
4261 */
rd_kafka_resp_err_t test_auto_create_topic_rkt (rd_kafka_t *rk,
                                                rd_kafka_topic_t *rkt,
                                                int timeout_ms) {
        const struct rd_kafka_metadata *metadata;
        rd_kafka_resp_err_t err;
        test_timing_t t;
        int64_t abs_timeout = test_clock() + (timeout_ms * 1000);

        do {
                TIMING_START(&t, "auto_create_topic");
                /* A topic-specific metadata request makes the broker
                 * auto-create the topic (when enabled broker-side). */
                err = rd_kafka_metadata(rk, 0, rkt, &metadata,
                                        tmout_multip(15000));
                TIMING_STOP(&t);
                if (err)
                        TEST_WARN("metadata() for %s failed: %s\n",
                                  rkt ? rd_kafka_topic_name(rkt) :
                                  "(all-local)",
                                  rd_kafka_err2str(err));
                else {
                        if (metadata->topic_cnt == 1) {
                                /* Success: topic reported without error,
                                 * or with at least one partition. */
                                if (metadata->topics[0].err == 0 ||
                                    metadata->topics[0].partition_cnt > 0) {
                                        rd_kafka_metadata_destroy(metadata);
                                        return 0;
                                }
                                TEST_SAY("metadata(%s) returned %s: retrying\n",
                                         rd_kafka_topic_name(rkt),
                                         rd_kafka_err2str(metadata->
                                                          topics[0].err));
                        }
                        rd_kafka_metadata_destroy(metadata);
                        rd_sleep(1);
                }
        } while (test_clock() < abs_timeout);

        /* Timed out: return the error of the last metadata request. */
        return err;
}
4299
test_auto_create_topic(rd_kafka_t * rk,const char * name,int timeout_ms)4300 rd_kafka_resp_err_t test_auto_create_topic (rd_kafka_t *rk, const char *name,
4301 int timeout_ms) {
4302 rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, name, NULL);
4303 rd_kafka_resp_err_t err;
4304 if (!rkt)
4305 return rd_kafka_last_error();
4306 err = test_auto_create_topic_rkt(rk, rkt, timeout_ms);
4307 rd_kafka_topic_destroy(rkt);
4308 return err;
4309 }
4310
4311
4312 /**
4313 * @brief Check if topic auto creation works.
4314 * @returns 1 if it does, else 0.
4315 */
test_check_auto_create_topic(void)4316 int test_check_auto_create_topic (void) {
4317 rd_kafka_t *rk;
4318 rd_kafka_conf_t *conf;
4319 rd_kafka_resp_err_t err;
4320 const char *topic = test_mk_topic_name("autocreatetest", 1);
4321
4322 test_conf_init(&conf, NULL, 0);
4323 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
4324 err = test_auto_create_topic(rk, topic, tmout_multip(5000));
4325 if (err)
4326 TEST_SAY("Auto topic creation of \"%s\" failed: %s\n",
4327 topic, rd_kafka_err2str(err));
4328 rd_kafka_destroy(rk);
4329
4330 return err ? 0 : 1;
4331 }
4332
4333
4334 /**
4335 * @brief Builds and runs a Java application from the java/ directory.
4336 *
4337 * The application is started in the background, use
4338 * test_waitpid() to await its demise.
4339 *
4340 * @param cls The app class to run using java/run-class.sh
4341 *
4342 * @returns -1 if the application could not be started, else the pid.
4343 */
int test_run_java (const char *cls, const char **argv) {
#ifdef _MSC_VER
        TEST_WARN("%s(%s) not supported Windows, yet",
                  __FUNCTION__, cls);
        return -1;
#else
        int r;
        const char *kpath;
        pid_t pid;
        const char **full_argv, **p;
        int cnt;
        extern char **environ;

        /* NOTE(review): kpath is only used as a presence check here;
         * presumably run-class.sh reads KAFKA_PATH from the environment
         * itself — confirm against java/run-class.sh. */
        kpath = test_getenv("KAFKA_PATH", NULL);

        if (!kpath) {
                TEST_WARN("%s(%s): KAFKA_PATH must be set\n",
                          __FUNCTION__, cls);
                return -1;
        }

        /* Build the java helpers; fail on non-zero exit or signal. */
        r = system("make -s java");

        if (r == -1 || WIFSIGNALED(r) || WEXITSTATUS(r)) {
                TEST_WARN("%s(%s): failed to build java class (code %d)\n",
                          __FUNCTION__, cls, r);
                return -1;
        }

        /* Fork child process and run cls */
        pid = fork();
        if (pid == -1) {
                TEST_WARN("%s(%s): failed to fork: %s\n",
                          __FUNCTION__, cls, strerror(errno));
                return -1;
        }

        if (pid > 0)
                return (int)pid; /* In parent process */

        /* In child process */

        /* Reconstruct argv to contain run-class.sh and the cls:
         * first count the caller's NULL-terminated argv. */
        for (cnt = 0 ; argv[cnt] ; cnt++)
                ;

        cnt += 3; /* run-class.sh, cls, .., NULL */
        full_argv = malloc(sizeof(*full_argv) * cnt);
        full_argv[0] = "java/run-class.sh";
        full_argv[1] = (const char *)cls;

        /* Copy arguments */
        for (p = &full_argv[2] ; *argv ; p++, argv++)
                *p = *argv;
        *p = NULL;

        /* Replace the child process image; only returns on failure. */
        r = execve(full_argv[0], (char *const*)full_argv, environ);

        TEST_WARN("%s(%s): failed to execute run-class.sh: %s\n",
                  __FUNCTION__, cls, strerror(errno));
        exit(2);

        return -1; /* NOTREACHED */
#endif
}
4411
4412
4413 /**
4414 * @brief Wait for child-process \p pid to exit.
4415 *
 * @returns 0 if the child process exited successfully, else -1.
4417 */
int test_waitpid (int pid) {
#ifdef _MSC_VER
        TEST_WARN("%s() not supported Windows, yet",
                  __FUNCTION__);
        return -1;
#else
        pid_t r;
        int status = 0;

        /* Block until the child terminates. */
        r = waitpid((pid_t)pid, &status, 0);

        if (r == -1) {
                TEST_WARN("waitpid(%d) failed: %s\n",
                          pid, strerror(errno));
                return -1;
        }

        /* Signal-terminated or non-zero exit both count as failure. */
        if (WIFSIGNALED(status)) {
                TEST_WARN("Process %d terminated by signal %d\n", pid,
                          WTERMSIG(status));
                return -1;
        } else if (WEXITSTATUS(status)) {
                TEST_WARN("Process %d exited with status %d\n",
                          pid, WEXITSTATUS(status));
                return -1;
        }

        return 0;
#endif
}
4448
4449
4450 /**
4451 * @brief Check if \p feature is builtin to librdkafka.
4452 * @returns returns 1 if feature is built in, else 0.
4453 */
test_check_builtin(const char * feature)4454 int test_check_builtin (const char *feature) {
4455 rd_kafka_conf_t *conf;
4456 char errstr[128];
4457 int r;
4458
4459 conf = rd_kafka_conf_new();
4460 if (rd_kafka_conf_set(conf, "builtin.features", feature,
4461 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
4462 TEST_SAY("Feature \"%s\" not built-in: %s\n",
4463 feature, errstr);
4464 r = 0;
4465 } else {
4466 TEST_SAY("Feature \"%s\" is built-in\n", feature);
4467 r = 1;
4468 }
4469
4470 rd_kafka_conf_destroy(conf);
4471 return r;
4472 }
4473
4474
/**
 * @brief Thread-local printf-style string formatter.
 *
 * Formats into one of 8 rotating thread-local buffers so that up to
 * 8 results from the same thread may be alive simultaneously (e.g. as
 * multiple arguments to one printf call).
 *
 * @returns a pointer into thread-local storage: do not free, and do not
 *          use the result after 8 subsequent calls from the same thread.
 */
char *tsprintf (const char *fmt, ...) {
        static RD_TLS char ret[8][512];
        static RD_TLS int i;
        va_list ap;


        /* Advance to the next buffer in the per-thread rotation. */
        i = (i + 1) % 8;

        va_start(ap, fmt);
        rd_vsnprintf(ret[i], sizeof(ret[i]), fmt, ap);
        va_end(ap);

        return ret[i];
}
4489
4490
4491 /**
4492 * @brief Add a test report JSON object.
4493 * These will be written as a JSON array to the test report file.
4494 */
test_report_add(struct test * test,const char * fmt,...)4495 void test_report_add (struct test *test, const char *fmt, ...) {
4496 va_list ap;
4497 char buf[512];
4498
4499 va_start(ap, fmt);
4500 vsnprintf(buf, sizeof(buf), fmt, ap);
4501 va_end(ap);
4502
4503 if (test->report_cnt == test->report_size) {
4504 if (test->report_size == 0)
4505 test->report_size = 8;
4506 else
4507 test->report_size *= 2;
4508
4509 test->report_arr = realloc(test->report_arr,
4510 sizeof(*test->report_arr) *
4511 test->report_size);
4512 }
4513
4514 test->report_arr[test->report_cnt++] = rd_strdup(buf);
4515
4516 TEST_SAYL(1, "Report #%d: %s\n", test->report_cnt-1, buf);
4517 }
4518
4519 /**
 * Returns 1 if KAFKA_PATH and ZK_ADDRESS are set so that we can use the
 * kafka-topics.sh script to manually create topics.
4522 *
4523 * If \p skip is set TEST_SKIP() will be called with a helpful message.
4524 */
test_can_create_topics(int skip)4525 int test_can_create_topics (int skip) {
4526 /* Has AdminAPI */
4527 if (test_broker_version >= TEST_BRKVER(0,10,2,0))
4528 return 1;
4529
4530 #ifdef _MSC_VER
4531 if (skip)
4532 TEST_SKIP("Cannot create topics on Win32\n");
4533 return 0;
4534 #else
4535
4536 if (!test_getenv("KAFKA_PATH", NULL) ||
4537 !test_getenv("ZK_ADDRESS", NULL)) {
4538 if (skip)
4539 TEST_SKIP("Cannot create topics "
4540 "(set KAFKA_PATH and ZK_ADDRESS)\n");
4541 return 0;
4542 }
4543
4544
4545 return 1;
4546 #endif
4547 }
4548
4549
4550 /**
4551 * Wait for \p event_type, discarding all other events prior to it.
4552 */
test_wait_event(rd_kafka_queue_t * eventq,rd_kafka_event_type_t event_type,int timeout_ms)4553 rd_kafka_event_t *test_wait_event (rd_kafka_queue_t *eventq,
4554 rd_kafka_event_type_t event_type,
4555 int timeout_ms) {
4556 test_timing_t t_w;
4557 int64_t abs_timeout = test_clock() + (timeout_ms * 1000);
4558
4559 TIMING_START(&t_w, "wait_event");
4560 while (test_clock() < abs_timeout) {
4561 rd_kafka_event_t *rkev;
4562
4563 rkev = rd_kafka_queue_poll(eventq,
4564 (int)(abs_timeout - test_clock())/
4565 1000);
4566
4567 if (rd_kafka_event_type(rkev) == event_type) {
4568 TIMING_STOP(&t_w);
4569 return rkev;
4570 }
4571
4572 if (!rkev)
4573 continue;
4574
4575 if (rd_kafka_event_error(rkev))
4576 TEST_SAY("discarding ignored event %s: %s\n",
4577 rd_kafka_event_name(rkev),
4578 rd_kafka_event_error_string(rkev));
4579 else
4580 TEST_SAY("discarding ignored event %s\n",
4581 rd_kafka_event_name(rkev));
4582 rd_kafka_event_destroy(rkev);
4583
4584 }
4585 TIMING_STOP(&t_w);
4586
4587 return NULL;
4588 }
4589
4590
/**
 * @brief SAY callback bridging file/line-style log interfaces to
 *        TEST_SAYL(). \p file and \p line are accepted for interface
 *        compatibility but not used.
 */
void test_SAY (const char *file, int line, int level, const char *str) {
        TEST_SAYL(level, "%s", str);
}
4594
/**
 * @brief Mark the current test as skipped with reason \p str.
 *
 * Stores the (right-trimmed) reason in the test's failstr unless a
 * previous reason was already recorded.
 */
void test_SKIP (const char *file, int line, const char *str) {
        TEST_WARN("SKIPPING TEST: %s", str);
        TEST_LOCK();
        test_curr->state = TEST_SKIPPED;
        if (!*test_curr->failstr) {
                /* Only the first skip reason is kept. */
                rd_snprintf(test_curr->failstr,
                            sizeof(test_curr->failstr), "%s", str);
                rtrim(test_curr->failstr);
        }
        TEST_UNLOCK();
}
4606
/**
 * @brief Returns the name of the currently executing test.
 */
const char *test_curr_name (void) {
        return test_curr->name;
}
4610
4611
4612 /**
 * @brief Dump/print message headers
4614 */
test_headers_dump(const char * what,int lvl,const rd_kafka_headers_t * hdrs)4615 void test_headers_dump (const char *what, int lvl,
4616 const rd_kafka_headers_t *hdrs) {
4617 size_t idx = 0;
4618 const char *name, *value;
4619 size_t size;
4620
4621 while (!rd_kafka_header_get_all(hdrs, idx++, &name,
4622 (const void **)&value, &size))
4623 TEST_SAYL(lvl, "%s: Header #%"PRIusz": %s='%s'\n",
4624 what, idx-1, name,
4625 value ? value : "(NULL)");
4626 }
4627
4628
4629 /**
4630 * @brief Retrieve and return the list of broker ids in the cluster.
4631 *
4632 * @param rk Optional instance to use.
4633 * @param cntp Will be updated to the number of brokers returned.
4634 *
4635 * @returns a malloc:ed list of int32_t broker ids.
4636 */
int32_t *test_get_broker_ids (rd_kafka_t *use_rk, size_t *cntp) {
        int32_t *ids;
        rd_kafka_t *rk;
        const rd_kafka_metadata_t *md;
        rd_kafka_resp_err_t err;
        size_t i;

        /* Use caller's instance, else create a temporary producer. */
        if (!(rk = use_rk))
                rk = test_create_producer();

        err = rd_kafka_metadata(rk, 0, NULL, &md, tmout_multip(5000));
        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
        TEST_ASSERT(md->broker_cnt > 0,
                    "%d brokers, expected > 0", md->broker_cnt);

        ids = malloc(sizeof(*ids) * md->broker_cnt);
        /* FIX: fail loudly on allocation failure rather than crashing
         * on the NULL dereference in the loop below. */
        TEST_ASSERT(ids != NULL,
                    "Failed to allocate %d broker ids", md->broker_cnt);

        for (i = 0 ; i < (size_t)md->broker_cnt ; i++)
                ids[i] = md->brokers[i].id;

        *cntp = md->broker_cnt;

        rd_kafka_metadata_destroy(md);

        if (!use_rk)
                rd_kafka_destroy(rk);

        /* Caller takes ownership of the returned array (free()). */
        return ids;
}
4666
4667
4668
4669 /**
4670 * @brief Verify that all topics in \p topics are reported in metadata,
4671 * and that none of the topics in \p not_topics are reported.
4672 *
4673 * @returns the number of failures (but does not FAIL).
4674 */
static int verify_topics_in_metadata (rd_kafka_t *rk,
                                      rd_kafka_metadata_topic_t *topics,
                                      size_t topic_cnt,
                                      rd_kafka_metadata_topic_t *not_topics,
                                      size_t not_topic_cnt) {
        const rd_kafka_metadata_t *md;
        rd_kafka_resp_err_t err;
        int ti;
        size_t i;
        int fails = 0;

        /* Mark topics with dummy error which is overwritten
         * when topic is found in metadata, allowing us to check
         * for missed topics. */
        for (i = 0 ; i < topic_cnt ; i++)
                topics[i].err = 12345;

        err = rd_kafka_metadata(rk, 1/*all_topics*/, NULL, &md,
                                tmout_multip(5000));
        TEST_ASSERT(!err, "metadata failed: %s", rd_kafka_err2str(err));

        for (ti = 0 ; ti < md->topic_cnt ; ti++) {
                const rd_kafka_metadata_topic_t *mdt = &md->topics[ti];

                /* Match this metadata topic against the expected set. */
                for (i = 0 ; i < topic_cnt ; i++) {
                        int pi;
                        rd_kafka_metadata_topic_t *exp_mdt;

                        if (strcmp(topics[i].topic, mdt->topic))
                                continue;

                        exp_mdt = &topics[i];

                        exp_mdt->err = mdt->err; /* indicate found */
                        if (mdt->err) {
                                TEST_SAY("metadata: "
                                         "Topic %s has error %s\n",
                                         mdt->topic,
                                         rd_kafka_err2str(mdt->err));
                                fails++;
                        }

                        /* Verify partition count when the expectation
                         * specifies one (0 means don't care). */
                        if (exp_mdt->partition_cnt != 0 &&
                            mdt->partition_cnt != exp_mdt->partition_cnt) {
                                TEST_SAY("metadata: "
                                         "Topic %s, expected %d partitions"
                                         ", not %d\n",
                                         mdt->topic,
                                         exp_mdt->partition_cnt,
                                         mdt->partition_cnt);
                                fails++;
                                continue;
                        }

                        /* Verify per-partition values */
                        for (pi = 0 ; exp_mdt->partitions &&
                                     pi < exp_mdt->partition_cnt ; pi++) {
                                const rd_kafka_metadata_partition_t *mdp =
                                        &mdt->partitions[pi];
                                const rd_kafka_metadata_partition_t *exp_mdp =
                                        &exp_mdt->partitions[pi];

                                if (mdp->id != exp_mdp->id) {
                                        TEST_SAY("metadata: "
                                                 "Topic %s, "
                                                 "partition %d, "
                                                 "partition list out of order,"
                                                 " expected %d, not %d\n",
                                                 mdt->topic, pi,
                                                 exp_mdp->id, mdp->id);
                                        fails++;
                                        continue;
                                }

                                /* Replica lists are compared as ordered
                                 * arrays (memcmp), only when expected. */
                                if (exp_mdp->replicas) {
                                        if (mdp->replica_cnt !=
                                            exp_mdp->replica_cnt) {
                                                TEST_SAY("metadata: "
                                                         "Topic %s, "
                                                         "partition %d, "
                                                         "expected %d replicas,"
                                                         " not %d\n",
                                                         mdt->topic, pi,
                                                         exp_mdp->replica_cnt,
                                                         mdp->replica_cnt);
                                                fails++;
                                        } else if (memcmp(mdp->replicas,
                                                          exp_mdp->replicas,
                                                          mdp->replica_cnt *
                                                          sizeof(*mdp->replicas))) {
                                                int ri;

                                                TEST_SAY("metadata: "
                                                         "Topic %s, "
                                                         "partition %d, "
                                                         "replica mismatch:\n",
                                                         mdt->topic, pi);

                                                for (ri = 0 ;
                                                     ri < mdp->replica_cnt ;
                                                     ri++) {
                                                        TEST_SAY(" #%d: "
                                                                 "expected "
                                                                 "replica %d, "
                                                                 "not %d\n",
                                                                 ri,
                                                                 exp_mdp->
                                                                 replicas[ri],
                                                                 mdp->
                                                                 replicas[ri]);
                                                }

                                                fails++;
                                        }

                                }
                        }
                }

                /* Any topic present in \p not_topics must not be seen. */
                for (i = 0 ; i < not_topic_cnt ; i++) {
                        if (strcmp(not_topics[i].topic, mdt->topic))
                                continue;

                        TEST_SAY("metadata: "
                                 "Topic %s found in metadata, unexpected\n",
                                 mdt->topic);
                        fails++;
                }

        }

        /* Any expected topic still carrying the dummy error was never
         * seen in the metadata response. */
        for (i = 0 ; i < topic_cnt ; i++) {
                if ((int)topics[i].err == 12345) {
                        TEST_SAY("metadata: "
                                 "Topic %s not seen in metadata\n",
                                 topics[i].topic);
                        fails++;
                }
        }

        if (fails > 0)
                TEST_SAY("Metadata verification for %"PRIusz" topics failed "
                         "with %d errors (see above)\n",
                         topic_cnt, fails);
        else
                TEST_SAY("Metadata verification succeeded: "
                         "%"PRIusz" desired topics seen, "
                         "%"PRIusz" undesired topics not seen\n",
                         topic_cnt, not_topic_cnt);

        rd_kafka_metadata_destroy(md);

        return fails;
}
4829
4830
4831
4832 /**
4833 * @brief Wait for metadata to reflect expected and not expected topics
4834 */
void test_wait_metadata_update (rd_kafka_t *rk,
                                rd_kafka_metadata_topic_t *topics,
                                size_t topic_cnt,
                                rd_kafka_metadata_topic_t *not_topics,
                                size_t not_topic_cnt,
                                int tmout) {
        int64_t abs_timeout;
        test_timing_t t_md;

        abs_timeout = test_clock() + (tmout * 1000);

        /* Extend the global test timeout to cover this wait. */
        test_timeout_set(10 + (tmout/1000));

        TEST_SAY("Waiting for up to %dms for metadata update\n", tmout);

        TIMING_START(&t_md, "METADATA.WAIT");
        do {
                int md_fails;

                md_fails = verify_topics_in_metadata(
                        rk,
                        topics, topic_cnt,
                        not_topics, not_topic_cnt);

                if (!md_fails) {
                        TEST_SAY("All expected topics (not?) "
                                 "seen in metadata\n");
                        /* abs_timeout == 0 doubles as the success flag
                         * for the post-loop check below. */
                        abs_timeout = 0;
                        break;
                }

                rd_sleep(1);
        } while (test_clock() < abs_timeout);
        TIMING_STOP(&t_md);

        if (abs_timeout)
                TEST_FAIL("Expected topics not seen in given time.");
}
4873
4874
4875
4876 /**
4877 * @brief Wait for up to \p tmout for any type of admin result.
4878 * @returns the event
4879 */
rd_kafka_event_t *
test_wait_admin_result (rd_kafka_queue_t *q,
                        rd_kafka_event_type_t evtype,
                        int tmout) {
        rd_kafka_event_t *rkev;

        while (1) {
                rkev = rd_kafka_queue_poll(q, tmout);
                if (!rkev)
                        TEST_FAIL("Timed out waiting for admin result (%d)\n",
                                  evtype);

                if (rd_kafka_event_type(rkev) == evtype)
                        return rkev;


                /* Generic error events are logged and skipped while we
                 * keep waiting for the requested result type. */
                if (rd_kafka_event_type(rkev) == RD_KAFKA_EVENT_ERROR) {
                        TEST_WARN("Received error event while waiting for %d: "
                                  "%s: ignoring",
                                  evtype, rd_kafka_event_error_string(rkev));
                        continue;
                }


                /* Any other event type fails the test; this assert is
                 * known-false at this point and exists to emit a
                 * descriptive failure message. */
                TEST_ASSERT(rd_kafka_event_type(rkev) == evtype,
                            "Expected event type %d, got %d (%s)",
                            evtype,
                            rd_kafka_event_type(rkev),
                            rd_kafka_event_name(rkev));
        }

        return NULL;
}
4913
4914
4915
4916 /**
4917 * @brief Wait for up to \p tmout for a
4918 * CreateTopics/DeleteTopics/CreatePartitions or
4919 * DescribeConfigs/AlterConfigs result and return the
4920 * distilled error code.
4921 */
rd_kafka_resp_err_t
test_wait_topic_admin_result (rd_kafka_queue_t *q,
                              rd_kafka_event_type_t evtype,
                              rd_kafka_event_t **retevent,
                              int tmout) {
        rd_kafka_event_t *rkev;
        size_t i;
        const rd_kafka_topic_result_t **terr = NULL;
        size_t terr_cnt = 0;
        const rd_kafka_ConfigResource_t **cres = NULL;
        size_t cres_cnt = 0;
        int errcnt = 0;
        rd_kafka_resp_err_t err;

        rkev = test_wait_admin_result(q, evtype, tmout);

        /* Request-level error: warn and bail out early. */
        if ((err = rd_kafka_event_error(rkev))) {
                TEST_WARN("%s failed: %s\n",
                          rd_kafka_event_name(rkev),
                          rd_kafka_event_error_string(rkev));
                rd_kafka_event_destroy(rkev);
                return err;
        }

        /* Extract the per-topic (terr) or per-resource (cres) result
         * list, depending on the expected event type. */
        if (evtype == RD_KAFKA_EVENT_CREATETOPICS_RESULT) {
                const rd_kafka_CreateTopics_result_t *res;
                if (!(res = rd_kafka_event_CreateTopics_result(rkev)))
                        TEST_FAIL("Expected a CreateTopics result, not %s",
                                  rd_kafka_event_name(rkev));

                terr = rd_kafka_CreateTopics_result_topics(res, &terr_cnt);

        } else if (evtype == RD_KAFKA_EVENT_DELETETOPICS_RESULT) {
                const rd_kafka_DeleteTopics_result_t *res;
                if (!(res = rd_kafka_event_DeleteTopics_result(rkev)))
                        TEST_FAIL("Expected a DeleteTopics result, not %s",
                                  rd_kafka_event_name(rkev));

                terr = rd_kafka_DeleteTopics_result_topics(res, &terr_cnt);

        } else if (evtype == RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT) {
                const rd_kafka_CreatePartitions_result_t *res;
                if (!(res = rd_kafka_event_CreatePartitions_result(rkev)))
                        TEST_FAIL("Expected a CreatePartitions result, not %s",
                                  rd_kafka_event_name(rkev));

                terr = rd_kafka_CreatePartitions_result_topics(res, &terr_cnt);

        } else if (evtype == RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT) {
                const rd_kafka_DescribeConfigs_result_t *res;

                if (!(res = rd_kafka_event_DescribeConfigs_result(rkev)))
                        TEST_FAIL("Expected a DescribeConfigs result, not %s",
                                  rd_kafka_event_name(rkev));

                cres = rd_kafka_DescribeConfigs_result_resources(res,
                                                                 &cres_cnt);

        } else if (evtype == RD_KAFKA_EVENT_ALTERCONFIGS_RESULT) {
                const rd_kafka_AlterConfigs_result_t *res;

                if (!(res = rd_kafka_event_AlterConfigs_result(rkev)))
                        TEST_FAIL("Expected a AlterConfigs result, not %s",
                                  rd_kafka_event_name(rkev));

                cres = rd_kafka_AlterConfigs_result_resources(res, &cres_cnt);

        } else {
                TEST_FAIL("Bad evtype: %d", evtype);
                RD_NOTREACHED();
        }

        /* Check topic errors */
        for (i = 0 ; i < terr_cnt ; i++) {
                if (rd_kafka_topic_result_error(terr[i])) {
                        TEST_WARN("..Topics result: %s: error: %s\n",
                                  rd_kafka_topic_result_name(terr[i]),
                                  rd_kafka_topic_result_error_string(terr[i]));
                        /* Only the first error is returned to the caller. */
                        if (!(errcnt++))
                                err = rd_kafka_topic_result_error(terr[i]);
                }
        }

        /* Check resource errors */
        for (i = 0 ; i < cres_cnt ; i++) {
                if (rd_kafka_ConfigResource_error(cres[i])) {
                        TEST_WARN("ConfigResource result: %d,%s: error: %s\n",
                                  rd_kafka_ConfigResource_type(cres[i]),
                                  rd_kafka_ConfigResource_name(cres[i]),
                                  rd_kafka_ConfigResource_error_string(cres[i]));
                        if (!(errcnt++))
                                err = rd_kafka_ConfigResource_error(cres[i]);
                }
        }

        /* On success the event is handed to the caller (when requested),
         * otherwise it is destroyed here. */
        if (!err && retevent)
                *retevent = rkev;
        else
                rd_kafka_event_destroy(rkev);

        return err;
}
5024
5025
5026
5027 /**
5028 * @brief Topic Admin API helpers
5029 *
5030 * @param useq Makes the call async and posts the response in this queue.
5031 * If NULL this call will be synchronous and return the error
5032 * result.
5033 *
5034 * @remark Fails the current test on failure.
5035 */
5036
rd_kafka_resp_err_t
test_CreateTopics_simple (rd_kafka_t *rk,
                          rd_kafka_queue_t *useq,
                          char **topics, size_t topic_cnt,
                          int num_partitions,
                          void *opaque) {
        rd_kafka_NewTopic_t **new_topics;
        rd_kafka_AdminOptions_t *options;
        rd_kafka_queue_t *q;
        size_t i;
        const int tmout = 30 * 1000;
        rd_kafka_resp_err_t err;

        new_topics = malloc(sizeof(*new_topics) * topic_cnt);

        for (i = 0 ; i < topic_cnt ; i++) {
                char errstr[512];
                /* Replication factor is fixed at 1. */
                new_topics[i] = rd_kafka_NewTopic_new(topics[i],
                                                      num_partitions, 1,
                                                      errstr, sizeof(errstr));
                TEST_ASSERT(new_topics[i],
                            "Failed to NewTopic(\"%s\", %d) #%"PRIusz": %s",
                            topics[i], num_partitions, i, errstr);
        }

        options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
        rd_kafka_AdminOptions_set_opaque(options, opaque);

        if (!useq) {
                char errstr[512];

                /* Synchronous mode: set explicit timeouts and use a
                 * temporary queue to await the result below. */
                err = rd_kafka_AdminOptions_set_request_timeout(options,
                                                                tmout,
                                                                errstr,
                                                                sizeof(errstr));
                TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
                err = rd_kafka_AdminOptions_set_operation_timeout(options,
                                                                  tmout-5000,
                                                                  errstr,
                                                                  sizeof(errstr));
                TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);

                q = rd_kafka_queue_new(rk);
        } else {
                q = useq;
        }

        TEST_SAY("Creating %"PRIusz" topics\n", topic_cnt);

        rd_kafka_CreateTopics(rk, new_topics, topic_cnt, options, q);

        rd_kafka_AdminOptions_destroy(options);

        rd_kafka_NewTopic_destroy_array(new_topics, topic_cnt);
        free(new_topics);

        /* Async mode: result is delivered on the caller's queue. */
        if (useq)
                return RD_KAFKA_RESP_ERR_NO_ERROR;


        err = test_wait_topic_admin_result(q,
                                           RD_KAFKA_EVENT_CREATETOPICS_RESULT,
                                           NULL, tmout+5000);

        rd_kafka_queue_destroy(q);

        if (err)
                TEST_FAIL("Failed to create %d topic(s): %s",
                          (int)topic_cnt, rd_kafka_err2str(err));

        return err;
}
5109
5110
rd_kafka_resp_err_t
test_CreatePartitions_simple (rd_kafka_t *rk,
                              rd_kafka_queue_t *useq,
                              const char *topic,
                              size_t total_part_cnt,
                              void *opaque) {
        rd_kafka_NewPartitions_t *newp[1];
        rd_kafka_AdminOptions_t *options;
        rd_kafka_queue_t *q;
        const int tmout = 30 * 1000;
        rd_kafka_resp_err_t err;
        char errstr[512];

        /* total_part_cnt is the new TOTAL partition count. */
        newp[0] = rd_kafka_NewPartitions_new(topic, total_part_cnt, errstr,
                                             sizeof(errstr));
        TEST_ASSERT(newp[0],
                    "Failed to NewPartitions(\"%s\", %"PRIusz"): %s",
                    topic, total_part_cnt, errstr);

        options = rd_kafka_AdminOptions_new(rk,
                                            RD_KAFKA_ADMIN_OP_CREATEPARTITIONS);
        rd_kafka_AdminOptions_set_opaque(options, opaque);

        if (!useq) {
                char errstr[512];

                /* Synchronous mode: set explicit timeouts and use a
                 * temporary queue to await the result below. */
                err = rd_kafka_AdminOptions_set_request_timeout(options,
                                                                tmout,
                                                                errstr,
                                                                sizeof(errstr));
                TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
                err = rd_kafka_AdminOptions_set_operation_timeout(options,
                                                                  tmout-5000,
                                                                  errstr,
                                                                  sizeof(errstr));
                TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);

                q = rd_kafka_queue_new(rk);
        } else {
                q = useq;
        }

        TEST_SAY("Creating (up to) %"PRIusz" partitions for topic \"%s\"\n",
                 total_part_cnt, topic);

        rd_kafka_CreatePartitions(rk, newp, 1, options, q);

        rd_kafka_AdminOptions_destroy(options);

        rd_kafka_NewPartitions_destroy(newp[0]);

        /* Async mode: result is delivered on the caller's queue. */
        if (useq)
                return RD_KAFKA_RESP_ERR_NO_ERROR;


        err = test_wait_topic_admin_result(
                q, RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT, NULL, tmout+5000);

        rd_kafka_queue_destroy(q);

        if (err)
                TEST_FAIL("Failed to create partitions: %s",
                          rd_kafka_err2str(err));

        return err;
}
5177
5178
5179 rd_kafka_resp_err_t
test_DeleteTopics_simple(rd_kafka_t * rk,rd_kafka_queue_t * useq,char ** topics,size_t topic_cnt,void * opaque)5180 test_DeleteTopics_simple (rd_kafka_t *rk,
5181 rd_kafka_queue_t *useq,
5182 char **topics, size_t topic_cnt,
5183 void *opaque) {
5184 rd_kafka_queue_t *q;
5185 rd_kafka_DeleteTopic_t **del_topics;
5186 rd_kafka_AdminOptions_t *options;
5187 size_t i;
5188 rd_kafka_resp_err_t err;
5189 const int tmout = 30*1000;
5190
5191 del_topics = malloc(sizeof(*del_topics) * topic_cnt);
5192
5193 for (i = 0 ; i < topic_cnt ; i++) {
5194 del_topics[i] = rd_kafka_DeleteTopic_new(topics[i]);
5195 TEST_ASSERT(del_topics[i]);
5196 }
5197
5198 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
5199 rd_kafka_AdminOptions_set_opaque(options, opaque);
5200
5201 if (!useq) {
5202 char errstr[512];
5203
5204 err = rd_kafka_AdminOptions_set_request_timeout(options,
5205 tmout,
5206 errstr,
5207 sizeof(errstr));
5208 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5209 err = rd_kafka_AdminOptions_set_operation_timeout(options,
5210 tmout-5000,
5211 errstr,
5212 sizeof(errstr));
5213 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5214
5215 q = rd_kafka_queue_new(rk);
5216 } else {
5217 q = useq;
5218 }
5219
5220 TEST_SAY("Deleting %"PRIusz" topics\n", topic_cnt);
5221
5222 rd_kafka_DeleteTopics(rk, del_topics, topic_cnt, options, useq);
5223
5224 rd_kafka_AdminOptions_destroy(options);
5225
5226 rd_kafka_DeleteTopic_destroy_array(del_topics, topic_cnt);
5227
5228 free(del_topics);
5229
5230 if (useq)
5231 return RD_KAFKA_RESP_ERR_NO_ERROR;
5232
5233 err = test_wait_topic_admin_result(q,
5234 RD_KAFKA_EVENT_CREATETOPICS_RESULT,
5235 NULL, tmout+5000);
5236
5237 rd_kafka_queue_destroy(q);
5238
5239 if (err)
5240 TEST_FAIL("Failed to delete topics: %s",
5241 rd_kafka_err2str(err));
5242
5243 return err;
5244 }
5245
5246
5247 /**
5248 * @brief Delta Alter configuration for the given resource,
5249 * overwriting/setting the configs provided in \p configs.
5250 * Existing configuration remains intact.
5251 *
 * @param configs 'const char *name, const char *value' tuples, laid out
 *                flat as consecutive string pointers
 * @param config_cnt is the total number of strings in \p configs
 *                   (i.e., twice the number of name,value tuples)
5254 */
rd_kafka_resp_err_t
test_AlterConfigs_simple (rd_kafka_t *rk,
                          rd_kafka_ResourceType_t restype,
                          const char *resname,
                          const char **configs, size_t config_cnt) {
        rd_kafka_queue_t *q;
        rd_kafka_ConfigResource_t *confres;
        rd_kafka_event_t *rkev;
        size_t i;
        rd_kafka_resp_err_t err;
        const rd_kafka_ConfigResource_t **results;
        size_t result_cnt;
        const rd_kafka_ConfigEntry_t **configents;
        size_t configent_cnt;


        q = rd_kafka_queue_new(rk);

        TEST_SAY("Getting configuration for %d %s\n", restype, resname);

        /* Step 1: read back the resource's current configuration so the
         * subsequent (non-incremental) AlterConfigs does not wipe out
         * existing settings. */
        confres = rd_kafka_ConfigResource_new(restype, resname);
        rd_kafka_DescribeConfigs(rk, &confres, 1, NULL, q);

        err = test_wait_topic_admin_result(
                q, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, &rkev, 15*1000);
        if (err) {
                /* Describe failed: clean up both the queue and the still
                 * locally-owned resource object before bailing out. */
                rd_kafka_queue_destroy(q);
                rd_kafka_ConfigResource_destroy(confres);
                return err;
        }

        /* Exactly one resource was described, so exactly one result is
         * expected back. */
        results = rd_kafka_DescribeConfigs_result_resources(
                rd_kafka_event_DescribeConfigs_result(rkev), &result_cnt);
        TEST_ASSERT(result_cnt == 1,
                    "expected 1 DescribeConfigs result, not %"PRIusz,
                    result_cnt);

        configents = rd_kafka_ConfigResource_configs(results[0],
                                                     &configent_cnt);
        TEST_ASSERT(configent_cnt > 0,
                    "expected > 0 ConfigEntry:s, not %"PRIusz, configent_cnt);

        TEST_SAY("Altering configuration for %d %s\n", restype, resname);

        /* Apply all existing configuration entries to resource object that
         * will later be passed to AlterConfigs. */
        for (i = 0 ; i < configent_cnt ; i++) {
                err = rd_kafka_ConfigResource_set_config(
                        confres,
                        rd_kafka_ConfigEntry_name(configents[i]),
                        rd_kafka_ConfigEntry_value(configents[i]));
                TEST_ASSERT(!err, "Failed to set read-back config %s=%s "
                            "on local resource object",
                            rd_kafka_ConfigEntry_name(configents[i]),
                            rd_kafka_ConfigEntry_value(configents[i]));
        }

        /* configents points into rkev's payload; it must not be used after
         * this destroy. */
        rd_kafka_event_destroy(rkev);

        /* Then apply the configuration to change.
         * \p configs is a flat name,value sequence, hence the i += 2 and
         * the [i]/[i+1] accesses (config_cnt counts strings, not tuples). */
        for (i = 0 ; i < config_cnt ; i += 2) {
                err = rd_kafka_ConfigResource_set_config(confres,
                                                         configs[i],
                                                         configs[i+1]);
                TEST_ASSERT(!err, "Failed to set config %s=%s on "
                            "local resource object",
                            configs[i], configs[i+1]);
        }

        /* Step 2: write the merged (existing + requested) configuration
         * back to the cluster. */
        rd_kafka_AlterConfigs(rk, &confres, 1, NULL, q);

        rd_kafka_ConfigResource_destroy(confres);

        err = test_wait_topic_admin_result(
                q, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT, NULL, 15*1000);

        rd_kafka_queue_destroy(q);

        return err;
}
5335
5336
5337
/**
 * @brief Free an array of \p cnt heap-allocated strings, then the array
 *        itself.
 */
static void test_free_string_array (char **strs, size_t cnt) {
        while (cnt > 0)
                free(strs[--cnt]);
        free(strs);
}
5344
5345
5346 /**
5347 * @return an array of all topics in the cluster matching our the
5348 * rdkafka test prefix.
5349 */
5350 static rd_kafka_resp_err_t
test_get_all_test_topics(rd_kafka_t * rk,char *** topicsp,size_t * topic_cntp)5351 test_get_all_test_topics (rd_kafka_t *rk, char ***topicsp, size_t *topic_cntp) {
5352 size_t test_topic_prefix_len = strlen(test_topic_prefix);
5353 const rd_kafka_metadata_t *md;
5354 char **topics = NULL;
5355 size_t topic_cnt = 0;
5356 int i;
5357 rd_kafka_resp_err_t err;
5358
5359 *topic_cntp = 0;
5360 if (topicsp)
5361 *topicsp = NULL;
5362
5363 /* Retrieve list of topics */
5364 err = rd_kafka_metadata(rk, 1/*all topics*/, NULL, &md,
5365 tmout_multip(10000));
5366 if (err) {
5367 TEST_WARN("%s: Failed to acquire metadata: %s: "
5368 "not deleting any topics\n",
5369 __FUNCTION__, rd_kafka_err2str(err));
5370 return err;
5371 }
5372
5373 if (md->topic_cnt == 0) {
5374 TEST_WARN("%s: No topics in cluster\n", __FUNCTION__);
5375 rd_kafka_metadata_destroy(md);
5376 return RD_KAFKA_RESP_ERR_NO_ERROR;
5377 }
5378
5379 if (topicsp)
5380 topics = malloc(sizeof(*topics) * md->topic_cnt);
5381
5382 for (i = 0 ; i < md->topic_cnt ; i++) {
5383 if (strlen(md->topics[i].topic) >= test_topic_prefix_len &&
5384 !strncmp(md->topics[i].topic,
5385 test_topic_prefix, test_topic_prefix_len)) {
5386 if (topicsp)
5387 topics[topic_cnt++] =
5388 rd_strdup(md->topics[i].topic);
5389 else
5390 topic_cnt++;
5391 }
5392 }
5393
5394 if (topic_cnt == 0) {
5395 TEST_SAY("%s: No topics (out of %d) matching our "
5396 "test prefix (%s)\n",
5397 __FUNCTION__, md->topic_cnt, test_topic_prefix);
5398 rd_kafka_metadata_destroy(md);
5399 if (topics)
5400 test_free_string_array(topics, topic_cnt);
5401 return RD_KAFKA_RESP_ERR_NO_ERROR;
5402 }
5403
5404 rd_kafka_metadata_destroy(md);
5405
5406 if (topicsp)
5407 *topicsp = topics;
5408 *topic_cntp = topic_cnt;
5409
5410 return RD_KAFKA_RESP_ERR_NO_ERROR;
5411 }
5412
5413 /**
5414 * @brief Delete all test topics using the Kafka Admin API.
5415 */
rd_kafka_resp_err_t test_delete_all_test_topics (int timeout_ms) {
        rd_kafka_t *rk;
        char **topics;
        size_t topic_cnt = 0;
        rd_kafka_resp_err_t err;
        int i;
        rd_kafka_AdminOptions_t *options;
        rd_kafka_queue_t *q;
        char errstr[256];
        /* Absolute deadline for the metadata-propagation wait below.
         * Presumably test_clock() is in microseconds, hence * 1000 —
         * TODO confirm; also note timeout_ms * 1000 is int arithmetic and
         * could overflow for very large timeouts (NOTE(review)). */
        int64_t abs_timeout = test_clock() + (timeout_ms * 1000);

        rk = test_create_producer();

        /* Enumerate all topics matching the test prefix. */
        err = test_get_all_test_topics(rk, &topics, &topic_cnt);
        if (err) {
                /* Error already reported by test_get_all_test_topics() */
                rd_kafka_destroy(rk);
                return err;
        }

        if (topic_cnt == 0) {
                /* Nothing to delete. */
                rd_kafka_destroy(rk);
                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        q = rd_kafka_queue_get_main(rk);

        options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
        if (rd_kafka_AdminOptions_set_operation_timeout(options, 2*60*1000,
                                                        errstr,
                                                        sizeof(errstr)))
                TEST_SAY(_C_YEL "Failed to set DeleteTopics timeout: %s: "
                         "ignoring\n",
                         errstr);

        TEST_SAY(_C_MAG "====> Deleting all test topics with <===="
                 "a timeout of 2 minutes\n");

        /* NOTE(review): \p options is passed as the helper's *opaque*
         * argument here (the helper's 5th parameter is void *opaque); the
         * helper builds its own AdminOptions internally. Since \p q is
         * non-NULL the helper returns immediately and the result event
         * arrives on \p q, which is polled below. Verify this opaque usage
         * is intentional. */
        test_DeleteTopics_simple(rk, q, topics, topic_cnt, options);

        rd_kafka_AdminOptions_destroy(options);

        /* Poll the main queue until the DeleteTopics result event arrives,
         * ignoring unrelated events. */
        while (1) {
                rd_kafka_event_t *rkev;
                const rd_kafka_DeleteTopics_result_t *res;

                rkev = rd_kafka_queue_poll(q, -1);

                res = rd_kafka_event_DeleteTopics_result(rkev);
                if (!res) {
                        /* Not a DeleteTopics result: skip it. */
                        TEST_SAY("%s: Ignoring event: %s: %s\n",
                                 __FUNCTION__, rd_kafka_event_name(rkev),
                                 rd_kafka_event_error_string(rkev));
                        rd_kafka_event_destroy(rkev);
                        continue;
                }

                if (rd_kafka_event_error(rkev)) {
                        /* Operation-level failure (all topics). */
                        TEST_WARN("%s: DeleteTopics for %"PRIusz" topics "
                                  "failed: %s\n",
                                  __FUNCTION__, topic_cnt,
                                  rd_kafka_event_error_string(rkev));
                        err = rd_kafka_event_error(rkev);
                } else {
                        /* Per-topic results: count successes, warn on the
                         * rest, but report overall success. */
                        const rd_kafka_topic_result_t **terr;
                        size_t tcnt;
                        int okcnt = 0;

                        terr = rd_kafka_DeleteTopics_result_topics(res, &tcnt);

                        for(i = 0 ; i < (int)tcnt ; i++) {
                                if (!rd_kafka_topic_result_error(terr[i])) {
                                        okcnt++;
                                        continue;
                                }

                                TEST_WARN("%s: Failed to delete topic %s: %s\n",
                                          __FUNCTION__,
                                          rd_kafka_topic_result_name(terr[i]),
                                          rd_kafka_topic_result_error_string(
                                                  terr[i]));
                        }

                        TEST_SAY("%s: DeleteTopics "
                                 "succeeded for %d/%"PRIusz" topics\n",
                                 __FUNCTION__, okcnt, topic_cnt);
                        err = RD_KAFKA_RESP_ERR_NO_ERROR;
                }

                rd_kafka_event_destroy(rkev);
                break;
        }

        rd_kafka_queue_destroy(q);

        test_free_string_array(topics, topic_cnt);

        /* Wait for topics to be fully deleted */
        while (1) {
                /* Re-count matching topics (topicsp == NULL: count only). */
                err = test_get_all_test_topics(rk, NULL, &topic_cnt);

                if (!err && topic_cnt == 0)
                        break;

                if (abs_timeout < test_clock()) {
                        TEST_WARN("%s: Timed out waiting for "
                                  "remaining %"PRIusz" deleted topics "
                                  "to disappear from cluster metadata\n",
                                  __FUNCTION__, topic_cnt);
                        break;
                }

                TEST_SAY("Waiting for remaining %"PRIusz" delete topics "
                         "to disappear from cluster metadata\n", topic_cnt);

                rd_sleep(1);
        }

        rd_kafka_destroy(rk);

        return err;
}
5538
5539
5540
test_fail0(const char * file,int line,const char * function,int do_lock,int fail_now,const char * fmt,...)5541 void test_fail0 (const char *file, int line, const char *function,
5542 int do_lock, int fail_now, const char *fmt, ...) {
5543 char buf[512];
5544 int is_thrd = 0;
5545 size_t of;
5546 va_list ap;
5547 char *t;
5548 char timestr[32];
5549 time_t tnow = time(NULL);
5550
5551 #ifdef _MSC_VER
5552 ctime_s(timestr, sizeof(timestr), &tnow);
5553 #else
5554 ctime_r(&tnow, timestr);
5555 #endif
5556 t = strchr(timestr, '\n');
5557 if (t)
5558 *t = '\0';
5559
5560 of = rd_snprintf(buf, sizeof(buf), "%s():%i: ", function, line);
5561 rd_assert(of < sizeof(buf));
5562
5563 va_start(ap, fmt);
5564 rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap);
5565 va_end(ap);
5566
5567 /* Remove trailing newline */
5568 if ((t = strchr(buf, '\n')) && !*(t+1))
5569 *t = '\0';
5570
5571 TEST_SAYL(0, "TEST FAILURE\n");
5572 fprintf(stderr, "\033[31m### Test \"%s\" failed at %s:%i:%s() at %s: "
5573 "###\n"
5574 "%s\n",
5575 test_curr->name, file, line, function, timestr, buf+of);
5576 if (do_lock)
5577 TEST_LOCK();
5578 test_curr->state = TEST_FAILED;
5579 test_curr->failcnt += 1;
5580
5581 if (!*test_curr->failstr) {
5582 strncpy(test_curr->failstr, buf, sizeof(test_curr->failstr));
5583 test_curr->failstr[sizeof(test_curr->failstr)-1] = '\0';
5584 }
5585 if (fail_now && test_curr->mainfunc) {
5586 tests_running_cnt--;
5587 is_thrd = 1;
5588 }
5589 if (do_lock)
5590 TEST_UNLOCK();
5591 if (!fail_now)
5592 return;
5593 if (test_assert_on_fail || !is_thrd)
5594 assert(0);
5595 else
5596 thrd_exit(0);
5597 }
5598
5599
5600 /**
5601 * @brief Destroy a mock cluster and its underlying rd_kafka_t handle
5602 */
/**
 * @brief Destroy a mock cluster and its underlying rd_kafka_t handle
 */
void test_mock_cluster_destroy (rd_kafka_mock_cluster_t *mcluster) {
        /* Grab the owning client handle before the cluster goes away,
         * then tear down the cluster followed by the handle itself. */
        rd_kafka_t *handle = rd_kafka_mock_cluster_handle(mcluster);

        rd_kafka_mock_cluster_destroy(mcluster);
        rd_kafka_destroy(handle);
}
5608
5609
5610
5611 /**
5612 * @brief Create a standalone mock cluster that can be used by multiple
5613 * rd_kafka_t instances.
5614 */
test_mock_cluster_new(int broker_cnt,const char ** bootstraps)5615 rd_kafka_mock_cluster_t *test_mock_cluster_new (int broker_cnt,
5616 const char **bootstraps) {
5617 rd_kafka_t *rk;
5618 rd_kafka_conf_t *conf = rd_kafka_conf_new();
5619 rd_kafka_mock_cluster_t *mcluster;
5620 char errstr[256];
5621
5622 test_conf_common_init(conf, 0);
5623
5624 test_conf_set(conf, "client.id", "MOCK");
5625
5626 rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
5627 TEST_ASSERT(rk, "Failed to create mock cluster rd_kafka_t: %s", errstr);
5628
5629 mcluster = rd_kafka_mock_cluster_new(rk, broker_cnt);
5630 TEST_ASSERT(mcluster, "Failed to acquire mock cluster");
5631
5632 if (bootstraps)
5633 *bootstraps = rd_kafka_mock_cluster_bootstraps(mcluster);
5634
5635 return mcluster;
5636 }
5637