1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2013, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30 #define _CRT_RAND_S // rand_s() on MSVC
31 #include <stdarg.h>
32 #include "test.h"
33 #include <signal.h>
34 #include <stdlib.h>
35 #include <stdio.h>
36
37 #ifdef _WIN32
38 #include <direct.h> /* _getcwd */
39 #else
40 #include <sys/wait.h> /* waitpid */
41 #endif
42
43 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
44 * is built from within the librdkafka source tree and thus differs. */
45 #include "rdkafka.h"
46
47 int test_level = 2;
48 int test_seed = 0;
49
50 char test_mode[64] = "bare";
51 char test_scenario[64] = "default";
52 static volatile sig_atomic_t test_exit = 0;
53 static char test_topic_prefix[128] = "rdkafkatest";
54 static int test_topic_random = 0;
55 int tests_running_cnt = 0;
56 int test_concurrent_max = 5;
57 int test_assert_on_fail = 0;
58 double test_timeout_multiplier = 1.0;
59 static char *test_sql_cmd = NULL;
60 int test_session_timeout_ms = 6000;
61 int test_broker_version;
62 static const char *test_broker_version_str = "2.4.0.0";
63 int test_flags = 0;
64 int test_neg_flags = TEST_F_KNOWN_ISSUE;
65 /* run delete-test-topics.sh between each test (when concurrent_max = 1) */
66 static int test_delete_topics_between = 0;
67 static const char *test_git_version = "HEAD";
68 static const char *test_sockem_conf = "";
69 int test_on_ci = 0; /* Tests are being run on CI, be more forgiving
70 * with regards to timeouts, etc. */
71 int test_quick = 0; /** Run tests quickly */
72 int test_idempotent_producer = 0;
73 int test_rusage = 0; /**< Check resource usage */
74 /**< CPU speed calibration for rusage threshold checks.
75 * >1.0: CPU is slower than base line system,
76 * <1.0: CPU is faster than base line system. */
77 double test_rusage_cpu_calibration = 1.0;
78 static const char *tests_to_run = NULL; /* all */
79 static const char *subtests_to_run = NULL; /* all */
80 static const char *tests_to_skip = NULL; /* none */
81 int test_write_report = 0; /**< Write test report file */
82
83 static int show_summary = 1;
84 static int test_summary (int do_lock);
85
86 /**
87 * Protects shared state, such as tests[]
88 */
89 mtx_t test_mtx;
90 cnd_t test_cnd;
91
92 static const char *test_states[] = {
93 "DNS",
94 "SKIPPED",
95 "RUNNING",
96 "PASSED",
97 "FAILED",
98 };
99
100
101
102 #define _TEST_DECL(NAME) \
103 extern int main_ ## NAME (int, char **)
104 #define _TEST(NAME,FLAGS,...) \
105 { .name = # NAME, .mainfunc = main_ ## NAME, .flags = FLAGS, __VA_ARGS__ }
106
107
108 /**
109 * Declare all tests here
110 */
111 _TEST_DECL(0000_unittests);
112 _TEST_DECL(0001_multiobj);
113 _TEST_DECL(0002_unkpart);
114 _TEST_DECL(0003_msgmaxsize);
115 _TEST_DECL(0004_conf);
116 _TEST_DECL(0005_order);
117 _TEST_DECL(0006_symbols);
118 _TEST_DECL(0007_autotopic);
119 _TEST_DECL(0008_reqacks);
120 _TEST_DECL(0009_mock_cluster);
121 _TEST_DECL(0011_produce_batch);
122 _TEST_DECL(0012_produce_consume);
123 _TEST_DECL(0013_null_msgs);
124 _TEST_DECL(0014_reconsume_191);
125 _TEST_DECL(0015_offsets_seek);
126 _TEST_DECL(0016_client_swname);
127 _TEST_DECL(0017_compression);
128 _TEST_DECL(0018_cgrp_term);
129 _TEST_DECL(0019_list_groups);
130 _TEST_DECL(0020_destroy_hang);
131 _TEST_DECL(0021_rkt_destroy);
132 _TEST_DECL(0022_consume_batch);
133 _TEST_DECL(0025_timers);
134 _TEST_DECL(0026_consume_pause);
135 _TEST_DECL(0028_long_topicnames);
136 _TEST_DECL(0029_assign_offset);
137 _TEST_DECL(0030_offset_commit);
138 _TEST_DECL(0031_get_offsets);
139 _TEST_DECL(0033_regex_subscribe);
140 _TEST_DECL(0033_regex_subscribe_local);
141 _TEST_DECL(0034_offset_reset);
142 _TEST_DECL(0035_api_version);
143 _TEST_DECL(0036_partial_fetch);
144 _TEST_DECL(0037_destroy_hang_local);
145 _TEST_DECL(0038_performance);
146 _TEST_DECL(0039_event_dr);
147 _TEST_DECL(0039_event_log);
148 _TEST_DECL(0039_event);
149 _TEST_DECL(0040_io_event);
150 _TEST_DECL(0041_fetch_max_bytes);
151 _TEST_DECL(0042_many_topics);
152 _TEST_DECL(0043_no_connection);
153 _TEST_DECL(0044_partition_cnt);
154 _TEST_DECL(0045_subscribe_update);
155 _TEST_DECL(0045_subscribe_update_topic_remove);
156 _TEST_DECL(0045_subscribe_update_non_exist_and_partchange);
157 _TEST_DECL(0046_rkt_cache);
158 _TEST_DECL(0047_partial_buf_tmout);
159 _TEST_DECL(0048_partitioner);
160 _TEST_DECL(0049_consume_conn_close);
161 _TEST_DECL(0050_subscribe_adds);
162 _TEST_DECL(0051_assign_adds);
163 _TEST_DECL(0052_msg_timestamps);
164 _TEST_DECL(0053_stats_timing);
165 _TEST_DECL(0053_stats);
166 _TEST_DECL(0054_offset_time);
167 _TEST_DECL(0055_producer_latency);
168 _TEST_DECL(0056_balanced_group_mt);
169 _TEST_DECL(0057_invalid_topic);
170 _TEST_DECL(0058_log);
171 _TEST_DECL(0059_bsearch);
172 _TEST_DECL(0060_op_prio);
173 _TEST_DECL(0061_consumer_lag);
174 _TEST_DECL(0062_stats_event);
175 _TEST_DECL(0063_clusterid);
176 _TEST_DECL(0064_interceptors);
177 _TEST_DECL(0065_yield);
178 _TEST_DECL(0066_plugins);
179 _TEST_DECL(0067_empty_topic);
180 _TEST_DECL(0068_produce_timeout);
181 _TEST_DECL(0069_consumer_add_parts);
182 _TEST_DECL(0070_null_empty);
183 _TEST_DECL(0072_headers_ut);
184 _TEST_DECL(0073_headers);
185 _TEST_DECL(0074_producev);
186 _TEST_DECL(0075_retry);
187 _TEST_DECL(0076_produce_retry);
188 _TEST_DECL(0077_compaction);
189 _TEST_DECL(0078_c_from_cpp);
190 _TEST_DECL(0079_fork);
191 _TEST_DECL(0080_admin_ut);
192 _TEST_DECL(0081_admin);
193 _TEST_DECL(0082_fetch_max_bytes);
194 _TEST_DECL(0083_cb_event);
195 _TEST_DECL(0084_destroy_flags_local);
196 _TEST_DECL(0084_destroy_flags);
197 _TEST_DECL(0085_headers);
198 _TEST_DECL(0086_purge_local);
199 _TEST_DECL(0086_purge_remote);
200 _TEST_DECL(0088_produce_metadata_timeout);
201 _TEST_DECL(0089_max_poll_interval);
202 _TEST_DECL(0090_idempotence);
203 _TEST_DECL(0091_max_poll_interval_timeout);
204 _TEST_DECL(0092_mixed_msgver);
205 _TEST_DECL(0093_holb_consumer);
206 _TEST_DECL(0094_idempotence_msg_timeout);
207 _TEST_DECL(0095_all_brokers_down);
208 _TEST_DECL(0097_ssl_verify);
209 _TEST_DECL(0098_consumer_txn);
210 _TEST_DECL(0099_commit_metadata);
211 _TEST_DECL(0100_thread_interceptors);
212 _TEST_DECL(0101_fetch_from_follower);
213 _TEST_DECL(0102_static_group_rebalance);
214 _TEST_DECL(0103_transactions_local);
215 _TEST_DECL(0103_transactions);
216 _TEST_DECL(0104_fetch_from_follower_mock);
217 _TEST_DECL(0105_transactions_mock);
218 _TEST_DECL(0106_cgrp_sess_timeout);
219 _TEST_DECL(0107_topic_recreate);
220 _TEST_DECL(0109_auto_create_topics);
221 _TEST_DECL(0110_batch_size);
222 _TEST_DECL(0111_delay_create_topics);
223 _TEST_DECL(0112_assign_unknown_part);
224 _TEST_DECL(0113_cooperative_rebalance_local);
225 _TEST_DECL(0113_cooperative_rebalance);
226 _TEST_DECL(0114_sticky_partitioning);
227 _TEST_DECL(0115_producer_auth);
228 _TEST_DECL(0116_kafkaconsumer_close);
229 _TEST_DECL(0117_mock_errors);
230 _TEST_DECL(0118_commit_rebalance);
231 _TEST_DECL(0119_consumer_auth);
232 _TEST_DECL(0120_asymmetric_subscription);
233 _TEST_DECL(0121_clusterid);
234 _TEST_DECL(0122_buffer_cleaning_after_rebalance);
235 _TEST_DECL(0123_connections_max_idle);
236 _TEST_DECL(0124_openssl_invalid_engine);
237
238 /* Manual tests */
239 _TEST_DECL(8000_idle);
240
241
242 /* Define test resource usage thresholds if the default limits
243 * are not tolerable.
244 *
245 * Fields:
246 * .ucpu - Max User CPU percentage (double)
247 * .scpu - Max System/Kernel CPU percentage (double)
248 * .rss - Max RSS (memory) in megabytes (double)
249 * .ctxsw - Max number of voluntary context switches (int)
250 *
251 * Also see test_rusage_check_thresholds() in rusage.c
252 *
253 * Make a comment in the _THRES() below why the extra thresholds are required.
254 *
255 * Usage:
256 * _TEST(00...., ...,
257 * _THRES(.ucpu = 15.0)), <-- Max 15% User CPU usage
258 */
259 #define _THRES(...) .rusage_thres = { __VA_ARGS__ }
260
261 /**
262 * Define all tests here
263 */
264 struct test tests[] = {
265 /* Special MAIN test to hold over-all timings, etc. */
266 { .name = "<MAIN>", .flags = TEST_F_LOCAL },
267 _TEST(0000_unittests, TEST_F_LOCAL,
268 /* The msgq insert order tests are heavy on
269 * user CPU (memory scan), RSS, and
270 * system CPU (lots of allocations -> madvise(2)). */
271 _THRES(.ucpu = 100.0, .scpu = 20.0, .rss = 900.0)),
272 _TEST(0001_multiobj, 0),
273 _TEST(0002_unkpart, 0),
274 _TEST(0003_msgmaxsize, 0),
275 _TEST(0004_conf, TEST_F_LOCAL),
276 _TEST(0005_order, 0),
277 _TEST(0006_symbols, TEST_F_LOCAL),
278 _TEST(0007_autotopic, 0),
279 _TEST(0008_reqacks, 0),
280 _TEST(0009_mock_cluster, TEST_F_LOCAL,
281 /* Mock cluster requires MsgVersion 2 */
282 TEST_BRKVER(0,11,0,0)),
283 _TEST(0011_produce_batch, 0,
284 /* Produces a lot of messages */
285 _THRES(.ucpu = 40.0, .scpu = 8.0)),
286 _TEST(0012_produce_consume, 0),
287 _TEST(0013_null_msgs, 0),
288 _TEST(0014_reconsume_191, 0),
289 _TEST(0015_offsets_seek, 0),
290 _TEST(0016_client_swname, 0),
291 _TEST(0017_compression, 0),
292 _TEST(0018_cgrp_term, 0, TEST_BRKVER(0,9,0,0)),
293 _TEST(0019_list_groups, 0, TEST_BRKVER(0,9,0,0)),
294 _TEST(0020_destroy_hang, 0, TEST_BRKVER(0,9,0,0)),
295 _TEST(0021_rkt_destroy, 0),
296 _TEST(0022_consume_batch, 0),
297 _TEST(0025_timers, TEST_F_LOCAL),
298 _TEST(0026_consume_pause, TEST_F_KNOWN_ISSUE, TEST_BRKVER(0,9,0,0),
299 .extra = "Fragile test due to #2190"),
300 _TEST(0028_long_topicnames, TEST_F_KNOWN_ISSUE, TEST_BRKVER(0,9,0,0),
301 .extra = "https://github.com/edenhill/librdkafka/issues/529"),
302 _TEST(0029_assign_offset, 0),
303 _TEST(0030_offset_commit, 0, TEST_BRKVER(0,9,0,0),
304 /* Loops over committed() until timeout */
305 _THRES(.ucpu = 10.0, .scpu = 5.0)),
306 _TEST(0031_get_offsets, 0),
307 _TEST(0033_regex_subscribe, 0, TEST_BRKVER(0,9,0,0)),
308 _TEST(0033_regex_subscribe_local, TEST_F_LOCAL),
309 _TEST(0034_offset_reset, 0),
310 _TEST(0035_api_version, 0),
311 _TEST(0036_partial_fetch, 0),
312 _TEST(0037_destroy_hang_local, TEST_F_LOCAL),
313 _TEST(0038_performance, 0,
314 /* Produces and consumes a lot of messages */
315 _THRES(.ucpu = 150.0, .scpu = 10)),
316 _TEST(0039_event_dr, 0),
317 _TEST(0039_event_log, TEST_F_LOCAL),
318 _TEST(0039_event, TEST_F_LOCAL),
319 _TEST(0040_io_event, 0, TEST_BRKVER(0,9,0,0)),
320 _TEST(0041_fetch_max_bytes, 0,
321 /* Re-fetches large messages multiple times */
322 _THRES(.ucpu = 20.0, .scpu = 10.0)),
323 _TEST(0042_many_topics, 0),
324 _TEST(0043_no_connection, TEST_F_LOCAL),
325 _TEST(0044_partition_cnt, 0, TEST_BRKVER(1,0,0,0),
326 /* Produces a lot of messages */
327 _THRES(.ucpu = 30.0)),
328 _TEST(0045_subscribe_update, 0, TEST_BRKVER(0,9,0,0)),
329 _TEST(0045_subscribe_update_topic_remove, 0,
330 TEST_BRKVER(0,9,0,0),
331 .scenario = "noautocreate"),
332 _TEST(0045_subscribe_update_non_exist_and_partchange, 0,
333 TEST_BRKVER(0,9,0,0),
334 .scenario = "noautocreate"),
335 _TEST(0046_rkt_cache, TEST_F_LOCAL),
336 _TEST(0047_partial_buf_tmout, TEST_F_KNOWN_ISSUE),
337 _TEST(0048_partitioner, 0,
338 /* Produces many small messages */
339 _THRES(.ucpu = 10.0, .scpu = 5.0)),
340 #if WITH_SOCKEM
341 _TEST(0049_consume_conn_close, TEST_F_SOCKEM, TEST_BRKVER(0,9,0,0)),
342 #endif
343 _TEST(0050_subscribe_adds, 0, TEST_BRKVER(0,9,0,0)),
344 _TEST(0051_assign_adds, 0, TEST_BRKVER(0,9,0,0)),
345 _TEST(0052_msg_timestamps, 0, TEST_BRKVER(0,10,0,0)),
346 _TEST(0053_stats_timing, TEST_F_LOCAL),
347 _TEST(0053_stats, 0),
348 _TEST(0054_offset_time, 0, TEST_BRKVER(0,10,1,0)),
349 _TEST(0055_producer_latency, TEST_F_KNOWN_ISSUE_WIN32),
350 _TEST(0056_balanced_group_mt, 0, TEST_BRKVER(0,9,0,0)),
351 _TEST(0057_invalid_topic, 0, TEST_BRKVER(0,9,0,0)),
352 _TEST(0058_log, TEST_F_LOCAL),
353 _TEST(0059_bsearch, 0, TEST_BRKVER(0,10,0,0)),
354 _TEST(0060_op_prio, 0, TEST_BRKVER(0,9,0,0)),
355 _TEST(0061_consumer_lag, 0),
356 _TEST(0062_stats_event, TEST_F_LOCAL),
357 _TEST(0063_clusterid, 0, TEST_BRKVER(0,10,1,0)),
358 _TEST(0064_interceptors, 0, TEST_BRKVER(0,9,0,0)),
359 _TEST(0065_yield, 0),
360 _TEST(0066_plugins,
361 TEST_F_LOCAL|TEST_F_KNOWN_ISSUE_WIN32|TEST_F_KNOWN_ISSUE_OSX,
362 .extra = "dynamic loading of tests might not be fixed for this platform"),
363 _TEST(0067_empty_topic, 0),
364 #if WITH_SOCKEM
365 _TEST(0068_produce_timeout, TEST_F_SOCKEM),
366 #endif
367 _TEST(0069_consumer_add_parts, TEST_F_KNOWN_ISSUE_WIN32,
368 TEST_BRKVER(1,0,0,0)),
369 _TEST(0070_null_empty, 0),
370 _TEST(0072_headers_ut, TEST_F_LOCAL),
371 _TEST(0073_headers, 0, TEST_BRKVER(0,11,0,0)),
372 _TEST(0074_producev, TEST_F_LOCAL),
373 #if WITH_SOCKEM
374 _TEST(0075_retry, TEST_F_SOCKEM),
375 #endif
376 _TEST(0076_produce_retry, TEST_F_SOCKEM),
377 _TEST(0077_compaction, 0,
378 /* The test itself requires message headers */
379 TEST_BRKVER(0,11,0,0)),
380 _TEST(0078_c_from_cpp, TEST_F_LOCAL),
381 _TEST(0079_fork, TEST_F_LOCAL|TEST_F_KNOWN_ISSUE,
382 .extra = "using a fork():ed rd_kafka_t is not supported and will "
383 "most likely hang"),
384 _TEST(0080_admin_ut, TEST_F_LOCAL),
385 _TEST(0081_admin, 0, TEST_BRKVER(0,10,2,0)),
386 _TEST(0082_fetch_max_bytes, 0, TEST_BRKVER(0,10,1,0)),
387 _TEST(0083_cb_event, 0, TEST_BRKVER(0,9,0,0)),
388 _TEST(0084_destroy_flags_local, TEST_F_LOCAL),
389 _TEST(0084_destroy_flags, 0),
390 _TEST(0085_headers, 0, TEST_BRKVER(0,11,0,0)),
391 _TEST(0086_purge_local, TEST_F_LOCAL),
392 _TEST(0086_purge_remote, 0),
393 #if WITH_SOCKEM
394 _TEST(0088_produce_metadata_timeout, TEST_F_SOCKEM),
395 #endif
396 _TEST(0089_max_poll_interval, 0, TEST_BRKVER(0,10,1,0)),
397 _TEST(0090_idempotence, 0, TEST_BRKVER(0,11,0,0)),
398 _TEST(0091_max_poll_interval_timeout, 0, TEST_BRKVER(0,10,1,0)),
399 _TEST(0092_mixed_msgver, 0, TEST_BRKVER(0,11,0,0)),
400 _TEST(0093_holb_consumer, 0, TEST_BRKVER(0,10,1,0)),
401 #if WITH_SOCKEM
402 _TEST(0094_idempotence_msg_timeout, TEST_F_SOCKEM,
403 TEST_BRKVER(0,11,0,0)),
404 #endif
405 _TEST(0095_all_brokers_down, TEST_F_LOCAL),
406 _TEST(0097_ssl_verify, 0),
407 _TEST(0098_consumer_txn, 0, TEST_BRKVER(0,11,0,0)),
408 _TEST(0099_commit_metadata, 0),
409 _TEST(0100_thread_interceptors, TEST_F_LOCAL),
410 _TEST(0101_fetch_from_follower, 0, TEST_BRKVER(2,4,0,0)),
411 _TEST(0102_static_group_rebalance, 0,
412 TEST_BRKVER(2,3,0,0)),
413 _TEST(0103_transactions_local, TEST_F_LOCAL),
414 _TEST(0103_transactions, 0, TEST_BRKVER(0, 11, 0, 0),
415 .scenario = "default,ak23"),
416 _TEST(0104_fetch_from_follower_mock, TEST_F_LOCAL,
417 TEST_BRKVER(2,4,0,0)),
418 _TEST(0105_transactions_mock, TEST_F_LOCAL, TEST_BRKVER(0,11,0,0)),
419 _TEST(0106_cgrp_sess_timeout, TEST_F_LOCAL, TEST_BRKVER(0,11,0,0)),
420 _TEST(0107_topic_recreate, 0, TEST_BRKVER_TOPIC_ADMINAPI,
421 .scenario = "noautocreate"),
422 _TEST(0109_auto_create_topics, 0),
423 _TEST(0110_batch_size, 0),
424 _TEST(0111_delay_create_topics, 0, TEST_BRKVER_TOPIC_ADMINAPI,
425 .scenario = "noautocreate"),
426 _TEST(0112_assign_unknown_part, 0),
427 _TEST(0113_cooperative_rebalance_local, TEST_F_LOCAL,
428 TEST_BRKVER(2,4,0,0)),
429 _TEST(0113_cooperative_rebalance, 0, TEST_BRKVER(2,4,0,0)),
430 _TEST(0114_sticky_partitioning, 0),
431 _TEST(0115_producer_auth, 0, TEST_BRKVER(2,1,0,0)),
432 _TEST(0116_kafkaconsumer_close, TEST_F_LOCAL),
433 _TEST(0117_mock_errors, TEST_F_LOCAL),
434 _TEST(0118_commit_rebalance, 0),
435 _TEST(0119_consumer_auth, 0, TEST_BRKVER(2,1,0,0)),
436 _TEST(0120_asymmetric_subscription, TEST_F_LOCAL),
437 _TEST(0121_clusterid, TEST_F_LOCAL),
438 _TEST(0122_buffer_cleaning_after_rebalance, TEST_BRKVER(2,4,0,0)),
439 _TEST(0123_connections_max_idle, 0),
440 _TEST(0124_openssl_invalid_engine, TEST_F_LOCAL),
441
442 /* Manual tests */
443 _TEST(8000_idle, TEST_F_MANUAL),
444
445 { NULL }
446 };
447
448
449 RD_TLS struct test *test_curr = &tests[0];
450
451
452
453 #if WITH_SOCKEM
454 /**
455 * Socket network emulation with sockem
456 */
457
test_socket_add(struct test * test,sockem_t * skm)458 static void test_socket_add (struct test *test, sockem_t *skm) {
459 TEST_LOCK();
460 rd_list_add(&test->sockets, skm);
461 TEST_UNLOCK();
462 }
463
test_socket_del(struct test * test,sockem_t * skm,int do_lock)464 static void test_socket_del (struct test *test, sockem_t *skm, int do_lock) {
465 if (do_lock)
466 TEST_LOCK();
467 /* Best effort, skm might not have been added if connect_cb failed */
468 rd_list_remove(&test->sockets, skm);
469 if (do_lock)
470 TEST_UNLOCK();
471 }
472
test_socket_sockem_set_all(const char * key,int val)473 int test_socket_sockem_set_all (const char *key, int val) {
474 int i;
475 sockem_t *skm;
476 int cnt = 0;
477
478 TEST_LOCK();
479
480 cnt = rd_list_cnt(&test_curr->sockets);
481 TEST_SAY("Setting sockem %s=%d on %s%d socket(s)\n", key, val,
482 cnt > 0 ? "" : _C_RED, cnt);
483
484 RD_LIST_FOREACH(skm, &test_curr->sockets, i) {
485 if (sockem_set(skm, key, val, NULL) == -1)
486 TEST_FAIL("sockem_set(%s, %d) failed", key, val);
487 }
488
489 TEST_UNLOCK();
490
491 return cnt;
492 }
493
test_socket_sockem_set(int s,const char * key,int value)494 void test_socket_sockem_set (int s, const char *key, int value) {
495 sockem_t *skm;
496
497 TEST_LOCK();
498 skm = sockem_find(s);
499 if (skm)
500 sockem_set(skm, key, value, NULL);
501 TEST_UNLOCK();
502 }
503
test_socket_close_all(struct test * test,int reinit)504 void test_socket_close_all (struct test *test, int reinit) {
505 TEST_LOCK();
506 rd_list_destroy(&test->sockets);
507 if (reinit)
508 rd_list_init(&test->sockets, 16, (void *)sockem_close);
509 TEST_UNLOCK();
510 }
511
512
test_connect_cb(int s,const struct sockaddr * addr,int addrlen,const char * id,void * opaque)513 static int test_connect_cb (int s, const struct sockaddr *addr,
514 int addrlen, const char *id, void *opaque) {
515 struct test *test = opaque;
516 sockem_t *skm;
517 int r;
518
519 skm = sockem_connect(s, addr, addrlen, test_sockem_conf, 0, NULL);
520 if (!skm)
521 return errno;
522
523 if (test->connect_cb) {
524 r = test->connect_cb(test, skm, id);
525 if (r)
526 return r;
527 }
528
529 test_socket_add(test, skm);
530
531 return 0;
532 }
533
test_closesocket_cb(int s,void * opaque)534 static int test_closesocket_cb (int s, void *opaque) {
535 struct test *test = opaque;
536 sockem_t *skm;
537
538 TEST_LOCK();
539 skm = sockem_find(s);
540 if (skm) {
541 /* Close sockem's sockets */
542 sockem_close(skm);
543 test_socket_del(test, skm, 0/*nolock*/);
544 }
545 TEST_UNLOCK();
546
547 /* Close librdkafka's socket */
548 #ifdef _WIN32
549 closesocket(s);
550 #else
551 close(s);
552 #endif
553
554 return 0;
555 }
556
557
test_socket_enable(rd_kafka_conf_t * conf)558 void test_socket_enable (rd_kafka_conf_t *conf) {
559 rd_kafka_conf_set_connect_cb(conf, test_connect_cb);
560 rd_kafka_conf_set_closesocket_cb(conf, test_closesocket_cb);
561 rd_kafka_conf_set_opaque(conf, test_curr);
562 }
563 #endif /* WITH_SOCKEM */
564
565 /**
566 * @brief For use as the is_fatal_cb(), treating no errors as test-fatal.
567 */
test_error_is_not_fatal_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)568 int test_error_is_not_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
569 const char *reason) {
570 return 0;
571 }
572
test_error_cb(rd_kafka_t * rk,int err,const char * reason,void * opaque)573 static void test_error_cb (rd_kafka_t *rk, int err,
574 const char *reason, void *opaque) {
575 if (test_curr->is_fatal_cb && !test_curr->is_fatal_cb(rk, err, reason)) {
576 TEST_SAY(_C_YEL "%s rdkafka error (non-testfatal): %s: %s\n",
577 rd_kafka_name(rk), rd_kafka_err2str(err), reason);
578 } else {
579 if (err == RD_KAFKA_RESP_ERR__FATAL) {
580 char errstr[512];
581 TEST_SAY(_C_RED "%s Fatal error: %s\n",
582 rd_kafka_name(rk), reason);
583
584 err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
585
586 if (test_curr->is_fatal_cb &&
587 !test_curr->is_fatal_cb(rk, err, reason))
588 TEST_SAY(_C_YEL
589 "%s rdkafka ignored FATAL error: "
590 "%s: %s\n",
591 rd_kafka_name(rk),
592 rd_kafka_err2str(err), errstr);
593 else
594 TEST_FAIL("%s rdkafka FATAL error: %s: %s",
595 rd_kafka_name(rk),
596 rd_kafka_err2str(err), errstr);
597
598 } else {
599 TEST_FAIL("%s rdkafka error: %s: %s",
600 rd_kafka_name(rk),
601 rd_kafka_err2str(err), reason);
602 }
603 }
604 }
605
test_stats_cb(rd_kafka_t * rk,char * json,size_t json_len,void * opaque)606 static int test_stats_cb (rd_kafka_t *rk, char *json, size_t json_len,
607 void *opaque) {
608 struct test *test = test_curr;
609 if (test->stats_fp)
610 fprintf(test->stats_fp,
611 "{\"test\": \"%s\", \"instance\":\"%s\", "
612 "\"stats\": %s}\n",
613 test->name, rd_kafka_name(rk), json);
614 return 0;
615 }
616
617
618 /**
619 * @brief Limit the test run time (in seconds)
620 */
test_timeout_set(int timeout)621 void test_timeout_set (int timeout) {
622 TEST_LOCK();
623 TEST_SAY("Setting test timeout to %ds * %.1f\n",
624 timeout, test_timeout_multiplier);
625 timeout = (int)((double)timeout * test_timeout_multiplier);
626 test_curr->timeout = test_clock() + (timeout * 1000000);
627 TEST_UNLOCK();
628 }
629
tmout_multip(int msecs)630 int tmout_multip (int msecs) {
631 int r;
632 TEST_LOCK();
633 r = (int)(((double)(msecs)) * test_timeout_multiplier);
634 TEST_UNLOCK();
635 return r;
636 }
637
638
639
640 #ifdef _WIN32
test_init_win32(void)641 static void test_init_win32 (void) {
642 /* Enable VT emulation to support colored output. */
643 HANDLE hOut = GetStdHandle(STD_OUTPUT_HANDLE);
644 DWORD dwMode = 0;
645
646 if (hOut == INVALID_HANDLE_VALUE ||
647 !GetConsoleMode(hOut, &dwMode))
648 return;
649
650 #ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING
651 #define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x4
652 #endif
653 dwMode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING;
654 SetConsoleMode(hOut, dwMode);
655 }
656 #endif
657
658
test_init(void)659 static void test_init (void) {
660 int seed;
661 const char *tmp;
662
663
664 if (test_seed)
665 return;
666
667 if ((tmp = test_getenv("TEST_LEVEL", NULL)))
668 test_level = atoi(tmp);
669 if ((tmp = test_getenv("TEST_MODE", NULL)))
670 strncpy(test_mode, tmp, sizeof(test_mode)-1);
671 if ((tmp = test_getenv("TEST_SCENARIO", NULL)))
672 strncpy(test_scenario, tmp, sizeof(test_scenario)-1);
673 if ((tmp = test_getenv("TEST_SOCKEM", NULL)))
674 test_sockem_conf = tmp;
675 if ((tmp = test_getenv("TEST_SEED", NULL)))
676 seed = atoi(tmp);
677 else
678 seed = test_clock() & 0xffffffff;
679 if ((tmp = test_getenv("TEST_CPU_CALIBRATION", NULL))) {
680 test_rusage_cpu_calibration = strtod(tmp, NULL);
681 if (test_rusage_cpu_calibration < 0.00001) {
682 fprintf(stderr,
683 "%% Invalid CPU calibration "
684 "value (from TEST_CPU_CALIBRATION env): %s\n",
685 tmp);
686 exit(1);
687 }
688 }
689
690 #ifdef _WIN32
691 test_init_win32();
692 {
693 LARGE_INTEGER cycl;
694 QueryPerformanceCounter(&cycl);
695 seed = (int)cycl.QuadPart;
696 }
697 #endif
698 srand(seed);
699 test_seed = seed;
700 }
701
702
test_mk_topic_name(const char * suffix,int randomized)703 const char *test_mk_topic_name (const char *suffix, int randomized) {
704 static RD_TLS char ret[512];
705
706 /* Strip main_ prefix (caller is using __FUNCTION__) */
707 if (!strncmp(suffix, "main_", 5))
708 suffix += 5;
709
710 if (test_topic_random || randomized)
711 rd_snprintf(ret, sizeof(ret), "%s_rnd%"PRIx64"_%s",
712 test_topic_prefix, test_id_generate(), suffix);
713 else
714 rd_snprintf(ret, sizeof(ret), "%s_%s", test_topic_prefix, suffix);
715
716 TEST_SAY("Using topic \"%s\"\n", ret);
717
718 return ret;
719 }
720
721
722 /**
723 * @brief Set special test config property
724 * @returns 1 if property was known, else 0.
725 */
test_set_special_conf(const char * name,const char * val,int * timeoutp)726 int test_set_special_conf (const char *name, const char *val, int *timeoutp) {
727 if (!strcmp(name, "test.timeout.multiplier")) {
728 TEST_LOCK();
729 test_timeout_multiplier = strtod(val, NULL);
730 TEST_UNLOCK();
731 *timeoutp = tmout_multip((*timeoutp)*1000) / 1000;
732 } else if (!strcmp(name, "test.topic.prefix")) {
733 rd_snprintf(test_topic_prefix, sizeof(test_topic_prefix),
734 "%s", val);
735 } else if (!strcmp(name, "test.topic.random")) {
736 if (!strcmp(val, "true") ||
737 !strcmp(val, "1"))
738 test_topic_random = 1;
739 else
740 test_topic_random = 0;
741 } else if (!strcmp(name, "test.concurrent.max")) {
742 TEST_LOCK();
743 test_concurrent_max = (int)strtod(val, NULL);
744 TEST_UNLOCK();
745 } else if (!strcmp(name, "test.sql.command")) {
746 TEST_LOCK();
747 if (test_sql_cmd)
748 rd_free(test_sql_cmd);
749 test_sql_cmd = rd_strdup(val);
750 TEST_UNLOCK();
751 } else
752 return 0;
753
754 return 1;
755 }
756
test_read_conf_file(const char * conf_path,rd_kafka_conf_t * conf,rd_kafka_topic_conf_t * topic_conf,int * timeoutp)757 static void test_read_conf_file (const char *conf_path,
758 rd_kafka_conf_t *conf,
759 rd_kafka_topic_conf_t *topic_conf,
760 int *timeoutp) {
761 FILE *fp;
762 char buf[1024];
763 int line = 0;
764
765 #ifndef _WIN32
766 fp = fopen(conf_path, "r");
767 #else
768 fp = NULL;
769 errno = fopen_s(&fp, conf_path, "r");
770 #endif
771 if (!fp) {
772 if (errno == ENOENT) {
773 TEST_SAY("Test config file %s not found\n", conf_path);
774 return;
775 } else
776 TEST_FAIL("Failed to read %s: %s",
777 conf_path, strerror(errno));
778 }
779
780 while (fgets(buf, sizeof(buf)-1, fp)) {
781 char *t;
782 char *b = buf;
783 rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;
784 char *name, *val;
785 char errstr[512];
786
787 line++;
788 if ((t = strchr(b, '\n')))
789 *t = '\0';
790
791 if (*b == '#' || !*b)
792 continue;
793
794 if (!(t = strchr(b, '=')))
795 TEST_FAIL("%s:%i: expected name=value format\n",
796 conf_path, line);
797
798 name = b;
799 *t = '\0';
800 val = t+1;
801
802 if (test_set_special_conf(name, val, timeoutp))
803 continue;
804
805 if (!strncmp(name, "topic.", strlen("topic."))) {
806 name += strlen("topic.");
807 if (topic_conf)
808 res = rd_kafka_topic_conf_set(topic_conf,
809 name, val,
810 errstr,
811 sizeof(errstr));
812 else
813 res = RD_KAFKA_CONF_OK;
814 name -= strlen("topic.");
815 }
816
817 if (res == RD_KAFKA_CONF_UNKNOWN) {
818 if (conf)
819 res = rd_kafka_conf_set(conf,
820 name, val,
821 errstr, sizeof(errstr));
822 else
823 res = RD_KAFKA_CONF_OK;
824 }
825
826 if (res != RD_KAFKA_CONF_OK)
827 TEST_FAIL("%s:%i: %s\n",
828 conf_path, line, errstr);
829 }
830
831 fclose(fp);
832 }
833
834 /**
835 * @brief Get path to test config file
836 */
test_conf_get_path(void)837 const char *test_conf_get_path (void) {
838 return test_getenv("RDKAFKA_TEST_CONF", "test.conf");
839 }
840
test_getenv(const char * env,const char * def)841 const char *test_getenv (const char *env, const char *def) {
842 return rd_getenv(env, def);
843 }
844
test_conf_common_init(rd_kafka_conf_t * conf,int timeout)845 void test_conf_common_init (rd_kafka_conf_t *conf, int timeout) {
846 if (conf) {
847 const char *tmp = test_getenv("TEST_DEBUG", NULL);
848 if (tmp)
849 test_conf_set(conf, "debug", tmp);
850 }
851
852 if (timeout)
853 test_timeout_set(timeout);
854 }
855
856
857 /**
858 * Creates and sets up kafka configuration objects.
859 * Will read "test.conf" file if it exists.
860 */
test_conf_init(rd_kafka_conf_t ** conf,rd_kafka_topic_conf_t ** topic_conf,int timeout)861 void test_conf_init (rd_kafka_conf_t **conf, rd_kafka_topic_conf_t **topic_conf,
862 int timeout) {
863 const char *test_conf = test_conf_get_path();
864
865 if (conf) {
866 *conf = rd_kafka_conf_new();
867 rd_kafka_conf_set(*conf, "client.id", test_curr->name, NULL, 0);
868 if (test_idempotent_producer)
869 test_conf_set(*conf, "enable.idempotence", "true");
870 rd_kafka_conf_set_error_cb(*conf, test_error_cb);
871 rd_kafka_conf_set_stats_cb(*conf, test_stats_cb);
872
873 /* Allow higher request timeouts on CI */
874 if (test_on_ci)
875 test_conf_set(*conf, "request.timeout.ms", "10000");
876
877 #ifdef SIGIO
878 {
879 char buf[64];
880
881 /* Quick termination */
882 rd_snprintf(buf, sizeof(buf), "%i", SIGIO);
883 rd_kafka_conf_set(*conf, "internal.termination.signal",
884 buf, NULL, 0);
885 signal(SIGIO, SIG_IGN);
886 }
887 #endif
888 }
889
890 #if WITH_SOCKEM
891 if (*test_sockem_conf && conf)
892 test_socket_enable(*conf);
893 #endif
894
895 if (topic_conf)
896 *topic_conf = rd_kafka_topic_conf_new();
897
898 /* Open and read optional local test configuration file, if any. */
899 test_read_conf_file(test_conf,
900 conf ? *conf : NULL,
901 topic_conf ? *topic_conf : NULL, &timeout);
902
903 test_conf_common_init(conf ? *conf : NULL, timeout);
904 }
905
906
test_rand(void)907 static RD_INLINE unsigned int test_rand(void) {
908 unsigned int r;
909 #ifdef _WIN32
910 rand_s(&r);
911 #else
912 r = rand();
913 #endif
914 return r;
915 }
916 /**
917 * Generate a "unique" test id.
918 */
test_id_generate(void)919 uint64_t test_id_generate (void) {
920 return (((uint64_t)test_rand()) << 32) | (uint64_t)test_rand();
921 }
922
923
924 /**
925 * Generate a "unique" string id
926 */
test_str_id_generate(char * dest,size_t dest_size)927 char *test_str_id_generate (char *dest, size_t dest_size) {
928 rd_snprintf(dest, dest_size, "%"PRId64, test_id_generate());
929 return dest;
930 }
931
932 /**
933 * Same as test_str_id_generate but returns a temporary string.
934 */
test_str_id_generate_tmp(void)935 const char *test_str_id_generate_tmp (void) {
936 static RD_TLS char ret[64];
937 return test_str_id_generate(ret, sizeof(ret));
938 }
939
940 /**
941 * Format a message token.
942 * Pad's to dest_size.
943 */
test_msg_fmt(char * dest,size_t dest_size,uint64_t testid,int32_t partition,int msgid)944 void test_msg_fmt (char *dest, size_t dest_size,
945 uint64_t testid, int32_t partition, int msgid) {
946 size_t of;
947
948 of = rd_snprintf(dest, dest_size,
949 "testid=%"PRIu64", partition=%"PRId32", msg=%i\n",
950 testid, partition, msgid);
951 if (of < dest_size - 1) {
952 memset(dest+of, '!', dest_size-of);
953 dest[dest_size-1] = '\0';
954 }
955 }
956
957 /**
958 * @brief Prepare message value and key for test produce.
959 */
test_prepare_msg(uint64_t testid,int32_t partition,int msg_id,char * val,size_t val_size,char * key,size_t key_size)960 void test_prepare_msg (uint64_t testid, int32_t partition, int msg_id,
961 char *val, size_t val_size,
962 char *key, size_t key_size) {
963 size_t of = 0;
964
965 test_msg_fmt(key, key_size, testid, partition, msg_id);
966
967 while (of < val_size) {
968 /* Copy-repeat key into val until val_size */
969 size_t len = RD_MIN(val_size-of, key_size);
970 memcpy(val+of, key, len);
971 of += len;
972 }
973 }
974
975
976
977 /**
978 * Parse a message token
979 */
test_msg_parse00(const char * func,int line,uint64_t testid,int32_t exp_partition,int * msgidp,const char * topic,int32_t partition,int64_t offset,const char * key,size_t key_size)980 void test_msg_parse00 (const char *func, int line,
981 uint64_t testid, int32_t exp_partition, int *msgidp,
982 const char *topic, int32_t partition, int64_t offset,
983 const char *key, size_t key_size) {
984 char buf[128];
985 uint64_t in_testid;
986 int in_part;
987
988 if (!key)
989 TEST_FAIL("%s:%i: Message (%s [%"PRId32"] @ %"PRId64") "
990 "has empty key\n",
991 func, line, topic, partition, offset);
992
993 rd_snprintf(buf, sizeof(buf), "%.*s", (int)key_size, key);
994
995 if (sscanf(buf, "testid=%"SCNu64", partition=%i, msg=%i\n",
996 &in_testid, &in_part, msgidp) != 3)
997 TEST_FAIL("%s:%i: Incorrect key format: %s", func, line, buf);
998
999
1000 if (testid != in_testid ||
1001 (exp_partition != -1 && exp_partition != in_part))
1002 TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i did "
1003 "not match message: \"%s\"\n",
1004 func, line, testid, (int)exp_partition, buf);
1005 }
1006
test_msg_parse0(const char * func,int line,uint64_t testid,rd_kafka_message_t * rkmessage,int32_t exp_partition,int * msgidp)1007 void test_msg_parse0 (const char *func, int line,
1008 uint64_t testid, rd_kafka_message_t *rkmessage,
1009 int32_t exp_partition, int *msgidp) {
1010 test_msg_parse00(func, line, testid, exp_partition, msgidp,
1011 rd_kafka_topic_name(rkmessage->rkt),
1012 rkmessage->partition, rkmessage->offset,
1013 (const char *)rkmessage->key, rkmessage->key_len);
1014 }
1015
1016
1017 struct run_args {
1018 struct test *test;
1019 int argc;
1020 char **argv;
1021 };
1022
run_test0(struct run_args * run_args)1023 static int run_test0 (struct run_args *run_args) {
1024 struct test *test = run_args->test;
1025 test_timing_t t_run;
1026 int r;
1027 char stats_file[256];
1028
1029 rd_snprintf(stats_file, sizeof(stats_file), "stats_%s_%"PRIu64".json",
1030 test->name, test_id_generate());
1031 if (!(test->stats_fp = fopen(stats_file, "w+")))
1032 TEST_SAY("=== Failed to create stats file %s: %s ===\n",
1033 stats_file, strerror(errno));
1034
1035 test_curr = test;
1036
1037 #if WITH_SOCKEM
1038 rd_list_init(&test->sockets, 16, (void *)sockem_close);
1039 #endif
1040 /* Don't check message status by default */
1041 test->exp_dr_status = (rd_kafka_msg_status_t)-1;
1042
1043 TEST_SAY("================= Running test %s =================\n",
1044 test->name);
1045 if (test->stats_fp)
1046 TEST_SAY("==== Stats written to file %s ====\n", stats_file);
1047
1048 test_rusage_start(test_curr);
1049 TIMING_START(&t_run, "%s", test->name);
1050 test->start = t_run.ts_start;
1051
1052 /* Run test main function */
1053 r = test->mainfunc(run_args->argc, run_args->argv);
1054
1055 TIMING_STOP(&t_run);
1056 test_rusage_stop(test_curr,
1057 (double)TIMING_DURATION(&t_run) / 1000000.0);
1058
1059 TEST_LOCK();
1060 test->duration = TIMING_DURATION(&t_run);
1061
1062 if (test->state == TEST_SKIPPED) {
1063 TEST_SAY("================= Test %s SKIPPED "
1064 "=================\n",
1065 run_args->test->name);
1066 } else if (r) {
1067 test->state = TEST_FAILED;
1068 TEST_SAY("\033[31m"
1069 "================= Test %s FAILED ================="
1070 "\033[0m\n",
1071 run_args->test->name);
1072 } else {
1073 test->state = TEST_PASSED;
1074 TEST_SAY("\033[32m"
1075 "================= Test %s PASSED ================="
1076 "\033[0m\n",
1077 run_args->test->name);
1078 }
1079 TEST_UNLOCK();
1080
1081 cnd_broadcast(&test_cnd);
1082
1083 #if WITH_SOCKEM
1084 test_socket_close_all(test, 0);
1085 #endif
1086
1087 if (test->stats_fp) {
1088 long pos = ftell(test->stats_fp);
1089 fclose(test->stats_fp);
1090 test->stats_fp = NULL;
1091 /* Delete file if nothing was written */
1092 if (pos == 0) {
1093 #ifndef _WIN32
1094 unlink(stats_file);
1095 #else
1096 _unlink(stats_file);
1097 #endif
1098 }
1099 }
1100
1101 if (test_delete_topics_between && test_concurrent_max == 1)
1102 test_delete_all_test_topics(60*1000);
1103
1104 return r;
1105 }
1106
1107
1108
1109
run_test_from_thread(void * arg)1110 static int run_test_from_thread (void *arg) {
1111 struct run_args *run_args = arg;
1112
1113 thrd_detach(thrd_current());
1114
1115 run_test0(run_args);
1116
1117 TEST_LOCK();
1118 tests_running_cnt--;
1119 TEST_UNLOCK();
1120
1121 free(run_args);
1122
1123 return 0;
1124 }
1125
1126
1127 /**
1128 * @brief Check running tests for timeouts.
1129 * @locks TEST_LOCK MUST be held
1130 */
check_test_timeouts(void)1131 static void check_test_timeouts (void) {
1132 int64_t now = test_clock();
1133 struct test *test;
1134
1135 for (test = tests ; test->name ; test++) {
1136 if (test->state != TEST_RUNNING)
1137 continue;
1138
1139 /* Timeout check */
1140 if (now > test->timeout) {
1141 struct test *save_test = test_curr;
1142 test_curr = test;
1143 test->state = TEST_FAILED;
1144 test_summary(0/*no-locks*/);
1145 TEST_FAIL0(__FILE__,__LINE__,0/*nolock*/,
1146 0/*fail-later*/,
1147 "Test %s%s%s%s timed out "
1148 "(timeout set to %d seconds)\n",
1149 test->name,
1150 *test->subtest ? " (" : "",
1151 test->subtest,
1152 *test->subtest ? ")" : "",
1153 (int)(test->timeout-
1154 test->start)/
1155 1000000);
1156 test_curr = save_test;
1157 tests_running_cnt--; /* fail-later misses this*/
1158 #ifdef _WIN32
1159 TerminateThread(test->thrd, -1);
1160 #else
1161 pthread_kill(test->thrd, SIGKILL);
1162 #endif
1163 }
1164 }
1165 }
1166
1167
run_test(struct test * test,int argc,char ** argv)1168 static int run_test (struct test *test, int argc, char **argv) {
1169 struct run_args *run_args = calloc(1, sizeof(*run_args));
1170 int wait_cnt = 0;
1171
1172 run_args->test = test;
1173 run_args->argc = argc;
1174 run_args->argv = argv;
1175
1176 TEST_LOCK();
1177 while (tests_running_cnt >= test_concurrent_max) {
1178 if (!(wait_cnt++ % 100))
1179 TEST_SAY("Too many tests running (%d >= %d): "
1180 "postponing %s start...\n",
1181 tests_running_cnt, test_concurrent_max,
1182 test->name);
1183 cnd_timedwait_ms(&test_cnd, &test_mtx, 100);
1184
1185 check_test_timeouts();
1186 }
1187 tests_running_cnt++;
1188 test->timeout = test_clock() + (int64_t)(30.0 * 1000000.0 *
1189 test_timeout_multiplier);
1190 test->state = TEST_RUNNING;
1191 TEST_UNLOCK();
1192
1193 if (thrd_create(&test->thrd, run_test_from_thread, run_args) !=
1194 thrd_success) {
1195 TEST_LOCK();
1196 tests_running_cnt--;
1197 test->state = TEST_FAILED;
1198 TEST_UNLOCK();
1199
1200 TEST_FAIL("Failed to start thread for test %s\n",
1201 test->name);
1202 }
1203
1204 return 0;
1205 }
1206
run_tests(int argc,char ** argv)1207 static void run_tests (int argc, char **argv) {
1208 struct test *test;
1209
1210 for (test = tests ; test->name ; test++) {
1211 char testnum[128];
1212 char *t;
1213 const char *skip_reason = NULL;
1214 rd_bool_t skip_silent = rd_false;
1215 char tmp[128];
1216 const char *scenario =
1217 test->scenario ? test->scenario : "default";
1218
1219 if (!test->mainfunc)
1220 continue;
1221
1222 /* Extract test number, as string */
1223 strncpy(testnum, test->name, sizeof(testnum)-1);
1224 testnum[sizeof(testnum)-1] = '\0';
1225 if ((t = strchr(testnum, '_')))
1226 *t = '\0';
1227
1228 if ((test_flags && (test_flags & test->flags) != test_flags)) {
1229 skip_reason = "filtered due to test flags";
1230 skip_silent = rd_true;
1231 } if ((test_neg_flags & ~test_flags) & test->flags)
1232 skip_reason = "Filtered due to negative test flags";
1233 if (test_broker_version &&
1234 (test->minver > test_broker_version ||
1235 (test->maxver && test->maxver < test_broker_version))) {
1236 rd_snprintf(tmp, sizeof(tmp),
1237 "not applicable for broker "
1238 "version %d.%d.%d.%d",
1239 TEST_BRKVER_X(test_broker_version, 0),
1240 TEST_BRKVER_X(test_broker_version, 1),
1241 TEST_BRKVER_X(test_broker_version, 2),
1242 TEST_BRKVER_X(test_broker_version, 3));
1243 skip_reason = tmp;
1244 }
1245
1246 if (!strstr(scenario, test_scenario)) {
1247 rd_snprintf(tmp, sizeof(tmp),
1248 "requires test scenario %s", scenario);
1249 skip_silent = rd_true;
1250 skip_reason = tmp;
1251 }
1252
1253 if (tests_to_run && !strstr(tests_to_run, testnum)) {
1254 skip_reason = "not included in TESTS list";
1255 skip_silent = rd_true;
1256 } else if (!tests_to_run && (test->flags & TEST_F_MANUAL)) {
1257 skip_reason = "manual test";
1258 skip_silent = rd_true;
1259 } else if (tests_to_skip && strstr(tests_to_skip, testnum))
1260 skip_reason = "included in TESTS_SKIP list";
1261
1262 if (!skip_reason) {
1263 run_test(test, argc, argv);
1264 } else {
1265 if (skip_silent) {
1266 TEST_SAYL(3,
1267 "================= Skipping test %s "
1268 "(%s) ================\n",
1269 test->name, skip_reason);
1270 TEST_LOCK();
1271 test->state = TEST_SKIPPED;
1272 TEST_UNLOCK();
1273 } else {
1274 test_curr = test;
1275 TEST_SKIP("%s\n", skip_reason);
1276 test_curr = &tests[0];
1277 }
1278
1279 }
1280 }
1281
1282
1283 }
1284
1285 /**
1286 * @brief Print summary for all tests.
1287 *
1288 * @returns the number of failed tests.
1289 */
test_summary(int do_lock)1290 static int test_summary (int do_lock) {
1291 struct test *test;
1292 FILE *report_fp = NULL;
1293 char report_path[128];
1294 time_t t;
1295 struct tm *tm;
1296 char datestr[64];
1297 int64_t total_duration = 0;
1298 int tests_run = 0;
1299 int tests_failed = 0;
1300 int tests_failed_known = 0;
1301 int tests_passed = 0;
1302 FILE *sql_fp = NULL;
1303 const char *tmp;
1304
1305 t = time(NULL);
1306 tm = localtime(&t);
1307 strftime(datestr, sizeof(datestr), "%Y%m%d%H%M%S", tm);
1308
1309 if ((tmp = test_getenv("TEST_REPORT", NULL)))
1310 rd_snprintf(report_path, sizeof(report_path), "%s", tmp);
1311 else if (test_write_report)
1312 rd_snprintf(report_path, sizeof(report_path),
1313 "test_report_%s.json", datestr);
1314 else
1315 report_path[0] = '\0';
1316
1317 if (*report_path) {
1318 report_fp = fopen(report_path, "w+");
1319 if (!report_fp)
1320 TEST_WARN("Failed to create report file %s: %s\n",
1321 report_path, strerror(errno));
1322 else
1323 fprintf(report_fp,
1324 "{ \"id\": \"%s_%s\", \"mode\": \"%s\", "
1325 "\"scenario\": \"%s\", "
1326 "\"date\": \"%s\", "
1327 "\"git_version\": \"%s\", "
1328 "\"broker_version\": \"%s\", "
1329 "\"tests\": {",
1330 datestr, test_mode, test_mode,
1331 test_scenario, datestr,
1332 test_git_version,
1333 test_broker_version_str);
1334 }
1335
1336 if (do_lock)
1337 TEST_LOCK();
1338
1339 if (test_sql_cmd) {
1340 #ifdef _WIN32
1341 sql_fp = _popen(test_sql_cmd, "w");
1342 #else
1343 sql_fp = popen(test_sql_cmd, "w");
1344 #endif
1345
1346 fprintf(sql_fp,
1347 "CREATE TABLE IF NOT EXISTS "
1348 "runs(runid text PRIMARY KEY, mode text, "
1349 "date datetime, cnt int, passed int, failed int, "
1350 "duration numeric);\n"
1351 "CREATE TABLE IF NOT EXISTS "
1352 "tests(runid text, mode text, name text, state text, "
1353 "extra text, duration numeric);\n");
1354 }
1355
1356 if (show_summary)
1357 printf("TEST %s (%s, scenario %s) SUMMARY\n"
1358 "#==================================================================#\n",
1359 datestr, test_mode, test_scenario);
1360
1361 for (test = tests ; test->name ; test++) {
1362 const char *color;
1363 int64_t duration;
1364 char extra[128] = "";
1365 int do_count = 1;
1366
1367 if (!(duration = test->duration) && test->start > 0)
1368 duration = test_clock() - test->start;
1369
1370 if (test == tests) {
1371 /* <MAIN> test:
1372 * test accounts for total runtime.
1373 * dont include in passed/run/failed counts. */
1374 total_duration = duration;
1375 do_count = 0;
1376 }
1377
1378 switch (test->state)
1379 {
1380 case TEST_PASSED:
1381 color = _C_GRN;
1382 if (do_count) {
1383 tests_passed++;
1384 tests_run++;
1385 }
1386 break;
1387 case TEST_FAILED:
1388 if (test->flags & TEST_F_KNOWN_ISSUE) {
1389 rd_snprintf(extra, sizeof(extra),
1390 " <-- known issue%s%s",
1391 test->extra ? ": " : "",
1392 test->extra ? test->extra : "");
1393 if (do_count)
1394 tests_failed_known++;
1395 }
1396 color = _C_RED;
1397 if (do_count) {
1398 tests_failed++;
1399 tests_run++;
1400 }
1401 break;
1402 case TEST_RUNNING:
1403 color = _C_MAG;
1404 if (do_count) {
1405 tests_failed++; /* All tests should be finished */
1406 tests_run++;
1407 }
1408 break;
1409 case TEST_NOT_STARTED:
1410 color = _C_YEL;
1411 if (test->extra)
1412 rd_snprintf(extra, sizeof(extra), " %s",
1413 test->extra);
1414 break;
1415 default:
1416 color = _C_CYA;
1417 break;
1418 }
1419
1420 if (show_summary &&
1421 (test->state != TEST_SKIPPED || *test->failstr ||
1422 (tests_to_run &&
1423 !strncmp(tests_to_run, test->name,
1424 strlen(tests_to_run))))) {
1425 printf("|%s %-40s | %10s | %7.3fs %s|",
1426 color,
1427 test->name, test_states[test->state],
1428 (double)duration/1000000.0, _C_CLR);
1429 if (test->state == TEST_FAILED)
1430 printf(_C_RED " %s" _C_CLR, test->failstr);
1431 else if (test->state == TEST_SKIPPED)
1432 printf(_C_CYA " %s" _C_CLR, test->failstr);
1433 printf("%s\n", extra);
1434 }
1435
1436 if (report_fp) {
1437 int i;
1438 fprintf(report_fp,
1439 "%s\"%s\": {"
1440 "\"name\": \"%s\", "
1441 "\"state\": \"%s\", "
1442 "\"known_issue\": %s, "
1443 "\"extra\": \"%s\", "
1444 "\"duration\": %.3f, "
1445 "\"report\": [ ",
1446 test == tests ? "": ", ",
1447 test->name,
1448 test->name, test_states[test->state],
1449 test->flags & TEST_F_KNOWN_ISSUE ? "true":"false",
1450 test->extra ? test->extra : "",
1451 (double)duration/1000000.0);
1452
1453 for (i = 0 ; i < test->report_cnt ; i++) {
1454 fprintf(report_fp, "%s%s ",
1455 i == 0 ? "":",",
1456 test->report_arr[i]);
1457 }
1458
1459 fprintf(report_fp, "] }");
1460 }
1461
1462 if (sql_fp)
1463 fprintf(sql_fp,
1464 "INSERT INTO tests VALUES("
1465 "'%s_%s', '%s', '%s', '%s', '%s', %f);\n",
1466 datestr, test_mode, test_mode,
1467 test->name, test_states[test->state],
1468 test->extra ? test->extra : "",
1469 (double)duration/1000000.0);
1470 }
1471 if (do_lock)
1472 TEST_UNLOCK();
1473
1474 if (show_summary)
1475 printf("#==================================================================#\n");
1476
1477 if (report_fp) {
1478 fprintf(report_fp,
1479 "}, "
1480 "\"tests_run\": %d, "
1481 "\"tests_passed\": %d, "
1482 "\"tests_failed\": %d, "
1483 "\"duration\": %.3f"
1484 "}\n",
1485 tests_run, tests_passed, tests_failed,
1486 (double)total_duration/1000000.0);
1487
1488 fclose(report_fp);
1489 TEST_SAY("# Test report written to %s\n", report_path);
1490 }
1491
1492 if (sql_fp) {
1493 fprintf(sql_fp,
1494 "INSERT INTO runs VALUES('%s_%s', '%s', datetime(), "
1495 "%d, %d, %d, %f);\n",
1496 datestr, test_mode, test_mode,
1497 tests_run, tests_passed, tests_failed,
1498 (double)total_duration/1000000.0);
1499 fclose(sql_fp);
1500 }
1501
1502 return tests_failed - tests_failed_known;
1503 }
1504
1505 #ifndef _WIN32
test_sig_term(int sig)1506 static void test_sig_term (int sig) {
1507 if (test_exit)
1508 exit(1);
1509 fprintf(stderr, "Exiting tests, waiting for running tests to finish.\n");
1510 test_exit = 1;
1511 }
1512 #endif
1513
1514 /**
1515 * Wait 'timeout' seconds for rdkafka to kill all its threads and clean up.
1516 */
test_wait_exit(int timeout)1517 static void test_wait_exit (int timeout) {
1518 int r;
1519 time_t start = time(NULL);
1520
1521 while ((r = rd_kafka_thread_cnt()) && timeout-- >= 0) {
1522 TEST_SAY("%i thread(s) in use by librdkafka, waiting...\n", r);
1523 rd_sleep(1);
1524 }
1525
1526 TEST_SAY("%i thread(s) in use by librdkafka\n", r);
1527
1528 if (r > 0)
1529 TEST_FAIL("%i thread(s) still active in librdkafka", r);
1530
1531 timeout -= (int)(time(NULL) - start);
1532 if (timeout > 0) {
1533 TEST_SAY("Waiting %d seconds for all librdkafka memory "
1534 "to be released\n", timeout);
1535 if (rd_kafka_wait_destroyed(timeout * 1000) == -1)
1536 TEST_FAIL("Not all internal librdkafka "
1537 "objects destroyed\n");
1538 }
1539 }
1540
1541
1542
1543
1544 /**
1545 * @brief Test framework cleanup before termination.
1546 */
test_cleanup(void)1547 static void test_cleanup (void) {
1548 struct test *test;
1549
1550 /* Free report arrays */
1551 for (test = tests ; test->name ; test++) {
1552 int i;
1553 if (!test->report_arr)
1554 continue;
1555 for (i = 0 ; i < test->report_cnt ; i++)
1556 rd_free(test->report_arr[i]);
1557 rd_free(test->report_arr);
1558 test->report_arr = NULL;
1559 }
1560
1561 if (test_sql_cmd)
1562 rd_free(test_sql_cmd);
1563 }
1564
1565
main(int argc,char ** argv)1566 int main(int argc, char **argv) {
1567 int i, r;
1568 test_timing_t t_all;
1569 int a,b,c,d;
1570 const char *tmpver;
1571
1572 mtx_init(&test_mtx, mtx_plain);
1573 cnd_init(&test_cnd);
1574
1575 test_init();
1576
1577 #ifndef _WIN32
1578 signal(SIGINT, test_sig_term);
1579 #endif
1580 tests_to_run = test_getenv("TESTS", NULL);
1581 subtests_to_run = test_getenv("SUBTESTS", NULL);
1582 tests_to_skip = test_getenv("TESTS_SKIP", NULL);
1583 tmpver = test_getenv("TEST_KAFKA_VERSION", NULL);
1584 if (!tmpver)
1585 tmpver = test_getenv("KAFKA_VERSION", test_broker_version_str);
1586 test_broker_version_str = tmpver;
1587
1588 test_git_version = test_getenv("RDKAFKA_GITVER", "HEAD");
1589
1590 /* Are we running on CI? */
1591 if (test_getenv("CI", NULL)) {
1592 test_on_ci = 1;
1593 test_concurrent_max = 3;
1594 }
1595
1596 test_conf_init(NULL, NULL, 10);
1597
1598 for (i = 1 ; i < argc ; i++) {
1599 if (!strncmp(argv[i], "-p", 2) && strlen(argv[i]) > 2) {
1600 if (test_rusage) {
1601 fprintf(stderr,
1602 "%% %s ignored: -R takes preceedence\n",
1603 argv[i]);
1604 continue;
1605 }
1606 test_concurrent_max = (int)strtod(argv[i]+2, NULL);
1607 } else if (!strcmp(argv[i], "-l"))
1608 test_flags |= TEST_F_LOCAL;
1609 else if (!strcmp(argv[i], "-L"))
1610 test_neg_flags |= TEST_F_LOCAL;
1611 else if (!strcmp(argv[i], "-a"))
1612 test_assert_on_fail = 1;
1613 else if (!strcmp(argv[i], "-k"))
1614 test_flags |= TEST_F_KNOWN_ISSUE;
1615 else if (!strcmp(argv[i], "-K"))
1616 test_neg_flags |= TEST_F_KNOWN_ISSUE;
1617 else if (!strcmp(argv[i], "-E"))
1618 test_neg_flags |= TEST_F_SOCKEM;
1619 else if (!strcmp(argv[i], "-V") && i+1 < argc)
1620 test_broker_version_str = argv[++i];
1621 else if (!strcmp(argv[i], "-s") && i+1 < argc)
1622 strncpy(test_scenario, argv[++i],
1623 sizeof(test_scenario)-1);
1624 else if (!strcmp(argv[i], "-S"))
1625 show_summary = 0;
1626 else if (!strcmp(argv[i], "-D"))
1627 test_delete_topics_between = 1;
1628 else if (!strcmp(argv[i], "-P"))
1629 test_idempotent_producer = 1;
1630 else if (!strcmp(argv[i], "-Q"))
1631 test_quick = 1;
1632 else if (!strcmp(argv[i], "-r"))
1633 test_write_report = 1;
1634 else if (!strncmp(argv[i], "-R", 2)) {
1635 test_rusage = 1;
1636 test_concurrent_max = 1;
1637 if (strlen(argv[i]) > strlen("-R")) {
1638 test_rusage_cpu_calibration =
1639 strtod(argv[i]+2, NULL);
1640 if (test_rusage_cpu_calibration < 0.00001) {
1641 fprintf(stderr,
1642 "%% Invalid CPU calibration "
1643 "value: %s\n", argv[i]+2);
1644 exit(1);
1645 }
1646 }
1647 } else if (*argv[i] != '-')
1648 tests_to_run = argv[i];
1649 else {
1650 printf("Unknown option: %s\n"
1651 "\n"
1652 "Usage: %s [options] [<test-match-substr>]\n"
1653 "Options:\n"
1654 " -p<N> Run N tests in parallel\n"
1655 " -l/-L Only/dont run local tests (no broker needed)\n"
1656 " -k/-K Only/dont run tests with known issues\n"
1657 " -E Don't run sockem tests\n"
1658 " -a Assert on failures\n"
1659 " -r Write test_report_...json file.\n"
1660 " -S Dont show test summary\n"
1661 " -s <scenario> Test scenario.\n"
1662 " -V <N.N.N.N> Broker version.\n"
1663 " -D Delete all test topics between each test (-p1) or after all tests\n"
1664 " -P Run all tests with `enable.idempotency=true`\n"
1665 " -Q Run tests in quick mode: faster tests, fewer iterations, less data.\n"
1666 " -R Check resource usage thresholds.\n"
1667 " -R<C> Check resource usage thresholds but adjust CPU thresholds by C (float):\n"
1668 " C < 1.0: CPU is faster than base line system.\n"
1669 " C > 1.0: CPU is slower than base line system.\n"
1670 " E.g. -R2.5 = CPU is 2.5x slower than base line system.\n"
1671 "\n"
1672 "Environment variables:\n"
1673 " TESTS - substring matched test to run (e.g., 0033)\n"
1674 " SUBTESTS - substring matched subtest to run "
1675 "(e.g., n_wildcard)\n"
1676 " TEST_KAFKA_VERSION - broker version (e.g., 0.9.0.1)\n"
1677 " TEST_SCENARIO - Test scenario\n"
1678 " TEST_LEVEL - Test verbosity level\n"
1679 " TEST_MODE - bare, helgrind, valgrind\n"
1680 " TEST_SEED - random seed\n"
1681 " RDKAFKA_TEST_CONF - test config file (test.conf)\n"
1682 " KAFKA_PATH - Path to kafka source dir\n"
1683 " ZK_ADDRESS - Zookeeper address\n"
1684 "\n",
1685 argv[i], argv[0]);
1686 exit(1);
1687 }
1688 }
1689
1690 TEST_SAY("Git version: %s\n", test_git_version);
1691
1692 if (!strcmp(test_broker_version_str, "trunk"))
1693 test_broker_version_str = "9.9.9.9"; /* for now */
1694
1695 d = 0;
1696 if (sscanf(test_broker_version_str, "%d.%d.%d.%d",
1697 &a, &b, &c, &d) < 3) {
1698 printf("%% Expected broker version to be in format "
1699 "N.N.N (N=int), not %s\n",
1700 test_broker_version_str);
1701 exit(1);
1702 }
1703 test_broker_version = TEST_BRKVER(a, b, c, d);
1704 TEST_SAY("Broker version: %s (%d.%d.%d.%d)\n",
1705 test_broker_version_str,
1706 TEST_BRKVER_X(test_broker_version, 0),
1707 TEST_BRKVER_X(test_broker_version, 1),
1708 TEST_BRKVER_X(test_broker_version, 2),
1709 TEST_BRKVER_X(test_broker_version, 3));
1710
1711 /* Set up fake "<MAIN>" test for all operations performed in
1712 * the main thread rather than the per-test threads.
1713 * Nice side effect is that we get timing and status for main as well.*/
1714 test_curr = &tests[0];
1715 test_curr->state = TEST_PASSED;
1716 test_curr->start = test_clock();
1717
1718 if (test_on_ci) {
1719 TEST_LOCK();
1720 test_timeout_multiplier += 2;
1721 TEST_UNLOCK();
1722 }
1723
1724 if (!strcmp(test_mode, "helgrind") ||
1725 !strcmp(test_mode, "drd")) {
1726 TEST_LOCK();
1727 test_timeout_multiplier += 5;
1728 TEST_UNLOCK();
1729 } else if (!strcmp(test_mode, "valgrind")) {
1730 TEST_LOCK();
1731 test_timeout_multiplier += 3;
1732 TEST_UNLOCK();
1733 }
1734
1735 /* Broker version 0.9 and api.version.request=true (which is default)
1736 * will cause a 10s stall per connection. Instead of fixing
1737 * that for each affected API in every test we increase the timeout
1738 * multiplier accordingly instead. The typical consume timeout is 5
1739 * seconds, so a multiplier of 3 should be good. */
1740 if ((test_broker_version & 0xffff0000) == 0x00090000)
1741 test_timeout_multiplier += 3;
1742
1743 if (test_concurrent_max > 1)
1744 test_timeout_multiplier += (double)test_concurrent_max / 3;
1745
1746 TEST_SAY("Tests to run : %s\n", tests_to_run ? tests_to_run : "all");
1747 if (subtests_to_run)
1748 TEST_SAY("Sub tests : %s\n", subtests_to_run);
1749 if (tests_to_skip)
1750 TEST_SAY("Skip tests : %s\n", tests_to_skip);
1751 TEST_SAY("Test mode : %s%s%s\n",
1752 test_quick ? "quick, ":"",
1753 test_mode,
1754 test_on_ci ? ", CI":"");
1755 TEST_SAY("Test scenario: %s\n", test_scenario);
1756 TEST_SAY("Test filter : %s\n",
1757 (test_flags & TEST_F_LOCAL) ? "local tests only" : "no filter");
1758 TEST_SAY("Test timeout multiplier: %.1f\n", test_timeout_multiplier);
1759 TEST_SAY("Action on test failure: %s\n",
1760 test_assert_on_fail ? "assert crash" : "continue other tests");
1761 if (test_rusage)
1762 TEST_SAY("Test rusage : yes (%.2fx CPU calibration)\n",
1763 test_rusage_cpu_calibration);
1764 if (test_idempotent_producer)
1765 TEST_SAY("Test Idempotent Producer: enabled\n");
1766
1767 {
1768 char cwd[512], *pcwd;
1769 #ifdef _WIN32
1770 pcwd = _getcwd(cwd, sizeof(cwd) - 1);
1771 #else
1772 pcwd = getcwd(cwd, sizeof(cwd) - 1);
1773 #endif
1774 if (pcwd)
1775 TEST_SAY("Current directory: %s\n", cwd);
1776 }
1777
1778 test_timeout_set(30);
1779
1780 TIMING_START(&t_all, "ALL-TESTS");
1781
1782 /* Run tests */
1783 run_tests(argc, argv);
1784
1785 TEST_LOCK();
1786 while (tests_running_cnt > 0 && !test_exit) {
1787 struct test *test;
1788
1789 if (!test_quick && test_level >= 2) {
1790 TEST_SAY("%d test(s) running:", tests_running_cnt);
1791
1792 for (test = tests ; test->name ; test++) {
1793 if (test->state != TEST_RUNNING)
1794 continue;
1795
1796 TEST_SAY0(" %s", test->name);
1797 }
1798
1799 TEST_SAY0("\n");
1800 }
1801
1802 check_test_timeouts();
1803
1804 TEST_UNLOCK();
1805
1806 if (test_quick)
1807 rd_usleep(200*1000, NULL);
1808 else
1809 rd_sleep(1);
1810 TEST_LOCK();
1811 }
1812
1813 TIMING_STOP(&t_all);
1814
1815 test_curr = &tests[0];
1816 test_curr->duration = test_clock() - test_curr->start;
1817
1818 TEST_UNLOCK();
1819
1820 if (test_delete_topics_between)
1821 test_delete_all_test_topics(60*1000);
1822
1823 r = test_summary(1/*lock*/) ? 1 : 0;
1824
1825 /* Wait for everything to be cleaned up since broker destroys are
1826 * handled in its own thread. */
1827 test_wait_exit(0);
1828
1829 /* If we havent failed at this point then
1830 * there were no threads leaked */
1831 if (r == 0)
1832 TEST_SAY("\n============== ALL TESTS PASSED ==============\n");
1833
1834 test_cleanup();
1835
1836 if (r > 0)
1837 TEST_FAIL("%d test(s) failed, see previous errors", r);
1838
1839 return r;
1840 }
1841
1842
1843
1844
1845
1846 /******************************************************************************
1847 *
1848 * Helpers
1849 *
1850 ******************************************************************************/
1851
test_dr_msg_cb(rd_kafka_t * rk,const rd_kafka_message_t * rkmessage,void * opaque)1852 void test_dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
1853 void *opaque) {
1854 int *remainsp = rkmessage->_private;
1855 static const char *status_names[] = {
1856 [RD_KAFKA_MSG_STATUS_NOT_PERSISTED] = "NotPersisted",
1857 [RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED] = "PossiblyPersisted",
1858 [RD_KAFKA_MSG_STATUS_PERSISTED] = "Persisted"
1859 };
1860
1861 TEST_SAYL(4, "Delivery report: %s (%s) to %s [%"PRId32"] "
1862 "at offset %"PRId64" latency %.2fms\n",
1863 rd_kafka_err2str(rkmessage->err),
1864 status_names[rd_kafka_message_status(rkmessage)],
1865 rd_kafka_topic_name(rkmessage->rkt),
1866 rkmessage->partition,
1867 rkmessage->offset,
1868 (float)rd_kafka_message_latency(rkmessage) / 1000.0);
1869
1870 if (!test_curr->produce_sync) {
1871 if (!test_curr->ignore_dr_err &&
1872 rkmessage->err != test_curr->exp_dr_err)
1873 TEST_FAIL("Message delivery (to %s [%"PRId32"]) "
1874 "failed: expected %s, got %s",
1875 rd_kafka_topic_name(rkmessage->rkt),
1876 rkmessage->partition,
1877 rd_kafka_err2str(test_curr->exp_dr_err),
1878 rd_kafka_err2str(rkmessage->err));
1879
1880 if ((int)test_curr->exp_dr_status != -1) {
1881 rd_kafka_msg_status_t status =
1882 rd_kafka_message_status(rkmessage);
1883
1884 TEST_ASSERT(status == test_curr->exp_dr_status,
1885 "Expected message status %s, not %s",
1886 status_names[test_curr->exp_dr_status],
1887 status_names[status]);
1888 }
1889
1890 /* Add message to msgver */
1891 if (!rkmessage->err && test_curr->dr_mv)
1892 test_msgver_add_msg(rk, test_curr->dr_mv, rkmessage);
1893 }
1894
1895 if (remainsp) {
1896 TEST_ASSERT(*remainsp > 0,
1897 "Too many messages delivered (remains %i)",
1898 *remainsp);
1899
1900 (*remainsp)--;
1901 }
1902
1903 if (test_curr->produce_sync)
1904 test_curr->produce_sync_err = rkmessage->err;
1905 }
1906
1907
test_create_handle(int mode,rd_kafka_conf_t * conf)1908 rd_kafka_t *test_create_handle (int mode, rd_kafka_conf_t *conf) {
1909 rd_kafka_t *rk;
1910 char errstr[512];
1911
1912 if (!conf) {
1913 test_conf_init(&conf, NULL, 0);
1914 #if WITH_SOCKEM
1915 if (*test_sockem_conf)
1916 test_socket_enable(conf);
1917 #endif
1918 } else {
1919 if (!strcmp(test_conf_get(conf, "client.id"), "rdkafka"))
1920 test_conf_set(conf, "client.id", test_curr->name);
1921 }
1922
1923
1924
1925 /* Creat kafka instance */
1926 rk = rd_kafka_new(mode, conf, errstr, sizeof(errstr));
1927 if (!rk)
1928 TEST_FAIL("Failed to create rdkafka instance: %s\n", errstr);
1929
1930 TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk));
1931
1932 return rk;
1933 }
1934
1935
test_create_producer(void)1936 rd_kafka_t *test_create_producer (void) {
1937 rd_kafka_conf_t *conf;
1938
1939 test_conf_init(&conf, NULL, 0);
1940 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
1941
1942 return test_create_handle(RD_KAFKA_PRODUCER, conf);
1943 }
1944
1945
1946 /**
1947 * Create topic_t object with va-arg list as key-value config pairs
1948 * terminated by NULL.
1949 */
test_create_topic_object(rd_kafka_t * rk,const char * topic,...)1950 rd_kafka_topic_t *test_create_topic_object (rd_kafka_t *rk,
1951 const char *topic, ...) {
1952 rd_kafka_topic_t *rkt;
1953 rd_kafka_topic_conf_t *topic_conf;
1954 va_list ap;
1955 const char *name, *val;
1956
1957 test_conf_init(NULL, &topic_conf, 0);
1958
1959 va_start(ap, topic);
1960 while ((name = va_arg(ap, const char *)) &&
1961 (val = va_arg(ap, const char *))) {
1962 test_topic_conf_set(topic_conf, name, val);
1963 }
1964 va_end(ap);
1965
1966 rkt = rd_kafka_topic_new(rk, topic, topic_conf);
1967 if (!rkt)
1968 TEST_FAIL("Failed to create topic: %s\n",
1969 rd_kafka_err2str(rd_kafka_last_error()));
1970
1971 return rkt;
1972
1973 }
1974
1975
test_create_producer_topic(rd_kafka_t * rk,const char * topic,...)1976 rd_kafka_topic_t *test_create_producer_topic (rd_kafka_t *rk,
1977 const char *topic, ...) {
1978 rd_kafka_topic_t *rkt;
1979 rd_kafka_topic_conf_t *topic_conf;
1980 char errstr[512];
1981 va_list ap;
1982 const char *name, *val;
1983
1984 test_conf_init(NULL, &topic_conf, 0);
1985
1986 va_start(ap, topic);
1987 while ((name = va_arg(ap, const char *)) &&
1988 (val = va_arg(ap, const char *))) {
1989 if (rd_kafka_topic_conf_set(topic_conf, name, val,
1990 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
1991 TEST_FAIL("Conf failed: %s\n", errstr);
1992 }
1993 va_end(ap);
1994
1995 /* Make sure all replicas are in-sync after producing
1996 * so that consume test wont fail. */
1997 rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1",
1998 errstr, sizeof(errstr));
1999
2000
2001 rkt = rd_kafka_topic_new(rk, topic, topic_conf);
2002 if (!rkt)
2003 TEST_FAIL("Failed to create topic: %s\n",
2004 rd_kafka_err2str(rd_kafka_last_error()));
2005
2006 return rkt;
2007
2008 }
2009
2010
2011
2012 /**
2013 * Produces \p cnt messages and returns immediately.
2014 * Does not wait for delivery.
2015 * \p msgcounterp is incremented for each produced messages and passed
2016 * as \p msg_opaque which is later used in test_dr_msg_cb to decrement
2017 * the counter on delivery.
2018 *
2019 * If \p payload is NULL the message key and payload will be formatted
2020 * according to standard test format, otherwise the key will be NULL and
2021 * payload send as message payload.
2022 *
2023 * Default message size is 128 bytes, if \p size is non-zero and \p payload
2024 * is NULL the message size of \p size will be used.
2025 */
test_produce_msgs_nowait(rd_kafka_t * rk,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size,int msgrate,int * msgcounterp)2026 void test_produce_msgs_nowait (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2027 uint64_t testid, int32_t partition,
2028 int msg_base, int cnt,
2029 const char *payload, size_t size, int msgrate,
2030 int *msgcounterp) {
2031 int msg_id;
2032 test_timing_t t_all, t_poll;
2033 char key[128];
2034 void *buf;
2035 int64_t tot_bytes = 0;
2036 int64_t tot_time_poll = 0;
2037 int64_t per_msg_wait = 0;
2038
2039 if (msgrate > 0)
2040 per_msg_wait = 1000000 / (int64_t)msgrate;
2041
2042
2043 if (payload)
2044 buf = (void *)payload;
2045 else {
2046 if (size == 0)
2047 size = 128;
2048 buf = calloc(1, size);
2049 }
2050
2051 TEST_SAY("Produce to %s [%"PRId32"]: messages #%d..%d\n",
2052 rd_kafka_topic_name(rkt), partition, msg_base, msg_base+cnt);
2053
2054 TIMING_START(&t_all, "PRODUCE");
2055 TIMING_START(&t_poll, "SUM(POLL)");
2056
2057 for (msg_id = msg_base ; msg_id < msg_base + cnt ; msg_id++) {
2058 int wait_time = 0;
2059
2060 if (!payload)
2061 test_prepare_msg(testid, partition, msg_id,
2062 buf, size, key, sizeof(key));
2063
2064
2065 if (rd_kafka_produce(rkt, partition,
2066 RD_KAFKA_MSG_F_COPY,
2067 buf, size,
2068 !payload ? key : NULL,
2069 !payload ? strlen(key) : 0,
2070 msgcounterp) == -1)
2071 TEST_FAIL("Failed to produce message %i "
2072 "to partition %i: %s",
2073 msg_id, (int)partition,
2074 rd_kafka_err2str(rd_kafka_last_error()));
2075
2076 (*msgcounterp)++;
2077 tot_bytes += size;
2078
2079 TIMING_RESTART(&t_poll);
2080 do {
2081 if (per_msg_wait) {
2082 wait_time = (int)(per_msg_wait -
2083 TIMING_DURATION(&t_poll)) /
2084 1000;
2085 if (wait_time < 0)
2086 wait_time = 0;
2087 }
2088 rd_kafka_poll(rk, wait_time);
2089 } while (wait_time > 0);
2090
2091 tot_time_poll = TIMING_DURATION(&t_poll);
2092
2093 if (TIMING_EVERY(&t_all, 3*1000000))
2094 TEST_SAY("produced %3d%%: %d/%d messages "
2095 "(%d msgs/s, %d bytes/s)\n",
2096 ((msg_id - msg_base) * 100) / cnt,
2097 msg_id - msg_base, cnt,
2098 (int)((msg_id - msg_base) /
2099 (TIMING_DURATION(&t_all) / 1000000)),
2100 (int)((tot_bytes) /
2101 (TIMING_DURATION(&t_all) / 1000000)));
2102 }
2103
2104 if (!payload)
2105 free(buf);
2106
2107 t_poll.duration = tot_time_poll;
2108 TIMING_STOP(&t_poll);
2109 TIMING_STOP(&t_all);
2110 }
2111
2112 /**
2113 * Waits for the messages tracked by counter \p msgcounterp to be delivered.
2114 */
test_wait_delivery(rd_kafka_t * rk,int * msgcounterp)2115 void test_wait_delivery (rd_kafka_t *rk, int *msgcounterp) {
2116 test_timing_t t_all;
2117 int start_cnt = *msgcounterp;
2118
2119 TIMING_START(&t_all, "PRODUCE.DELIVERY.WAIT");
2120
2121 /* Wait for messages to be delivered */
2122 while (*msgcounterp > 0 && rd_kafka_outq_len(rk) > 0) {
2123 rd_kafka_poll(rk, 10);
2124 if (TIMING_EVERY(&t_all, 3*1000000)) {
2125 int delivered = start_cnt - *msgcounterp;
2126 TEST_SAY("wait_delivery: "
2127 "%d/%d messages delivered: %d msgs/s\n",
2128 delivered, start_cnt,
2129 (int)(delivered /
2130 (TIMING_DURATION(&t_all) / 1000000)));
2131 }
2132 }
2133
2134 TIMING_STOP(&t_all);
2135
2136 TEST_ASSERT(*msgcounterp == 0,
2137 "Not all messages delivered: msgcounter still at %d, "
2138 "outq_len %d",
2139 *msgcounterp, rd_kafka_outq_len(rk));
2140 }
2141
2142 /**
2143 * Produces \p cnt messages and waits for succesful delivery
2144 */
test_produce_msgs(rd_kafka_t * rk,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size)2145 void test_produce_msgs (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2146 uint64_t testid, int32_t partition,
2147 int msg_base, int cnt,
2148 const char *payload, size_t size) {
2149 int remains = 0;
2150
2151 test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2152 payload, size, 0, &remains);
2153
2154 test_wait_delivery(rk, &remains);
2155 }
2156
2157
2158 /**
2159 * @brief Produces \p cnt messages and waits for succesful delivery
2160 */
test_produce_msgs2(rd_kafka_t * rk,const char * topic,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size)2161 void test_produce_msgs2 (rd_kafka_t *rk, const char *topic,
2162 uint64_t testid, int32_t partition,
2163 int msg_base, int cnt,
2164 const char *payload, size_t size) {
2165 int remains = 0;
2166 rd_kafka_topic_t *rkt = test_create_topic_object(rk, topic, NULL);
2167
2168 test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2169 payload, size, 0, &remains);
2170
2171 test_wait_delivery(rk, &remains);
2172
2173 rd_kafka_topic_destroy(rkt);
2174 }
2175
2176 /**
2177 * @brief Produces \p cnt messages without waiting for delivery.
2178 */
test_produce_msgs2_nowait(rd_kafka_t * rk,const char * topic,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size,int * remainsp)2179 void test_produce_msgs2_nowait (rd_kafka_t *rk, const char *topic,
2180 uint64_t testid, int32_t partition,
2181 int msg_base, int cnt,
2182 const char *payload, size_t size,
2183 int *remainsp) {
2184 rd_kafka_topic_t *rkt = test_create_topic_object(rk, topic, NULL);
2185
2186 test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2187 payload, size, 0, remainsp);
2188
2189 rd_kafka_topic_destroy(rkt);
2190 }
2191
2192
2193 /**
2194 * Produces \p cnt messages at \p msgs/s, and waits for succesful delivery
2195 */
test_produce_msgs_rate(rd_kafka_t * rk,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition,int msg_base,int cnt,const char * payload,size_t size,int msgrate)2196 void test_produce_msgs_rate (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2197 uint64_t testid, int32_t partition,
2198 int msg_base, int cnt,
2199 const char *payload, size_t size, int msgrate) {
2200 int remains = 0;
2201
2202 test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt,
2203 payload, size, msgrate, &remains);
2204
2205 test_wait_delivery(rk, &remains);
2206 }
2207
2208
2209
2210 /**
2211 * Create producer, produce \p msgcnt messages to \p topic \p partition,
2212 * destroy consumer, and returns the used testid.
2213 */
2214 uint64_t
test_produce_msgs_easy_size(const char * topic,uint64_t testid,int32_t partition,int msgcnt,size_t size)2215 test_produce_msgs_easy_size (const char *topic, uint64_t testid,
2216 int32_t partition, int msgcnt, size_t size) {
2217 rd_kafka_t *rk;
2218 rd_kafka_topic_t *rkt;
2219 test_timing_t t_produce;
2220
2221 if (!testid)
2222 testid = test_id_generate();
2223 rk = test_create_producer();
2224 rkt = test_create_producer_topic(rk, topic, NULL);
2225
2226 TIMING_START(&t_produce, "PRODUCE");
2227 test_produce_msgs(rk, rkt, testid, partition, 0, msgcnt, NULL, size);
2228 TIMING_STOP(&t_produce);
2229 rd_kafka_topic_destroy(rkt);
2230 rd_kafka_destroy(rk);
2231
2232 return testid;
2233 }
2234
test_produce_sync(rd_kafka_t * rk,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition)2235 rd_kafka_resp_err_t test_produce_sync (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
2236 uint64_t testid, int32_t partition) {
2237 test_curr->produce_sync = 1;
2238 test_produce_msgs(rk, rkt, testid, partition, 0, 1, NULL, 0);
2239 test_curr->produce_sync = 0;
2240 return test_curr->produce_sync_err;
2241 }
2242
2243
2244 /**
2245 * @brief Easy produce function.
2246 *
2247 * @param ... is a NULL-terminated list of key, value config property pairs.
2248 */
test_produce_msgs_easy_v(const char * topic,uint64_t testid,int32_t partition,int msg_base,int cnt,size_t size,...)2249 void test_produce_msgs_easy_v (const char *topic, uint64_t testid,
2250 int32_t partition,
2251 int msg_base, int cnt, size_t size, ...) {
2252 rd_kafka_conf_t *conf;
2253 rd_kafka_t *p;
2254 rd_kafka_topic_t *rkt;
2255 va_list ap;
2256 const char *key, *val;
2257
2258 test_conf_init(&conf, NULL, 0);
2259
2260 va_start(ap, size);
2261 while ((key = va_arg(ap, const char *)) &&
2262 (val = va_arg(ap, const char *)))
2263 test_conf_set(conf, key, val);
2264 va_end(ap);
2265
2266 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
2267
2268 p = test_create_handle(RD_KAFKA_PRODUCER, conf);
2269
2270 rkt = test_create_producer_topic(p, topic, NULL);
2271
2272 test_produce_msgs(p, rkt, testid, partition, msg_base, cnt, NULL, size);
2273
2274 rd_kafka_topic_destroy(rkt);
2275 rd_kafka_destroy(p);
2276 }
2277
2278
2279 /**
2280 * @brief Produce messages to multiple topic-partitions.
2281 *
2282 * @param ...vararg is a tuple of:
2283 * const char *topic
2284 * int32_t partition (or UA)
2285 * int msg_base
2286 * int msg_cnt
2287 *
2288 * End with a NULL topic
2289 */
test_produce_msgs_easy_multi(uint64_t testid,...)2290 void test_produce_msgs_easy_multi (uint64_t testid, ...) {
2291 rd_kafka_conf_t *conf;
2292 rd_kafka_t *p;
2293 va_list ap;
2294 const char *topic;
2295 int msgcounter = 0;
2296
2297 test_conf_init(&conf, NULL, 0);
2298
2299 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
2300
2301 p = test_create_handle(RD_KAFKA_PRODUCER, conf);
2302
2303 va_start(ap, testid);
2304 while ((topic = va_arg(ap, const char *))) {
2305 int32_t partition = va_arg(ap, int32_t);
2306 int msg_base = va_arg(ap, int);
2307 int msg_cnt = va_arg(ap, int);
2308 rd_kafka_topic_t *rkt;
2309
2310 rkt = test_create_producer_topic(p, topic, NULL);
2311
2312 test_produce_msgs_nowait(p, rkt, testid, partition,
2313 msg_base, msg_cnt,
2314 NULL, 0, 0, &msgcounter);
2315
2316 rd_kafka_topic_destroy(rkt);
2317 }
2318 va_end(ap);
2319
2320 test_flush(p, tmout_multip(10*1000));
2321
2322 rd_kafka_destroy(p);
2323 }
2324
2325
2326 /**
2327 * @brief A standard rebalance callback.
2328 */
test_rebalance_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * parts,void * opaque)2329 void test_rebalance_cb (rd_kafka_t *rk,
2330 rd_kafka_resp_err_t err,
2331 rd_kafka_topic_partition_list_t *parts,
2332 void *opaque) {
2333
2334 TEST_SAY("%s: Rebalance: %s: %d partition(s)\n",
2335 rd_kafka_name(rk), rd_kafka_err2name(err), parts->cnt);
2336
2337 switch (err)
2338 {
2339 case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
2340 test_consumer_assign("assign", rk, parts);
2341 break;
2342 case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
2343 test_consumer_unassign("unassign", rk);
2344 break;
2345 default:
2346 TEST_FAIL("Unknown rebalance event: %s",
2347 rd_kafka_err2name(err));
2348 break;
2349 }
2350 }
2351
2352
2353
test_create_consumer(const char * group_id,void (* rebalance_cb)(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * partitions,void * opaque),rd_kafka_conf_t * conf,rd_kafka_topic_conf_t * default_topic_conf)2354 rd_kafka_t *test_create_consumer (const char *group_id,
2355 void (*rebalance_cb) (
2356 rd_kafka_t *rk,
2357 rd_kafka_resp_err_t err,
2358 rd_kafka_topic_partition_list_t
2359 *partitions,
2360 void *opaque),
2361 rd_kafka_conf_t *conf,
2362 rd_kafka_topic_conf_t *default_topic_conf) {
2363 rd_kafka_t *rk;
2364 char tmp[64];
2365
2366 if (!conf)
2367 test_conf_init(&conf, NULL, 0);
2368
2369 if (group_id) {
2370 test_conf_set(conf, "group.id", group_id);
2371
2372 rd_snprintf(tmp, sizeof(tmp), "%d", test_session_timeout_ms);
2373 test_conf_set(conf, "session.timeout.ms", tmp);
2374
2375 if (rebalance_cb)
2376 rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
2377 } else {
2378 TEST_ASSERT(!rebalance_cb);
2379 }
2380
2381 if (default_topic_conf)
2382 rd_kafka_conf_set_default_topic_conf(conf, default_topic_conf);
2383
2384 /* Create kafka instance */
2385 rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
2386
2387 if (group_id)
2388 rd_kafka_poll_set_consumer(rk);
2389
2390 return rk;
2391 }
2392
test_create_consumer_topic(rd_kafka_t * rk,const char * topic)2393 rd_kafka_topic_t *test_create_consumer_topic (rd_kafka_t *rk,
2394 const char *topic) {
2395 rd_kafka_topic_t *rkt;
2396 rd_kafka_topic_conf_t *topic_conf;
2397
2398 test_conf_init(NULL, &topic_conf, 0);
2399
2400 rkt = rd_kafka_topic_new(rk, topic, topic_conf);
2401 if (!rkt)
2402 TEST_FAIL("Failed to create topic: %s\n",
2403 rd_kafka_err2str(rd_kafka_last_error()));
2404
2405 return rkt;
2406 }
2407
2408
test_consumer_start(const char * what,rd_kafka_topic_t * rkt,int32_t partition,int64_t start_offset)2409 void test_consumer_start (const char *what,
2410 rd_kafka_topic_t *rkt, int32_t partition,
2411 int64_t start_offset) {
2412
2413 TEST_SAY("%s: consumer_start: %s [%"PRId32"] at offset %"PRId64"\n",
2414 what, rd_kafka_topic_name(rkt), partition, start_offset);
2415
2416 if (rd_kafka_consume_start(rkt, partition, start_offset) == -1)
2417 TEST_FAIL("%s: consume_start failed: %s\n",
2418 what, rd_kafka_err2str(rd_kafka_last_error()));
2419 }
2420
test_consumer_stop(const char * what,rd_kafka_topic_t * rkt,int32_t partition)2421 void test_consumer_stop (const char *what,
2422 rd_kafka_topic_t *rkt, int32_t partition) {
2423
2424 TEST_SAY("%s: consumer_stop: %s [%"PRId32"]\n",
2425 what, rd_kafka_topic_name(rkt), partition);
2426
2427 if (rd_kafka_consume_stop(rkt, partition) == -1)
2428 TEST_FAIL("%s: consume_stop failed: %s\n",
2429 what, rd_kafka_err2str(rd_kafka_last_error()));
2430 }
2431
test_consumer_seek(const char * what,rd_kafka_topic_t * rkt,int32_t partition,int64_t offset)2432 void test_consumer_seek (const char *what, rd_kafka_topic_t *rkt,
2433 int32_t partition, int64_t offset) {
2434 int err;
2435
2436 TEST_SAY("%s: consumer_seek: %s [%"PRId32"] to offset %"PRId64"\n",
2437 what, rd_kafka_topic_name(rkt), partition, offset);
2438
2439 if ((err = rd_kafka_seek(rkt, partition, offset, 2000)))
2440 TEST_FAIL("%s: consume_seek(%s, %"PRId32", %"PRId64") "
2441 "failed: %s\n",
2442 what,
2443 rd_kafka_topic_name(rkt), partition, offset,
2444 rd_kafka_err2str(err));
2445 }
2446
2447
2448
2449 /**
2450 * Returns offset of the last message consumed
2451 */
test_consume_msgs(const char * what,rd_kafka_topic_t * rkt,uint64_t testid,int32_t partition,int64_t offset,int exp_msg_base,int exp_cnt,int parse_fmt)2452 int64_t test_consume_msgs (const char *what, rd_kafka_topic_t *rkt,
2453 uint64_t testid, int32_t partition, int64_t offset,
2454 int exp_msg_base, int exp_cnt, int parse_fmt) {
2455 int cnt = 0;
2456 int msg_next = exp_msg_base;
2457 int fails = 0;
2458 int64_t offset_last = -1;
2459 int64_t tot_bytes = 0;
2460 test_timing_t t_first, t_all;
2461
2462 TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: expect msg #%d..%d "
2463 "at offset %"PRId64"\n",
2464 what, rd_kafka_topic_name(rkt), partition,
2465 exp_msg_base, exp_msg_base+exp_cnt, offset);
2466
2467 if (offset != TEST_NO_SEEK) {
2468 rd_kafka_resp_err_t err;
2469 test_timing_t t_seek;
2470
2471 TIMING_START(&t_seek, "SEEK");
2472 if ((err = rd_kafka_seek(rkt, partition, offset, 5000)))
2473 TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: "
2474 "seek to %"PRId64" failed: %s\n",
2475 what, rd_kafka_topic_name(rkt), partition,
2476 offset, rd_kafka_err2str(err));
2477 TIMING_STOP(&t_seek);
2478 TEST_SAY("%s: seeked to offset %"PRId64"\n", what, offset);
2479 }
2480
2481 TIMING_START(&t_first, "FIRST MSG");
2482 TIMING_START(&t_all, "ALL MSGS");
2483
2484 while (cnt < exp_cnt) {
2485 rd_kafka_message_t *rkmessage;
2486 int msg_id;
2487
2488 rkmessage = rd_kafka_consume(rkt, partition,
2489 tmout_multip(5000));
2490
2491 if (TIMING_EVERY(&t_all, 3*1000000))
2492 TEST_SAY("%s: "
2493 "consumed %3d%%: %d/%d messages "
2494 "(%d msgs/s, %d bytes/s)\n",
2495 what, cnt * 100 / exp_cnt, cnt, exp_cnt,
2496 (int)(cnt /
2497 (TIMING_DURATION(&t_all) / 1000000)),
2498 (int)(tot_bytes /
2499 (TIMING_DURATION(&t_all) / 1000000)));
2500
2501 if (!rkmessage)
2502 TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: "
2503 "expected msg #%d (%d/%d): timed out\n",
2504 what, rd_kafka_topic_name(rkt), partition,
2505 msg_next, cnt, exp_cnt);
2506
2507 if (rkmessage->err)
2508 TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: "
2509 "expected msg #%d (%d/%d): got error: %s\n",
2510 what, rd_kafka_topic_name(rkt), partition,
2511 msg_next, cnt, exp_cnt,
2512 rd_kafka_err2str(rkmessage->err));
2513
2514 if (cnt == 0)
2515 TIMING_STOP(&t_first);
2516
2517 if (parse_fmt)
2518 test_msg_parse(testid, rkmessage, partition, &msg_id);
2519 else
2520 msg_id = 0;
2521
2522 if (test_level >= 3)
2523 TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: "
2524 "got msg #%d at offset %"PRId64
2525 " (expect #%d at offset %"PRId64")\n",
2526 what, rd_kafka_topic_name(rkt), partition,
2527 msg_id, rkmessage->offset,
2528 msg_next,
2529 offset >= 0 ? offset + cnt : -1);
2530
2531 if (parse_fmt && msg_id != msg_next) {
2532 TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: "
2533 "expected msg #%d (%d/%d): got msg #%d\n",
2534 what, rd_kafka_topic_name(rkt), partition,
2535 msg_next, cnt, exp_cnt, msg_id);
2536 fails++;
2537 }
2538
2539 cnt++;
2540 tot_bytes += rkmessage->len;
2541 msg_next++;
2542 offset_last = rkmessage->offset;
2543
2544 rd_kafka_message_destroy(rkmessage);
2545 }
2546
2547 TIMING_STOP(&t_all);
2548
2549 if (fails)
2550 TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: %d failures\n",
2551 what, rd_kafka_topic_name(rkt), partition, fails);
2552
2553 TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: "
2554 "%d/%d messages consumed succesfully\n",
2555 what, rd_kafka_topic_name(rkt), partition,
2556 cnt, exp_cnt);
2557 return offset_last;
2558 }
2559
2560
2561 /**
2562 * Create high-level consumer subscribing to \p topic from BEGINNING
2563 * and expects \d exp_msgcnt with matching \p testid
2564 * Destroys consumer when done.
2565 *
2566 * @param txn If true, isolation.level is set to read_committed.
2567 * @param partition If -1 the topic will be subscribed to, otherwise the
2568 * single partition will be assigned immediately.
2569 *
2570 * If \p group_id is NULL a new unique group is generated
2571 */
2572 void
test_consume_msgs_easy_mv0(const char * group_id,const char * topic,rd_bool_t txn,int32_t partition,uint64_t testid,int exp_eofcnt,int exp_msgcnt,rd_kafka_topic_conf_t * tconf,test_msgver_t * mv)2573 test_consume_msgs_easy_mv0 (const char *group_id, const char *topic,
2574 rd_bool_t txn,
2575 int32_t partition,
2576 uint64_t testid, int exp_eofcnt, int exp_msgcnt,
2577 rd_kafka_topic_conf_t *tconf,
2578 test_msgver_t *mv) {
2579 rd_kafka_t *rk;
2580 char grpid0[64];
2581 rd_kafka_conf_t *conf;
2582
2583 test_conf_init(&conf, tconf ? NULL : &tconf, 0);
2584
2585 if (!group_id)
2586 group_id = test_str_id_generate(grpid0, sizeof(grpid0));
2587
2588 if (txn)
2589 test_conf_set(conf, "isolation.level", "read_committed");
2590
2591 test_topic_conf_set(tconf, "auto.offset.reset", "smallest");
2592 if (exp_eofcnt != -1)
2593 test_conf_set(conf, "enable.partition.eof", "true");
2594 rk = test_create_consumer(group_id, NULL, conf, tconf);
2595
2596 rd_kafka_poll_set_consumer(rk);
2597
2598 if (partition == -1) {
2599 TEST_SAY("Subscribing to topic %s in group %s "
2600 "(expecting %d msgs with testid %"PRIu64")\n",
2601 topic, group_id, exp_msgcnt, testid);
2602
2603 test_consumer_subscribe(rk, topic);
2604 } else {
2605 rd_kafka_topic_partition_list_t *plist;
2606
2607 TEST_SAY("Assign topic %s [%"PRId32"] in group %s "
2608 "(expecting %d msgs with testid %"PRIu64")\n",
2609 topic, partition, group_id, exp_msgcnt, testid);
2610
2611 plist = rd_kafka_topic_partition_list_new(1);
2612 rd_kafka_topic_partition_list_add(plist, topic, partition);
2613 test_consumer_assign("consume_easy_mv", rk, plist);
2614 rd_kafka_topic_partition_list_destroy(plist);
2615 }
2616
2617 /* Consume messages */
2618 test_consumer_poll("consume.easy", rk, testid, exp_eofcnt,
2619 -1, exp_msgcnt, mv);
2620
2621 test_consumer_close(rk);
2622
2623 rd_kafka_destroy(rk);
2624 }
2625
2626 void
test_consume_msgs_easy(const char * group_id,const char * topic,uint64_t testid,int exp_eofcnt,int exp_msgcnt,rd_kafka_topic_conf_t * tconf)2627 test_consume_msgs_easy (const char *group_id, const char *topic,
2628 uint64_t testid, int exp_eofcnt, int exp_msgcnt,
2629 rd_kafka_topic_conf_t *tconf) {
2630 test_msgver_t mv;
2631
2632 test_msgver_init(&mv, testid);
2633
2634 test_consume_msgs_easy_mv(group_id, topic, -1, testid, exp_eofcnt,
2635 exp_msgcnt, tconf, &mv);
2636
2637 test_msgver_clear(&mv);
2638 }
2639
2640
2641 void
test_consume_txn_msgs_easy(const char * group_id,const char * topic,uint64_t testid,int exp_eofcnt,int exp_msgcnt,rd_kafka_topic_conf_t * tconf)2642 test_consume_txn_msgs_easy (const char *group_id, const char *topic,
2643 uint64_t testid, int exp_eofcnt, int exp_msgcnt,
2644 rd_kafka_topic_conf_t *tconf) {
2645 test_msgver_t mv;
2646
2647 test_msgver_init(&mv, testid);
2648
2649 test_consume_msgs_easy_mv0(group_id, topic, rd_true/*txn*/,
2650 -1, testid, exp_eofcnt,
2651 exp_msgcnt, tconf, &mv);
2652
2653 test_msgver_clear(&mv);
2654 }
2655
2656
2657 /**
2658 * @brief Waits for up to \p timeout_ms for consumer to receive assignment.
2659 * If no assignment received without the timeout the test fails.
2660 *
2661 * @warning This method will poll the consumer and might thus read messages.
2662 * Set \p do_poll to false to use a sleep rather than poll.
2663 */
test_consumer_wait_assignment(rd_kafka_t * rk,rd_bool_t do_poll)2664 void test_consumer_wait_assignment (rd_kafka_t *rk, rd_bool_t do_poll) {
2665 rd_kafka_topic_partition_list_t *assignment = NULL;
2666 int i;
2667
2668 while (1) {
2669 rd_kafka_resp_err_t err;
2670
2671 err = rd_kafka_assignment(rk, &assignment);
2672 TEST_ASSERT(!err, "rd_kafka_assignment() failed: %s",
2673 rd_kafka_err2str(err));
2674
2675 if (assignment->cnt > 0)
2676 break;
2677
2678 rd_kafka_topic_partition_list_destroy(assignment);
2679
2680 if (do_poll)
2681 test_consumer_poll_once(rk, NULL, 1000);
2682 else
2683 rd_usleep(1000*1000, NULL);
2684 }
2685
2686 TEST_SAY("%s: Assignment (%d partition(s)): ",
2687 rd_kafka_name(rk), assignment->cnt);
2688 for (i = 0 ; i < assignment->cnt ; i++)
2689 TEST_SAY0("%s%s[%"PRId32"]",
2690 i == 0 ? "" : ", ",
2691 assignment->elems[i].topic,
2692 assignment->elems[i].partition);
2693 TEST_SAY0("\n");
2694
2695 rd_kafka_topic_partition_list_destroy(assignment);
2696 }
2697
2698
2699 /**
2700 * @brief Verify that the consumer's assignment matches the expected assignment.
2701 *
2702 * The va-list is a NULL-terminated list of (const char *topic, int partition)
2703 * tuples.
2704 *
2705 * Fails the test on mismatch, unless \p fail_immediately is false.
2706 */
test_consumer_verify_assignment0(const char * func,int line,rd_kafka_t * rk,int fail_immediately,...)2707 void test_consumer_verify_assignment0 (const char *func, int line,
2708 rd_kafka_t *rk,
2709 int fail_immediately, ...) {
2710 va_list ap;
2711 int cnt = 0;
2712 const char *topic;
2713 rd_kafka_topic_partition_list_t *assignment;
2714 rd_kafka_resp_err_t err;
2715 int i;
2716
2717 if ((err = rd_kafka_assignment(rk, &assignment)))
2718 TEST_FAIL("%s:%d: Failed to get assignment for %s: %s",
2719 func, line, rd_kafka_name(rk), rd_kafka_err2str(err));
2720
2721 TEST_SAY("%s assignment (%d partition(s)):\n", rd_kafka_name(rk),
2722 assignment->cnt);
2723 for (i = 0 ; i < assignment->cnt ; i++)
2724 TEST_SAY(" %s [%"PRId32"]\n",
2725 assignment->elems[i].topic,
2726 assignment->elems[i].partition);
2727
2728 va_start(ap, fail_immediately);
2729 while ((topic = va_arg(ap, const char *))) {
2730 int partition = va_arg(ap, int);
2731 cnt++;
2732
2733 if (!rd_kafka_topic_partition_list_find(assignment,
2734 topic, partition))
2735 TEST_FAIL_LATER(
2736 "%s:%d: Expected %s [%d] not found in %s's "
2737 "assignment (%d partition(s))",
2738 func, line,
2739 topic, partition, rd_kafka_name(rk),
2740 assignment->cnt);
2741 }
2742 va_end(ap);
2743
2744 if (cnt != assignment->cnt)
2745 TEST_FAIL_LATER(
2746 "%s:%d: "
2747 "Expected %d assigned partition(s) for %s, not %d",
2748 func, line, cnt, rd_kafka_name(rk), assignment->cnt);
2749
2750 if (fail_immediately)
2751 TEST_LATER_CHECK();
2752
2753 rd_kafka_topic_partition_list_destroy(assignment);
2754 }
2755
2756
2757
2758
2759
2760 /**
2761 * @brief Start subscribing for 'topic'
2762 */
test_consumer_subscribe(rd_kafka_t * rk,const char * topic)2763 void test_consumer_subscribe (rd_kafka_t *rk, const char *topic) {
2764 rd_kafka_topic_partition_list_t *topics;
2765 rd_kafka_resp_err_t err;
2766
2767 topics = rd_kafka_topic_partition_list_new(1);
2768 rd_kafka_topic_partition_list_add(topics, topic,
2769 RD_KAFKA_PARTITION_UA);
2770
2771 err = rd_kafka_subscribe(rk, topics);
2772 if (err)
2773 TEST_FAIL("%s: Failed to subscribe to %s: %s\n",
2774 rd_kafka_name(rk), topic, rd_kafka_err2str(err));
2775
2776 rd_kafka_topic_partition_list_destroy(topics);
2777 }
2778
2779
test_consumer_assign(const char * what,rd_kafka_t * rk,rd_kafka_topic_partition_list_t * partitions)2780 void test_consumer_assign (const char *what, rd_kafka_t *rk,
2781 rd_kafka_topic_partition_list_t *partitions) {
2782 rd_kafka_resp_err_t err;
2783 test_timing_t timing;
2784
2785 TIMING_START(&timing, "ASSIGN.PARTITIONS");
2786 err = rd_kafka_assign(rk, partitions);
2787 TIMING_STOP(&timing);
2788 if (err)
2789 TEST_FAIL("%s: failed to assign %d partition(s): %s\n",
2790 what, partitions->cnt, rd_kafka_err2str(err));
2791 else
2792 TEST_SAY("%s: assigned %d partition(s)\n",
2793 what, partitions->cnt);
2794 }
2795
2796
test_consumer_incremental_assign(const char * what,rd_kafka_t * rk,rd_kafka_topic_partition_list_t * partitions)2797 void test_consumer_incremental_assign (const char *what, rd_kafka_t *rk,
2798 rd_kafka_topic_partition_list_t
2799 *partitions) {
2800 rd_kafka_error_t *error;
2801 test_timing_t timing;
2802
2803 TIMING_START(&timing, "INCREMENTAL.ASSIGN.PARTITIONS");
2804 error = rd_kafka_incremental_assign(rk, partitions);
2805 TIMING_STOP(&timing);
2806 if (error) {
2807 TEST_FAIL("%s: incremental assign of %d partition(s) failed: "
2808 "%s", what, partitions->cnt,
2809 rd_kafka_error_string(error));
2810 rd_kafka_error_destroy(error);
2811 } else
2812 TEST_SAY("%s: incremental assign of %d partition(s) done\n",
2813 what, partitions->cnt);
2814 }
2815
2816
test_consumer_unassign(const char * what,rd_kafka_t * rk)2817 void test_consumer_unassign (const char *what, rd_kafka_t *rk) {
2818 rd_kafka_resp_err_t err;
2819 test_timing_t timing;
2820
2821 TIMING_START(&timing, "UNASSIGN.PARTITIONS");
2822 err = rd_kafka_assign(rk, NULL);
2823 TIMING_STOP(&timing);
2824 if (err)
2825 TEST_FAIL("%s: failed to unassign current partitions: %s\n",
2826 what, rd_kafka_err2str(err));
2827 else
2828 TEST_SAY("%s: unassigned current partitions\n", what);
2829 }
2830
2831
test_consumer_incremental_unassign(const char * what,rd_kafka_t * rk,rd_kafka_topic_partition_list_t * partitions)2832 void test_consumer_incremental_unassign (const char *what, rd_kafka_t *rk,
2833 rd_kafka_topic_partition_list_t
2834 *partitions) {
2835 rd_kafka_error_t *error;
2836 test_timing_t timing;
2837
2838 TIMING_START(&timing, "INCREMENTAL.UNASSIGN.PARTITIONS");
2839 error = rd_kafka_incremental_unassign(rk, partitions);
2840 TIMING_STOP(&timing);
2841 if (error) {
2842 TEST_FAIL("%s: incremental unassign of %d partition(s) "
2843 "failed: %s", what, partitions->cnt,
2844 rd_kafka_error_string(error));
2845 rd_kafka_error_destroy(error);
2846 } else
2847 TEST_SAY("%s: incremental unassign of %d partition(s) done\n",
2848 what, partitions->cnt);
2849 }
2850
2851
2852 /**
2853 * @brief Assign a single partition with an optional starting offset
2854 */
test_consumer_assign_partition(const char * what,rd_kafka_t * rk,const char * topic,int32_t partition,int64_t offset)2855 void test_consumer_assign_partition (const char *what, rd_kafka_t *rk,
2856 const char *topic, int32_t partition,
2857 int64_t offset) {
2858 rd_kafka_topic_partition_list_t *part;
2859
2860 part = rd_kafka_topic_partition_list_new(1);
2861 rd_kafka_topic_partition_list_add(part, topic, partition)->offset =
2862 offset;
2863
2864 test_consumer_assign(what, rk, part);
2865
2866 rd_kafka_topic_partition_list_destroy(part);
2867 }
2868
2869
test_consumer_pause_resume_partition(rd_kafka_t * rk,const char * topic,int32_t partition,rd_bool_t pause)2870 void test_consumer_pause_resume_partition (rd_kafka_t *rk,
2871 const char *topic, int32_t partition,
2872 rd_bool_t pause) {
2873 rd_kafka_topic_partition_list_t *part;
2874 rd_kafka_resp_err_t err;
2875
2876 part = rd_kafka_topic_partition_list_new(1);
2877 rd_kafka_topic_partition_list_add(part, topic, partition);
2878
2879 if (pause)
2880 err = rd_kafka_pause_partitions(rk, part);
2881 else
2882 err = rd_kafka_resume_partitions(rk, part);
2883
2884 TEST_ASSERT(!err, "Failed to %s %s [%"PRId32"]: %s",
2885 pause ? "pause":"resume",
2886 topic, partition,
2887 rd_kafka_err2str(err));
2888
2889 rd_kafka_topic_partition_list_destroy(part);
2890 }
2891
2892
2893 /**
2894 * Message verification services
2895 *
2896 */
2897
test_msgver_init(test_msgver_t * mv,uint64_t testid)2898 void test_msgver_init (test_msgver_t *mv, uint64_t testid) {
2899 memset(mv, 0, sizeof(*mv));
2900 mv->testid = testid;
2901 /* Max warning logs before suppressing. */
2902 mv->log_max = (test_level + 1) * 100;
2903 }
2904
test_msgver_ignore_eof(test_msgver_t * mv)2905 void test_msgver_ignore_eof (test_msgver_t *mv) {
2906 mv->ignore_eof = rd_true;
2907 }
2908
2909 #define TEST_MV_WARN(mv,...) do { \
2910 if ((mv)->log_cnt++ > (mv)->log_max) \
2911 (mv)->log_suppr_cnt++; \
2912 else \
2913 TEST_WARN(__VA_ARGS__); \
2914 } while (0)
2915
2916
2917
test_mv_mvec_grow(struct test_mv_mvec * mvec,int tot_size)2918 static void test_mv_mvec_grow (struct test_mv_mvec *mvec, int tot_size) {
2919 if (tot_size <= mvec->size)
2920 return;
2921 mvec->size = tot_size;
2922 mvec->m = realloc(mvec->m, sizeof(*mvec->m) * mvec->size);
2923 }
2924
2925 /**
2926 * Make sure there is room for at least \p cnt messages, else grow mvec.
2927 */
test_mv_mvec_reserve(struct test_mv_mvec * mvec,int cnt)2928 static void test_mv_mvec_reserve (struct test_mv_mvec *mvec, int cnt) {
2929 test_mv_mvec_grow(mvec, mvec->cnt + cnt);
2930 }
2931
test_mv_mvec_init(struct test_mv_mvec * mvec,int exp_cnt)2932 void test_mv_mvec_init (struct test_mv_mvec *mvec, int exp_cnt) {
2933 TEST_ASSERT(mvec->m == NULL, "mvec not cleared");
2934
2935 if (!exp_cnt)
2936 return;
2937
2938 test_mv_mvec_grow(mvec, exp_cnt);
2939 }
2940
2941
test_mv_mvec_clear(struct test_mv_mvec * mvec)2942 void test_mv_mvec_clear (struct test_mv_mvec *mvec) {
2943 if (mvec->m)
2944 free(mvec->m);
2945 }
2946
test_msgver_clear(test_msgver_t * mv)2947 void test_msgver_clear (test_msgver_t *mv) {
2948 int i;
2949 for (i = 0 ; i < mv->p_cnt ; i++) {
2950 struct test_mv_p *p = mv->p[i];
2951 free(p->topic);
2952 test_mv_mvec_clear(&p->mvec);
2953 free(p);
2954 }
2955
2956 free(mv->p);
2957
2958 test_msgver_init(mv, mv->testid);
2959 }
2960
test_msgver_p_get(test_msgver_t * mv,const char * topic,int32_t partition,int do_create)2961 struct test_mv_p *test_msgver_p_get (test_msgver_t *mv, const char *topic,
2962 int32_t partition, int do_create) {
2963 int i;
2964 struct test_mv_p *p;
2965
2966 for (i = 0 ; i < mv->p_cnt ; i++) {
2967 p = mv->p[i];
2968 if (p->partition == partition && !strcmp(p->topic, topic))
2969 return p;
2970 }
2971
2972 if (!do_create)
2973 TEST_FAIL("Topic %s [%d] not found in msgver", topic, partition);
2974
2975 if (mv->p_cnt == mv->p_size) {
2976 mv->p_size = (mv->p_size + 4) * 2;
2977 mv->p = realloc(mv->p, sizeof(*mv->p) * mv->p_size);
2978 }
2979
2980 mv->p[mv->p_cnt++] = p = calloc(1, sizeof(*p));
2981
2982 p->topic = rd_strdup(topic);
2983 p->partition = partition;
2984 p->eof_offset = RD_KAFKA_OFFSET_INVALID;
2985
2986 return p;
2987 }
2988
2989
2990 /**
2991 * Add (room for) message to message vector.
2992 * Resizes the vector as needed.
2993 */
test_mv_mvec_add(struct test_mv_mvec * mvec)2994 static struct test_mv_m *test_mv_mvec_add (struct test_mv_mvec *mvec) {
2995 if (mvec->cnt == mvec->size) {
2996 test_mv_mvec_grow(mvec, (mvec->size ? mvec->size * 2 : 10000));
2997 }
2998
2999 mvec->cnt++;
3000
3001 return &mvec->m[mvec->cnt-1];
3002 }
3003
3004 /**
3005 * Returns message at index \p mi
3006 */
test_mv_mvec_get(struct test_mv_mvec * mvec,int mi)3007 static RD_INLINE struct test_mv_m *test_mv_mvec_get (struct test_mv_mvec *mvec,
3008 int mi) {
3009 if (mi >= mvec->cnt)
3010 return NULL;
3011 return &mvec->m[mi];
3012 }
3013
3014 /**
3015 * @returns the message with msgid \p msgid, or NULL.
3016 */
test_mv_mvec_find_by_msgid(struct test_mv_mvec * mvec,int msgid)3017 static struct test_mv_m *test_mv_mvec_find_by_msgid (struct test_mv_mvec *mvec,
3018 int msgid) {
3019 int mi;
3020
3021 for (mi = 0 ; mi < mvec->cnt ; mi++)
3022 if (mvec->m[mi].msgid == msgid)
3023 return &mvec->m[mi];
3024
3025 return NULL;
3026 }
3027
3028
3029 /**
3030 * Print message list to \p fp
3031 */
3032 static RD_UNUSED
test_mv_mvec_dump(FILE * fp,const struct test_mv_mvec * mvec)3033 void test_mv_mvec_dump (FILE *fp, const struct test_mv_mvec *mvec) {
3034 int mi;
3035
3036 fprintf(fp, "*** Dump mvec with %d messages (capacity %d): ***\n",
3037 mvec->cnt, mvec->size);
3038 for (mi = 0 ; mi < mvec->cnt ; mi++)
3039 fprintf(fp, " msgid %d, offset %"PRId64"\n",
3040 mvec->m[mi].msgid, mvec->m[mi].offset);
3041 fprintf(fp, "*** Done ***\n");
3042
3043 }
3044
test_mv_mvec_sort(struct test_mv_mvec * mvec,int (* cmp)(const void *,const void *))3045 static void test_mv_mvec_sort (struct test_mv_mvec *mvec,
3046 int (*cmp) (const void *, const void *)) {
3047 qsort(mvec->m, mvec->cnt, sizeof(*mvec->m), cmp);
3048 }
3049
3050
3051 /**
3052 * @brief Adds a message to the msgver service.
3053 *
3054 * @returns 1 if message is from the expected testid, else 0 (not added)
3055 */
test_msgver_add_msg00(const char * func,int line,const char * clientname,test_msgver_t * mv,uint64_t testid,const char * topic,int32_t partition,int64_t offset,int64_t timestamp,int32_t broker_id,rd_kafka_resp_err_t err,int msgnum)3056 int test_msgver_add_msg00 (const char *func, int line, const char *clientname,
3057 test_msgver_t *mv,
3058 uint64_t testid,
3059 const char *topic, int32_t partition,
3060 int64_t offset, int64_t timestamp, int32_t broker_id,
3061 rd_kafka_resp_err_t err, int msgnum) {
3062 struct test_mv_p *p;
3063 struct test_mv_m *m;
3064
3065 if (testid != mv->testid) {
3066 TEST_SAYL(3, "%s:%d: %s: mismatching testid %"PRIu64
3067 " != %"PRIu64"\n",
3068 func, line, clientname, testid, mv->testid);
3069 return 0; /* Ignore message */
3070 }
3071
3072 if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF && mv->ignore_eof) {
3073 TEST_SAYL(3, "%s:%d: %s: ignoring EOF for %s [%"PRId32"]\n",
3074 func, line, clientname, topic, partition);
3075 return 0; /* Ignore message */
3076 }
3077
3078 p = test_msgver_p_get(mv, topic, partition, 1);
3079
3080 if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
3081 p->eof_offset = offset;
3082 return 1;
3083 }
3084
3085 m = test_mv_mvec_add(&p->mvec);
3086
3087 m->offset = offset;
3088 m->msgid = msgnum;
3089 m->timestamp = timestamp;
3090 m->broker_id = broker_id;
3091
3092 if (test_level > 2) {
3093 TEST_SAY("%s:%d: %s: "
3094 "Recv msg %s [%"PRId32"] offset %"PRId64" msgid %d "
3095 "timestamp %"PRId64" broker %"PRId32"\n",
3096 func, line, clientname,
3097 p->topic, p->partition, m->offset, m->msgid,
3098 m->timestamp, m->broker_id);
3099 }
3100
3101 mv->msgcnt++;
3102
3103 return 1;
3104 }
3105
3106 /**
3107 * Adds a message to the msgver service.
3108 *
3109 * Message must be a proper message or PARTITION_EOF.
3110 *
3111 * @param override_topic if non-NULL, overrides the rkmessage's topic
3112 * with this one.
3113 *
3114 * @returns 1 if message is from the expected testid, else 0 (not added).
3115 */
test_msgver_add_msg0(const char * func,int line,const char * clientname,test_msgver_t * mv,const rd_kafka_message_t * rkmessage,const char * override_topic)3116 int test_msgver_add_msg0 (const char *func, int line, const char *clientname,
3117 test_msgver_t *mv,
3118 const rd_kafka_message_t *rkmessage,
3119 const char *override_topic) {
3120 uint64_t in_testid;
3121 int in_part;
3122 int in_msgnum = -1;
3123 char buf[128];
3124 const void *val;
3125 size_t valsize;
3126
3127 if (mv->fwd)
3128 test_msgver_add_msg0(func, line, clientname,
3129 mv->fwd, rkmessage, override_topic);
3130
3131 if (rd_kafka_message_status(rkmessage) ==
3132 RD_KAFKA_MSG_STATUS_NOT_PERSISTED && rkmessage->err) {
3133 if (rkmessage->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
3134 return 0; /* Ignore error */
3135
3136 in_testid = mv->testid;
3137
3138 } else {
3139
3140 if (!mv->msgid_hdr) {
3141 rd_snprintf(buf, sizeof(buf), "%.*s",
3142 (int)rkmessage->len,
3143 (char *)rkmessage->payload);
3144 val = buf;
3145 } else {
3146 /* msgid is in message header */
3147 rd_kafka_headers_t *hdrs;
3148
3149 if (rd_kafka_message_headers(rkmessage, &hdrs) ||
3150 rd_kafka_header_get_last(hdrs, mv->msgid_hdr,
3151 &val, &valsize)) {
3152 TEST_SAYL(3,
3153 "%s:%d: msgid expected in header %s "
3154 "but %s exists for "
3155 "message at offset %"PRId64
3156 " has no headers\n",
3157 func, line, mv->msgid_hdr,
3158 hdrs ? "no such header" : "no headers",
3159 rkmessage->offset);
3160
3161 return 0;
3162 }
3163 }
3164
3165 if (sscanf(val, "testid=%"SCNu64", partition=%i, msg=%i\n",
3166 &in_testid, &in_part, &in_msgnum) != 3)
3167 TEST_FAIL("%s:%d: Incorrect format at offset %"PRId64
3168 ": %s",
3169 func, line, rkmessage->offset,
3170 (const char *)val);
3171 }
3172
3173 return test_msgver_add_msg00(func, line, clientname, mv, in_testid,
3174 override_topic ?
3175 override_topic :
3176 rd_kafka_topic_name(rkmessage->rkt),
3177 rkmessage->partition,
3178 rkmessage->offset,
3179 rd_kafka_message_timestamp(rkmessage, NULL),
3180 rd_kafka_message_broker_id(rkmessage),
3181 rkmessage->err,
3182 in_msgnum);
3183 return 1;
3184 }
3185
3186
3187
3188 /**
3189 * Verify that all messages were received in order.
3190 *
3191 * - Offsets need to occur without gaps
3192 * - msgids need to be increasing: but may have gaps, e.g., using partitioner)
3193 */
test_mv_mvec_verify_order(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs)3194 static int test_mv_mvec_verify_order (test_msgver_t *mv, int flags,
3195 struct test_mv_p *p,
3196 struct test_mv_mvec *mvec,
3197 struct test_mv_vs *vs) {
3198 int mi;
3199 int fails = 0;
3200
3201 for (mi = 1/*skip first*/ ; mi < mvec->cnt ; mi++) {
3202 struct test_mv_m *prev = test_mv_mvec_get(mvec, mi-1);
3203 struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
3204
3205 if (((flags & TEST_MSGVER_BY_OFFSET) &&
3206 prev->offset + 1 != this->offset) ||
3207 ((flags & TEST_MSGVER_BY_MSGID) &&
3208 prev->msgid > this->msgid)) {
3209 TEST_MV_WARN(
3210 mv,
3211 " %s [%"PRId32"] msg rcvidx #%d/%d: "
3212 "out of order (prev vs this): "
3213 "offset %"PRId64" vs %"PRId64", "
3214 "msgid %d vs %d\n",
3215 p ? p->topic : "*",
3216 p ? p->partition : -1,
3217 mi, mvec->cnt,
3218 prev->offset, this->offset,
3219 prev->msgid, this->msgid);
3220 fails++;
3221 } else if ((flags & TEST_MSGVER_BY_BROKER_ID) &&
3222 this->broker_id != vs->broker_id) {
3223 TEST_MV_WARN(
3224 mv,
3225 " %s [%"PRId32"] msg rcvidx #%d/%d: "
3226 "broker id mismatch: expected %"PRId32
3227 ", not %"PRId32"\n",
3228 p ? p->topic : "*",
3229 p ? p->partition : -1,
3230 mi, mvec->cnt,
3231 vs->broker_id, this->broker_id);
3232 fails++;
3233 }
3234 }
3235
3236 return fails;
3237 }
3238
3239
3240 /**
3241 * @brief Verify that messages correspond to 'correct' msgver.
3242 */
test_mv_mvec_verify_corr(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs)3243 static int test_mv_mvec_verify_corr (test_msgver_t *mv, int flags,
3244 struct test_mv_p *p,
3245 struct test_mv_mvec *mvec,
3246 struct test_mv_vs *vs) {
3247 int mi;
3248 int fails = 0;
3249 struct test_mv_p *corr_p = NULL;
3250 struct test_mv_mvec *corr_mvec;
3251 int verifycnt = 0;
3252
3253 TEST_ASSERT(vs->corr);
3254
3255 /* Get correct mvec for comparison. */
3256 if (p)
3257 corr_p = test_msgver_p_get(vs->corr, p->topic, p->partition, 0);
3258 if (!corr_p) {
3259 TEST_MV_WARN(mv,
3260 " %s [%"PRId32"]: "
3261 "no corresponding correct partition found\n",
3262 p ? p->topic : "*",
3263 p ? p->partition : -1);
3264 return 1;
3265 }
3266
3267 corr_mvec = &corr_p->mvec;
3268
3269 for (mi = 0 ; mi < mvec->cnt ; mi++) {
3270 struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
3271 const struct test_mv_m *corr;
3272
3273
3274 if (flags & TEST_MSGVER_SUBSET)
3275 corr = test_mv_mvec_find_by_msgid(corr_mvec,
3276 this->msgid);
3277 else
3278 corr = test_mv_mvec_get(corr_mvec, mi);
3279
3280 if (0)
3281 TEST_MV_WARN(mv,
3282 "msg #%d: msgid %d, offset %"PRId64"\n",
3283 mi, this->msgid, this->offset);
3284 if (!corr) {
3285 if (!(flags & TEST_MSGVER_SUBSET)) {
3286 TEST_MV_WARN(
3287 mv,
3288 " %s [%"PRId32"] msg rcvidx #%d/%d: "
3289 "out of range: correct mvec has "
3290 "%d messages: "
3291 "message offset %"PRId64", msgid %d\n",
3292 p ? p->topic : "*",
3293 p ? p->partition : -1,
3294 mi, mvec->cnt, corr_mvec->cnt,
3295 this->offset, this->msgid);
3296 fails++;
3297 }
3298 continue;
3299 }
3300
3301 if (((flags & TEST_MSGVER_BY_OFFSET) &&
3302 this->offset != corr->offset) ||
3303 ((flags & TEST_MSGVER_BY_MSGID) &&
3304 this->msgid != corr->msgid) ||
3305 ((flags & TEST_MSGVER_BY_TIMESTAMP) &&
3306 this->timestamp != corr->timestamp) ||
3307 ((flags & TEST_MSGVER_BY_BROKER_ID) &&
3308 this->broker_id != corr->broker_id)) {
3309 TEST_MV_WARN(
3310 mv,
3311 " %s [%"PRId32"] msg rcvidx #%d/%d: "
3312 "did not match correct msg: "
3313 "offset %"PRId64" vs %"PRId64", "
3314 "msgid %d vs %d, "
3315 "timestamp %"PRId64" vs %"PRId64", "
3316 "broker %"PRId32" vs %"PRId32" (fl 0x%x)\n",
3317 p ? p->topic : "*",
3318 p ? p->partition : -1,
3319 mi, mvec->cnt,
3320 this->offset, corr->offset,
3321 this->msgid, corr->msgid,
3322 this->timestamp, corr->timestamp,
3323 this->broker_id, corr->broker_id,
3324 flags);
3325 fails++;
3326 } else {
3327 verifycnt++;
3328 }
3329 }
3330
3331 if (verifycnt != corr_mvec->cnt &&
3332 !(flags & TEST_MSGVER_SUBSET)) {
3333 TEST_MV_WARN(
3334 mv,
3335 " %s [%"PRId32"]: of %d input messages, "
3336 "only %d/%d matched correct messages\n",
3337 p ? p->topic : "*",
3338 p ? p->partition : -1,
3339 mvec->cnt, verifycnt, corr_mvec->cnt);
3340 fails++;
3341 }
3342
3343 return fails;
3344 }
3345
3346
3347
test_mv_m_cmp_offset(const void * _a,const void * _b)3348 static int test_mv_m_cmp_offset (const void *_a, const void *_b) {
3349 const struct test_mv_m *a = _a, *b = _b;
3350
3351 return RD_CMP(a->offset, b->offset);
3352 }
3353
test_mv_m_cmp_msgid(const void * _a,const void * _b)3354 static int test_mv_m_cmp_msgid (const void *_a, const void *_b) {
3355 const struct test_mv_m *a = _a, *b = _b;
3356
3357 return RD_CMP(a->msgid, b->msgid);
3358 }
3359
3360
3361 /**
3362 * Verify that there are no duplicate message.
3363 *
3364 * - Offsets are checked
3365 * - msgids are checked
3366 *
3367 * * NOTE: This sorts the message (.m) array, first by offset, then by msgid
3368 * and leaves the message array sorted (by msgid)
3369 */
test_mv_mvec_verify_dup(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs)3370 static int test_mv_mvec_verify_dup (test_msgver_t *mv, int flags,
3371 struct test_mv_p *p,
3372 struct test_mv_mvec *mvec,
3373 struct test_mv_vs *vs) {
3374 int mi;
3375 int fails = 0;
3376 enum {
3377 _P_OFFSET,
3378 _P_MSGID
3379 } pass;
3380
3381 for (pass = _P_OFFSET ; pass <= _P_MSGID ; pass++) {
3382
3383 if (pass == _P_OFFSET) {
3384 if (!(flags & TEST_MSGVER_BY_OFFSET))
3385 continue;
3386 test_mv_mvec_sort(mvec, test_mv_m_cmp_offset);
3387 } else if (pass == _P_MSGID) {
3388 if (!(flags & TEST_MSGVER_BY_MSGID))
3389 continue;
3390 test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid);
3391 }
3392
3393 for (mi = 1/*skip first*/ ; mi < mvec->cnt ; mi++) {
3394 struct test_mv_m *prev = test_mv_mvec_get(mvec, mi-1);
3395 struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
3396 int is_dup = 0;
3397
3398 if (pass == _P_OFFSET)
3399 is_dup = prev->offset == this->offset;
3400 else if (pass == _P_MSGID)
3401 is_dup = prev->msgid == this->msgid;
3402
3403 if (!is_dup)
3404 continue;
3405
3406 TEST_MV_WARN(mv,
3407 " %s [%"PRId32"] "
3408 "duplicate msg (prev vs this): "
3409 "offset %"PRId64" vs %"PRId64", "
3410 "msgid %d vs %d\n",
3411 p ? p->topic : "*",
3412 p ? p->partition : -1,
3413 prev->offset, this->offset,
3414 prev->msgid, this->msgid);
3415 fails++;
3416 }
3417 }
3418
3419 return fails;
3420 }
3421
3422
3423
3424 /**
3425 * Verify that \p mvec contains the expected range:
3426 * - TEST_MSGVER_BY_MSGID: msgid within \p vs->msgid_min .. \p vs->msgid_max
3427 * - TEST_MSGVER_BY_TIMESTAMP: timestamp with \p vs->timestamp_min .. _max
3428 *
3429 * * NOTE: TEST_MSGVER_BY_MSGID is required
3430 *
3431 * * NOTE: This sorts the message (.m) array by msgid
3432 * and leaves the message array sorted (by msgid)
3433 */
test_mv_mvec_verify_range(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs)3434 static int test_mv_mvec_verify_range (test_msgver_t *mv, int flags,
3435 struct test_mv_p *p,
3436 struct test_mv_mvec *mvec,
3437 struct test_mv_vs *vs) {
3438 int mi;
3439 int fails = 0;
3440 int cnt = 0;
3441 int exp_cnt = vs->msgid_max - vs->msgid_min + 1;
3442 int skip_cnt = 0;
3443
3444 if (!(flags & TEST_MSGVER_BY_MSGID))
3445 return 0;
3446
3447 test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid);
3448
3449 //test_mv_mvec_dump(stdout, mvec);
3450
3451 for (mi = 0 ; mi < mvec->cnt ; mi++) {
3452 struct test_mv_m *prev = mi ? test_mv_mvec_get(mvec, mi-1):NULL;
3453 struct test_mv_m *this = test_mv_mvec_get(mvec, mi);
3454
3455 if (this->msgid < vs->msgid_min) {
3456 skip_cnt++;
3457 continue;
3458 } else if (this->msgid > vs->msgid_max)
3459 break;
3460
3461 if (flags & TEST_MSGVER_BY_TIMESTAMP) {
3462 if (this->timestamp < vs->timestamp_min ||
3463 this->timestamp > vs->timestamp_max) {
3464 TEST_MV_WARN(
3465 mv,
3466 " %s [%"PRId32"] range check: "
3467 "msgid #%d (at mi %d): "
3468 "timestamp %"PRId64" outside "
3469 "expected range %"PRId64"..%"PRId64"\n",
3470 p ? p->topic : "*",
3471 p ? p->partition : -1,
3472 this->msgid, mi,
3473 this->timestamp,
3474 vs->timestamp_min, vs->timestamp_max);
3475 fails++;
3476 }
3477 }
3478
3479 if ((flags & TEST_MSGVER_BY_BROKER_ID) &&
3480 this->broker_id != vs->broker_id) {
3481 TEST_MV_WARN(
3482 mv,
3483 " %s [%"PRId32"] range check: "
3484 "msgid #%d (at mi %d): "
3485 "expected broker id %"PRId32", not %"PRId32"\n",
3486 p ? p->topic : "*",
3487 p ? p->partition : -1,
3488 this->msgid, mi,
3489 vs->broker_id, this->broker_id);
3490 fails++;
3491 }
3492
3493 if (cnt++ == 0) {
3494 if (this->msgid != vs->msgid_min) {
3495 TEST_MV_WARN(mv,
3496 " %s [%"PRId32"] range check: "
3497 "first message #%d (at mi %d) "
3498 "is not first in "
3499 "expected range %d..%d\n",
3500 p ? p->topic : "*",
3501 p ? p->partition : -1,
3502 this->msgid, mi,
3503 vs->msgid_min, vs->msgid_max);
3504 fails++;
3505 }
3506 } else if (cnt > exp_cnt) {
3507 TEST_MV_WARN(mv,
3508 " %s [%"PRId32"] range check: "
3509 "too many messages received (%d/%d) at "
3510 "msgid %d for expected range %d..%d\n",
3511 p ? p->topic : "*",
3512 p ? p->partition : -1,
3513 cnt, exp_cnt, this->msgid,
3514 vs->msgid_min, vs->msgid_max);
3515 fails++;
3516 }
3517
3518 if (!prev) {
3519 skip_cnt++;
3520 continue;
3521 }
3522
3523 if (prev->msgid + 1 != this->msgid) {
3524 TEST_MV_WARN(mv, " %s [%"PRId32"] range check: "
3525 " %d message(s) missing between "
3526 "msgid %d..%d in expected range %d..%d\n",
3527 p ? p->topic : "*",
3528 p ? p->partition : -1,
3529 this->msgid - prev->msgid - 1,
3530 prev->msgid+1, this->msgid-1,
3531 vs->msgid_min, vs->msgid_max);
3532 fails++;
3533 }
3534 }
3535
3536 if (cnt != exp_cnt) {
3537 TEST_MV_WARN(mv,
3538 " %s [%"PRId32"] range check: "
3539 " wrong number of messages seen, wanted %d got %d "
3540 "in expected range %d..%d (%d messages skipped)\n",
3541 p ? p->topic : "*",
3542 p ? p->partition : -1,
3543 exp_cnt, cnt, vs->msgid_min, vs->msgid_max,
3544 skip_cnt);
3545 fails++;
3546 }
3547
3548 return fails;
3549 }
3550
3551
3552
3553 /**
3554 * Run verifier \p f for all partitions.
3555 */
3556 #define test_mv_p_verify_f(mv,flags,f,vs) \
3557 test_mv_p_verify_f0(mv,flags,f, # f, vs)
test_mv_p_verify_f0(test_msgver_t * mv,int flags,int (* f)(test_msgver_t * mv,int flags,struct test_mv_p * p,struct test_mv_mvec * mvec,struct test_mv_vs * vs),const char * f_name,struct test_mv_vs * vs)3558 static int test_mv_p_verify_f0 (test_msgver_t *mv, int flags,
3559 int (*f) (test_msgver_t *mv,
3560 int flags,
3561 struct test_mv_p *p,
3562 struct test_mv_mvec *mvec,
3563 struct test_mv_vs *vs),
3564 const char *f_name,
3565 struct test_mv_vs *vs) {
3566 int i;
3567 int fails = 0;
3568
3569 for (i = 0 ; i < mv->p_cnt ; i++) {
3570 TEST_SAY("Verifying %s [%"PRId32"] %d msgs with %s\n",
3571 mv->p[i]->topic, mv->p[i]->partition,
3572 mv->p[i]->mvec.cnt, f_name);
3573 fails += f(mv, flags, mv->p[i], &mv->p[i]->mvec, vs);
3574 }
3575
3576 return fails;
3577 }
3578
3579
3580 /**
3581 * Collect all messages from all topics and partitions into vs->mvec
3582 */
test_mv_collect_all_msgs(test_msgver_t * mv,struct test_mv_vs * vs)3583 static void test_mv_collect_all_msgs (test_msgver_t *mv,
3584 struct test_mv_vs *vs) {
3585 int i;
3586
3587 for (i = 0 ; i < mv->p_cnt ; i++) {
3588 struct test_mv_p *p = mv->p[i];
3589 int mi;
3590
3591 test_mv_mvec_reserve(&vs->mvec, p->mvec.cnt);
3592 for (mi = 0 ; mi < p->mvec.cnt ; mi++) {
3593 struct test_mv_m *m = test_mv_mvec_get(&p->mvec, mi);
3594 struct test_mv_m *m_new = test_mv_mvec_add(&vs->mvec);
3595 *m_new = *m;
3596 }
3597 }
3598 }
3599
3600
3601 /**
3602 * Verify that all messages (by msgid) in range msg_base+exp_cnt were received
3603 * and received only once.
3604 * This works across all partitions.
3605 */
test_msgver_verify_range(test_msgver_t * mv,int flags,struct test_mv_vs * vs)3606 static int test_msgver_verify_range (test_msgver_t *mv, int flags,
3607 struct test_mv_vs *vs) {
3608 int fails = 0;
3609
3610 /**
3611 * Create temporary array to hold expected message set,
3612 * then traverse all topics and partitions and move matching messages
3613 * to that set. Then verify the message set.
3614 */
3615
3616 test_mv_mvec_init(&vs->mvec, vs->exp_cnt);
3617
3618 /* Collect all msgs into vs mvec */
3619 test_mv_collect_all_msgs(mv, vs);
3620
3621 fails += test_mv_mvec_verify_range(mv, TEST_MSGVER_BY_MSGID|flags,
3622 NULL, &vs->mvec, vs);
3623 fails += test_mv_mvec_verify_dup(mv, TEST_MSGVER_BY_MSGID|flags,
3624 NULL, &vs->mvec, vs);
3625
3626 test_mv_mvec_clear(&vs->mvec);
3627
3628 return fails;
3629 }
3630
3631
3632 /**
3633 * Verify that \p exp_cnt messages were received for \p topic and \p partition
3634 * starting at msgid base \p msg_base.
3635 */
test_msgver_verify_part0(const char * func,int line,const char * what,test_msgver_t * mv,int flags,const char * topic,int partition,int msg_base,int exp_cnt)3636 int test_msgver_verify_part0 (const char *func, int line, const char *what,
3637 test_msgver_t *mv, int flags,
3638 const char *topic, int partition,
3639 int msg_base, int exp_cnt) {
3640 int fails = 0;
3641 struct test_mv_vs vs = { .msg_base = msg_base, .exp_cnt = exp_cnt };
3642 struct test_mv_p *p;
3643
3644 TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x) "
3645 "in %s [%d]: expecting msgids %d..%d (%d)\n",
3646 func, line, what, mv->msgcnt, flags, topic, partition,
3647 msg_base, msg_base+exp_cnt, exp_cnt);
3648
3649 p = test_msgver_p_get(mv, topic, partition, 0);
3650
3651 /* Per-partition checks */
3652 if (flags & TEST_MSGVER_ORDER)
3653 fails += test_mv_mvec_verify_order(mv, flags, p, &p->mvec, &vs);
3654 if (flags & TEST_MSGVER_DUP)
3655 fails += test_mv_mvec_verify_dup(mv, flags, p, &p->mvec, &vs);
3656
3657 if (mv->msgcnt < vs.exp_cnt) {
3658 TEST_MV_WARN(mv,
3659 "%s:%d: "
3660 "%s [%"PRId32"] expected %d messages but only "
3661 "%d received\n",
3662 func, line,
3663 p ? p->topic : "*",
3664 p ? p->partition : -1,
3665 vs.exp_cnt, mv->msgcnt);
3666 fails++;
3667 }
3668
3669
3670 if (mv->log_suppr_cnt > 0)
3671 TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
3672 func, line, what, mv->log_suppr_cnt);
3673
3674 if (fails)
3675 TEST_FAIL("%s:%d: %s: Verification of %d received messages "
3676 "failed: "
3677 "expected msgids %d..%d (%d): see previous errors\n",
3678 func, line, what,
3679 mv->msgcnt, msg_base, msg_base+exp_cnt, exp_cnt);
3680 else
3681 TEST_SAY("%s:%d: %s: Verification of %d received messages "
3682 "succeeded: "
3683 "expected msgids %d..%d (%d)\n",
3684 func, line, what,
3685 mv->msgcnt, msg_base, msg_base+exp_cnt, exp_cnt);
3686
3687 return fails;
3688
3689 }
3690
3691 /**
3692 * Verify that \p exp_cnt messages were received starting at
3693 * msgid base \p msg_base.
3694 */
test_msgver_verify0(const char * func,int line,const char * what,test_msgver_t * mv,int flags,struct test_mv_vs vs)3695 int test_msgver_verify0 (const char *func, int line, const char *what,
3696 test_msgver_t *mv,
3697 int flags, struct test_mv_vs vs) {
3698 int fails = 0;
3699
3700 TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x): "
3701 "expecting msgids %d..%d (%d)\n",
3702 func, line, what, mv->msgcnt, flags,
3703 vs.msg_base, vs.msg_base+vs.exp_cnt, vs.exp_cnt);
3704 if (flags & TEST_MSGVER_BY_TIMESTAMP) {
3705 assert((flags & TEST_MSGVER_BY_MSGID)); /* Required */
3706 TEST_SAY("%s:%d: %s: "
3707 " and expecting timestamps %"PRId64"..%"PRId64"\n",
3708 func, line, what,
3709 vs.timestamp_min, vs.timestamp_max);
3710 }
3711
3712 /* Per-partition checks */
3713 if (flags & TEST_MSGVER_ORDER)
3714 fails += test_mv_p_verify_f(mv, flags,
3715 test_mv_mvec_verify_order, &vs);
3716 if (flags & TEST_MSGVER_DUP)
3717 fails += test_mv_p_verify_f(mv, flags,
3718 test_mv_mvec_verify_dup, &vs);
3719
3720 /* Checks across all partitions */
3721 if ((flags & TEST_MSGVER_RANGE) && vs.exp_cnt > 0) {
3722 vs.msgid_min = vs.msg_base;
3723 vs.msgid_max = vs.msgid_min + vs.exp_cnt - 1;
3724 fails += test_msgver_verify_range(mv, flags, &vs);
3725 }
3726
3727 if (mv->log_suppr_cnt > 0)
3728 TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
3729 func, line, what, mv->log_suppr_cnt);
3730
3731 if (vs.exp_cnt != mv->msgcnt) {
3732 if (!(flags & TEST_MSGVER_SUBSET)) {
3733 TEST_WARN("%s:%d: %s: expected %d messages, got %d\n",
3734 func, line, what, vs.exp_cnt, mv->msgcnt);
3735 fails++;
3736 }
3737 }
3738
3739 if (fails)
3740 TEST_FAIL("%s:%d: %s: Verification of %d received messages "
3741 "failed: "
3742 "expected msgids %d..%d (%d): see previous errors\n",
3743 func, line, what,
3744 mv->msgcnt, vs.msg_base, vs.msg_base+vs.exp_cnt,
3745 vs.exp_cnt);
3746 else
3747 TEST_SAY("%s:%d: %s: Verification of %d received messages "
3748 "succeeded: "
3749 "expected msgids %d..%d (%d)\n",
3750 func, line, what,
3751 mv->msgcnt, vs.msg_base, vs.msg_base+vs.exp_cnt,
3752 vs.exp_cnt);
3753
3754 return fails;
3755 }
3756
3757
3758
3759
test_verify_rkmessage0(const char * func,int line,rd_kafka_message_t * rkmessage,uint64_t testid,int32_t partition,int msgnum)3760 void test_verify_rkmessage0 (const char *func, int line,
3761 rd_kafka_message_t *rkmessage, uint64_t testid,
3762 int32_t partition, int msgnum) {
3763 uint64_t in_testid;
3764 int in_part;
3765 int in_msgnum;
3766 char buf[128];
3767
3768 rd_snprintf(buf, sizeof(buf), "%.*s",
3769 (int)rkmessage->len, (char *)rkmessage->payload);
3770
3771 if (sscanf(buf, "testid=%"SCNu64", partition=%i, msg=%i\n",
3772 &in_testid, &in_part, &in_msgnum) != 3)
3773 TEST_FAIL("Incorrect format: %s", buf);
3774
3775 if (testid != in_testid ||
3776 (partition != -1 && partition != in_part) ||
3777 (msgnum != -1 && msgnum != in_msgnum) ||
3778 in_msgnum < 0)
3779 goto fail_match;
3780
3781 if (test_level > 2) {
3782 TEST_SAY("%s:%i: Our testid %"PRIu64", part %i (%i), msg %i\n",
3783 func, line,
3784 testid, (int)partition, (int)rkmessage->partition,
3785 msgnum);
3786 }
3787
3788
3789 return;
3790
3791 fail_match:
3792 TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i, msg %i did "
3793 "not match message: \"%s\"\n",
3794 func, line,
3795 testid, (int)partition, msgnum, buf);
3796 }
3797
3798
3799 /**
3800 * @brief Verify that \p mv is identical to \p corr according to flags.
3801 */
test_msgver_verify_compare0(const char * func,int line,const char * what,test_msgver_t * mv,test_msgver_t * corr,int flags)3802 void test_msgver_verify_compare0 (const char *func, int line,
3803 const char *what, test_msgver_t *mv,
3804 test_msgver_t *corr, int flags) {
3805 struct test_mv_vs vs;
3806 int fails = 0;
3807
3808 memset(&vs, 0, sizeof(vs));
3809
3810 TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x) by "
3811 "comparison to correct msgver (%d messages)\n",
3812 func, line, what, mv->msgcnt, flags, corr->msgcnt);
3813
3814 vs.corr = corr;
3815
3816 /* Per-partition checks */
3817 fails += test_mv_p_verify_f(mv, flags,
3818 test_mv_mvec_verify_corr, &vs);
3819
3820 if (mv->log_suppr_cnt > 0)
3821 TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n",
3822 func, line, what, mv->log_suppr_cnt);
3823
3824 if (corr->msgcnt != mv->msgcnt) {
3825 if (!(flags & TEST_MSGVER_SUBSET)) {
3826 TEST_WARN("%s:%d: %s: expected %d messages, got %d\n",
3827 func, line, what, corr->msgcnt, mv->msgcnt);
3828 fails++;
3829 }
3830 }
3831
3832 if (fails)
3833 TEST_FAIL("%s:%d: %s: Verification of %d received messages "
3834 "failed: expected %d messages: see previous errors\n",
3835 func, line, what,
3836 mv->msgcnt, corr->msgcnt);
3837 else
3838 TEST_SAY("%s:%d: %s: Verification of %d received messages "
3839 "succeeded: matching %d messages from correct msgver\n",
3840 func, line, what,
3841 mv->msgcnt, corr->msgcnt);
3842
3843 }
3844
3845
3846 /**
3847 * Consumer poll but dont expect any proper messages for \p timeout_ms.
3848 */
test_consumer_poll_no_msgs(const char * what,rd_kafka_t * rk,uint64_t testid,int timeout_ms)3849 void test_consumer_poll_no_msgs (const char *what, rd_kafka_t *rk,
3850 uint64_t testid, int timeout_ms) {
3851 int64_t tmout = test_clock() + timeout_ms * 1000;
3852 int cnt = 0;
3853 test_timing_t t_cons;
3854 test_msgver_t mv;
3855
3856 test_msgver_init(&mv, testid);
3857
3858 TEST_SAY("%s: not expecting any messages for %dms\n",
3859 what, timeout_ms);
3860
3861 TIMING_START(&t_cons, "CONSUME");
3862
3863 do {
3864 rd_kafka_message_t *rkmessage;
3865
3866 rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
3867 if (!rkmessage)
3868 continue;
3869
3870 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
3871 TEST_SAY("%s [%"PRId32"] reached EOF at "
3872 "offset %"PRId64"\n",
3873 rd_kafka_topic_name(rkmessage->rkt),
3874 rkmessage->partition,
3875 rkmessage->offset);
3876 test_msgver_add_msg(rk, &mv, rkmessage);
3877
3878 } else if (rkmessage->err) {
3879 TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64"): %s",
3880 rkmessage->rkt ?
3881 rd_kafka_topic_name(rkmessage->rkt) :
3882 "(no-topic)",
3883 rkmessage->partition,
3884 rkmessage->offset,
3885 rd_kafka_message_errstr(rkmessage));
3886
3887 } else {
3888 if (test_msgver_add_msg(rk, &mv, rkmessage)) {
3889 TEST_MV_WARN(&mv,
3890 "Received unexpected message on "
3891 "%s [%"PRId32"] at offset "
3892 "%"PRId64"\n",
3893 rd_kafka_topic_name(rkmessage->
3894 rkt),
3895 rkmessage->partition,
3896 rkmessage->offset);
3897 cnt++;
3898 }
3899 }
3900
3901 rd_kafka_message_destroy(rkmessage);
3902 } while (test_clock() <= tmout);
3903
3904 TIMING_STOP(&t_cons);
3905
3906 test_msgver_verify(what, &mv, TEST_MSGVER_ALL, 0, 0);
3907 test_msgver_clear(&mv);
3908
3909 TEST_ASSERT(cnt == 0, "Expected 0 messages, got %d", cnt);
3910 }
3911
3912 /**
3913 * @brief Consumer poll with expectation that a \p err will be reached
3914 * within \p timeout_ms.
3915 */
test_consumer_poll_expect_err(rd_kafka_t * rk,uint64_t testid,int timeout_ms,rd_kafka_resp_err_t err)3916 void test_consumer_poll_expect_err (rd_kafka_t *rk, uint64_t testid,
3917 int timeout_ms, rd_kafka_resp_err_t err) {
3918 int64_t tmout = test_clock() + timeout_ms * 1000;
3919
3920 TEST_SAY("%s: expecting error %s within %dms\n",
3921 rd_kafka_name(rk), rd_kafka_err2name(err), timeout_ms);
3922
3923 do {
3924 rd_kafka_message_t *rkmessage;
3925 rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
3926 if (!rkmessage)
3927 continue;
3928
3929 if (rkmessage->err == err) {
3930 TEST_SAY("Got expected error: %s: %s\n",
3931 rd_kafka_err2name(rkmessage->err),
3932 rd_kafka_message_errstr(rkmessage));
3933 rd_kafka_message_destroy(rkmessage);
3934
3935 return;
3936 } else if (rkmessage->err) {
3937 TEST_FAIL("%s [%"PRId32"] unexpected error "
3938 "(offset %"PRId64"): %s",
3939 rkmessage->rkt ?
3940 rd_kafka_topic_name(rkmessage->rkt) :
3941 "(no-topic)",
3942 rkmessage->partition,
3943 rkmessage->offset,
3944 rd_kafka_err2name(rkmessage->err));
3945 }
3946
3947 rd_kafka_message_destroy(rkmessage);
3948 } while (test_clock() <= tmout);
3949 TEST_FAIL("Expected error %s not seen in %dms",
3950 rd_kafka_err2name(err), timeout_ms);
3951 }
3952
3953 /**
3954 * Call consumer poll once and then return.
3955 * Messages are handled.
3956 *
3957 * \p mv is optional
3958 *
3959 * @returns 0 on timeout, 1 if a message was received or .._PARTITION_EOF
3960 * if EOF was reached.
3961 * TEST_FAIL()s on all errors.
3962 */
test_consumer_poll_once(rd_kafka_t * rk,test_msgver_t * mv,int timeout_ms)3963 int test_consumer_poll_once (rd_kafka_t *rk, test_msgver_t *mv, int timeout_ms){
3964 rd_kafka_message_t *rkmessage;
3965
3966 rkmessage = rd_kafka_consumer_poll(rk, timeout_ms);
3967 if (!rkmessage)
3968 return 0;
3969
3970 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
3971 TEST_SAY("%s [%"PRId32"] reached EOF at "
3972 "offset %"PRId64"\n",
3973 rd_kafka_topic_name(rkmessage->rkt),
3974 rkmessage->partition,
3975 rkmessage->offset);
3976 if (mv)
3977 test_msgver_add_msg(rk, mv, rkmessage);
3978 rd_kafka_message_destroy(rkmessage);
3979 return RD_KAFKA_RESP_ERR__PARTITION_EOF;
3980
3981 } else if (rkmessage->err) {
3982 TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64"): %s",
3983 rkmessage->rkt ?
3984 rd_kafka_topic_name(rkmessage->rkt) :
3985 "(no-topic)",
3986 rkmessage->partition,
3987 rkmessage->offset,
3988 rd_kafka_message_errstr(rkmessage));
3989
3990 } else {
3991 if (mv)
3992 test_msgver_add_msg(rk, mv, rkmessage);
3993 }
3994
3995 rd_kafka_message_destroy(rkmessage);
3996 return 1;
3997 }
3998
3999
4000 /**
4001 * @param exact Require exact exp_eof_cnt (unless -1) and exp_cnt (unless -1).
4002 * If false: poll until either one is reached.
4003 */
test_consumer_poll_exact(const char * what,rd_kafka_t * rk,uint64_t testid,int exp_eof_cnt,int exp_msg_base,int exp_cnt,rd_bool_t exact,test_msgver_t * mv)4004 int test_consumer_poll_exact (const char *what, rd_kafka_t *rk, uint64_t testid,
4005 int exp_eof_cnt, int exp_msg_base, int exp_cnt,
4006 rd_bool_t exact, test_msgver_t *mv) {
4007 int eof_cnt = 0;
4008 int cnt = 0;
4009 test_timing_t t_cons;
4010
4011 TEST_SAY("%s: consume %s%d messages\n", what,
4012 exact ? "exactly ": "", exp_cnt);
4013
4014 TIMING_START(&t_cons, "CONSUME");
4015
4016 while ((!exact &&
4017 ((exp_eof_cnt <= 0 || eof_cnt < exp_eof_cnt) &&
4018 (exp_cnt <= 0 || cnt < exp_cnt))) ||
4019 (exact &&
4020 (eof_cnt < exp_eof_cnt ||
4021 cnt < exp_cnt))) {
4022 rd_kafka_message_t *rkmessage;
4023
4024 rkmessage = rd_kafka_consumer_poll(rk, tmout_multip(10*1000));
4025 if (!rkmessage) /* Shouldn't take this long to get a msg */
4026 TEST_FAIL("%s: consumer_poll() timeout "
4027 "(%d/%d eof, %d/%d msgs)\n", what,
4028 eof_cnt, exp_eof_cnt, cnt, exp_cnt);
4029
4030
4031 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
4032 TEST_SAY("%s [%"PRId32"] reached EOF at "
4033 "offset %"PRId64"\n",
4034 rd_kafka_topic_name(rkmessage->rkt),
4035 rkmessage->partition,
4036 rkmessage->offset);
4037 TEST_ASSERT(exp_eof_cnt != 0, "expected no EOFs");
4038 if (mv)
4039 test_msgver_add_msg(rk, mv, rkmessage);
4040 eof_cnt++;
4041
4042 } else if (rkmessage->err) {
4043 TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64
4044 "): %s",
4045 rkmessage->rkt ?
4046 rd_kafka_topic_name(rkmessage->rkt) :
4047 "(no-topic)",
4048 rkmessage->partition,
4049 rkmessage->offset,
4050 rd_kafka_message_errstr(rkmessage));
4051
4052 } else {
4053 TEST_SAYL(4, "%s: consumed message on %s [%"PRId32"] "
4054 "at offset %"PRId64"\n",
4055 what,
4056 rd_kafka_topic_name(rkmessage->rkt),
4057 rkmessage->partition,
4058 rkmessage->offset);
4059
4060 if (!mv || test_msgver_add_msg(rk, mv, rkmessage))
4061 cnt++;
4062 }
4063
4064 rd_kafka_message_destroy(rkmessage);
4065 }
4066
4067 TIMING_STOP(&t_cons);
4068
4069 TEST_SAY("%s: consumed %d/%d messages (%d/%d EOFs)\n",
4070 what, cnt, exp_cnt, eof_cnt, exp_eof_cnt);
4071
4072 TEST_ASSERT(!exact ||
4073 ((exp_cnt == -1 || exp_cnt == cnt) &&
4074 (exp_eof_cnt == -1 || exp_eof_cnt == eof_cnt)),
4075 "%s: mismatch between exact expected counts and actual: "
4076 "%d/%d EOFs, %d/%d msgs",
4077 what, eof_cnt, exp_eof_cnt, cnt, exp_cnt);
4078
4079 if (exp_cnt == 0)
4080 TEST_ASSERT(cnt == 0 && eof_cnt == exp_eof_cnt,
4081 "%s: expected no messages and %d EOFs: "
4082 "got %d messages and %d EOFs",
4083 what, exp_eof_cnt, cnt, eof_cnt);
4084 return cnt;
4085 }
4086
4087
test_consumer_poll(const char * what,rd_kafka_t * rk,uint64_t testid,int exp_eof_cnt,int exp_msg_base,int exp_cnt,test_msgver_t * mv)4088 int test_consumer_poll (const char *what, rd_kafka_t *rk, uint64_t testid,
4089 int exp_eof_cnt, int exp_msg_base, int exp_cnt,
4090 test_msgver_t *mv) {
4091 return test_consumer_poll_exact(what, rk, testid,
4092 exp_eof_cnt, exp_msg_base, exp_cnt,
4093 rd_false/*not exact */, mv);
4094 }
4095
test_consumer_close(rd_kafka_t * rk)4096 void test_consumer_close (rd_kafka_t *rk) {
4097 rd_kafka_resp_err_t err;
4098 test_timing_t timing;
4099
4100 TEST_SAY("Closing consumer %s\n", rd_kafka_name(rk));
4101
4102 TIMING_START(&timing, "CONSUMER.CLOSE");
4103 err = rd_kafka_consumer_close(rk);
4104 TIMING_STOP(&timing);
4105 if (err)
4106 TEST_FAIL("Failed to close consumer: %s\n",
4107 rd_kafka_err2str(err));
4108 }
4109
4110
test_flush(rd_kafka_t * rk,int timeout_ms)4111 void test_flush (rd_kafka_t *rk, int timeout_ms) {
4112 test_timing_t timing;
4113 rd_kafka_resp_err_t err;
4114
4115 TEST_SAY("%s: Flushing %d messages\n",
4116 rd_kafka_name(rk), rd_kafka_outq_len(rk));
4117 TIMING_START(&timing, "FLUSH");
4118 err = rd_kafka_flush(rk, timeout_ms);
4119 TIMING_STOP(&timing);
4120 if (err)
4121 TEST_FAIL("Failed to flush(%s, %d): %s: len() = %d\n",
4122 rd_kafka_name(rk), timeout_ms,
4123 rd_kafka_err2str(err),
4124 rd_kafka_outq_len(rk));
4125 }
4126
4127
test_conf_set(rd_kafka_conf_t * conf,const char * name,const char * val)4128 void test_conf_set (rd_kafka_conf_t *conf, const char *name, const char *val) {
4129 char errstr[512];
4130 if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) !=
4131 RD_KAFKA_CONF_OK)
4132 TEST_FAIL("Failed to set config \"%s\"=\"%s\": %s\n",
4133 name, val, errstr);
4134 }
4135
test_conf_get(const rd_kafka_conf_t * conf,const char * name)4136 char *test_conf_get (const rd_kafka_conf_t *conf, const char *name) {
4137 static RD_TLS char ret[256];
4138 size_t ret_sz = sizeof(ret);
4139 if (rd_kafka_conf_get(conf, name, ret, &ret_sz) != RD_KAFKA_CONF_OK)
4140 TEST_FAIL("Failed to get config \"%s\": %s\n", name,
4141 "unknown property");
4142 return ret;
4143 }
4144
4145
test_topic_conf_get(const rd_kafka_topic_conf_t * tconf,const char * name)4146 char *test_topic_conf_get (const rd_kafka_topic_conf_t *tconf,
4147 const char *name) {
4148 static RD_TLS char ret[256];
4149 size_t ret_sz = sizeof(ret);
4150 if (rd_kafka_topic_conf_get(tconf, name, ret, &ret_sz) !=
4151 RD_KAFKA_CONF_OK)
4152 TEST_FAIL("Failed to get topic config \"%s\": %s\n", name,
4153 "unknown property");
4154 return ret;
4155 }
4156
4157
4158 /**
4159 * @brief Check if property \name matches \p val in \p conf.
4160 * If \p conf is NULL the test config will be used. */
test_conf_match(rd_kafka_conf_t * conf,const char * name,const char * val)4161 int test_conf_match (rd_kafka_conf_t *conf, const char *name, const char *val) {
4162 char *real;
4163 int free_conf = 0;
4164
4165 if (!conf) {
4166 test_conf_init(&conf, NULL, 0);
4167 free_conf = 1;
4168 }
4169
4170 real = test_conf_get(conf, name);
4171
4172 if (free_conf)
4173 rd_kafka_conf_destroy(conf);
4174
4175 return !strcmp(real, val);
4176 }
4177
4178
test_topic_conf_set(rd_kafka_topic_conf_t * tconf,const char * name,const char * val)4179 void test_topic_conf_set (rd_kafka_topic_conf_t *tconf,
4180 const char *name, const char *val) {
4181 char errstr[512];
4182 if (rd_kafka_topic_conf_set(tconf, name, val, errstr, sizeof(errstr)) !=
4183 RD_KAFKA_CONF_OK)
4184 TEST_FAIL("Failed to set topic config \"%s\"=\"%s\": %s\n",
4185 name, val, errstr);
4186 }
4187
4188 /**
4189 * @brief First attempt to set topic level property, then global.
4190 */
test_any_conf_set(rd_kafka_conf_t * conf,rd_kafka_topic_conf_t * tconf,const char * name,const char * val)4191 void test_any_conf_set (rd_kafka_conf_t *conf,
4192 rd_kafka_topic_conf_t *tconf,
4193 const char *name, const char *val) {
4194 rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;
4195 char errstr[512] = {"Missing conf_t"};
4196
4197 if (tconf)
4198 res = rd_kafka_topic_conf_set(tconf, name, val,
4199 errstr, sizeof(errstr));
4200 if (res == RD_KAFKA_CONF_UNKNOWN && conf)
4201 res = rd_kafka_conf_set(conf, name, val,
4202 errstr, sizeof(errstr));
4203
4204 if (res != RD_KAFKA_CONF_OK)
4205 TEST_FAIL("Failed to set any config \"%s\"=\"%s\": %s\n",
4206 name, val, errstr);
4207 }
4208
4209
4210 /**
4211 * @returns true if test clients need to be configured for authentication
4212 * or other security measures (SSL), else false for unauthed plaintext.
4213 */
test_needs_auth(void)4214 int test_needs_auth (void) {
4215 rd_kafka_conf_t *conf;
4216 const char *sec;
4217
4218 test_conf_init(&conf, NULL, 0);
4219
4220 sec = test_conf_get(conf, "security.protocol");
4221
4222 rd_kafka_conf_destroy(conf);
4223
4224 return strcmp(sec, "plaintext");
4225 }
4226
4227
test_print_partition_list(const rd_kafka_topic_partition_list_t * partitions)4228 void test_print_partition_list (const rd_kafka_topic_partition_list_t
4229 *partitions) {
4230 int i;
4231 for (i = 0 ; i < partitions->cnt ; i++) {
4232 TEST_SAY(" %s [%"PRId32"] offset %"PRId64"%s%s\n",
4233 partitions->elems[i].topic,
4234 partitions->elems[i].partition,
4235 partitions->elems[i].offset,
4236 partitions->elems[i].err ? ": " : "",
4237 partitions->elems[i].err ?
4238 rd_kafka_err2str(partitions->elems[i].err) : "");
4239 }
4240 }
4241
4242 /**
4243 * @brief Compare two lists, returning 0 if equal.
4244 *
4245 * @remark The lists may be sorted by this function.
4246 */
test_partition_list_cmp(rd_kafka_topic_partition_list_t * al,rd_kafka_topic_partition_list_t * bl)4247 int test_partition_list_cmp (rd_kafka_topic_partition_list_t *al,
4248 rd_kafka_topic_partition_list_t *bl) {
4249 int i;
4250
4251 if (al->cnt < bl->cnt)
4252 return -1;
4253 else if (al->cnt > bl->cnt)
4254 return 1;
4255 else if (al->cnt == 0)
4256 return 0;
4257
4258 rd_kafka_topic_partition_list_sort(al, NULL, NULL);
4259 rd_kafka_topic_partition_list_sort(bl, NULL, NULL);
4260
4261 for (i = 0 ; i < al->cnt ; i++) {
4262 const rd_kafka_topic_partition_t *a = &al->elems[i];
4263 const rd_kafka_topic_partition_t *b = &bl->elems[i];
4264 if (a->partition != b->partition ||
4265 strcmp(a->topic, b->topic))
4266 return -1;
4267 }
4268
4269 return 0;
4270 }
4271
4272
4273 /**
4274 * @brief Execute script from the Kafka distribution bin/ path.
4275 */
test_kafka_cmd(const char * fmt,...)4276 void test_kafka_cmd (const char *fmt, ...) {
4277 #ifdef _WIN32
4278 TEST_FAIL("%s not supported on Windows, yet", __FUNCTION__);
4279 #else
4280 char cmd[1024];
4281 int r;
4282 va_list ap;
4283 test_timing_t t_cmd;
4284 const char *kpath;
4285
4286 kpath = test_getenv("KAFKA_PATH", NULL);
4287
4288 if (!kpath)
4289 TEST_FAIL("%s: KAFKA_PATH must be set",
4290 __FUNCTION__);
4291
4292 r = rd_snprintf(cmd, sizeof(cmd),
4293 "%s/bin/", kpath);
4294 TEST_ASSERT(r < (int)sizeof(cmd));
4295
4296 va_start(ap, fmt);
4297 rd_vsnprintf(cmd+r, sizeof(cmd)-r, fmt, ap);
4298 va_end(ap);
4299
4300 TEST_SAY("Executing: %s\n", cmd);
4301 TIMING_START(&t_cmd, "exec");
4302 r = system(cmd);
4303 TIMING_STOP(&t_cmd);
4304
4305 if (r == -1)
4306 TEST_FAIL("system(\"%s\") failed: %s", cmd, strerror(errno));
4307 else if (WIFSIGNALED(r))
4308 TEST_FAIL("system(\"%s\") terminated by signal %d\n", cmd,
4309 WTERMSIG(r));
4310 else if (WEXITSTATUS(r))
4311 TEST_FAIL("system(\"%s\") failed with exit status %d\n",
4312 cmd, WEXITSTATUS(r));
4313 #endif
4314 }
4315
4316 /**
4317 * @brief Execute kafka-topics.sh from the Kafka distribution.
4318 */
test_kafka_topics(const char * fmt,...)4319 void test_kafka_topics (const char *fmt, ...) {
4320 #ifdef _WIN32
4321 TEST_FAIL("%s not supported on Windows, yet", __FUNCTION__);
4322 #else
4323 char cmd[512];
4324 int r;
4325 va_list ap;
4326 test_timing_t t_cmd;
4327 const char *kpath, *zk;
4328
4329 kpath = test_getenv("KAFKA_PATH", NULL);
4330 zk = test_getenv("ZK_ADDRESS", NULL);
4331
4332 if (!kpath || !zk)
4333 TEST_FAIL("%s: KAFKA_PATH and ZK_ADDRESS must be set",
4334 __FUNCTION__);
4335
4336 r = rd_snprintf(cmd, sizeof(cmd),
4337 "%s/bin/kafka-topics.sh --zookeeper %s ", kpath, zk);
4338 TEST_ASSERT(r < (int)sizeof(cmd));
4339
4340 va_start(ap, fmt);
4341 rd_vsnprintf(cmd+r, sizeof(cmd)-r, fmt, ap);
4342 va_end(ap);
4343
4344 TEST_SAY("Executing: %s\n", cmd);
4345 TIMING_START(&t_cmd, "exec");
4346 r = system(cmd);
4347 TIMING_STOP(&t_cmd);
4348
4349 if (r == -1)
4350 TEST_FAIL("system(\"%s\") failed: %s", cmd, strerror(errno));
4351 else if (WIFSIGNALED(r))
4352 TEST_FAIL("system(\"%s\") terminated by signal %d\n", cmd,
4353 WTERMSIG(r));
4354 else if (WEXITSTATUS(r))
4355 TEST_FAIL("system(\"%s\") failed with exit status %d\n",
4356 cmd, WEXITSTATUS(r));
4357 #endif
4358 }
4359
4360
4361
4362 /**
4363 * @brief Create topic using Topic Admin API
4364 */
test_admin_create_topic(rd_kafka_t * use_rk,const char * topicname,int partition_cnt,int replication_factor)4365 static void test_admin_create_topic (rd_kafka_t *use_rk,
4366 const char *topicname, int partition_cnt,
4367 int replication_factor) {
4368 rd_kafka_t *rk;
4369 rd_kafka_NewTopic_t *newt[1];
4370 const size_t newt_cnt = 1;
4371 rd_kafka_AdminOptions_t *options;
4372 rd_kafka_queue_t *rkqu;
4373 rd_kafka_event_t *rkev;
4374 const rd_kafka_CreateTopics_result_t *res;
4375 const rd_kafka_topic_result_t **terr;
4376 int timeout_ms = tmout_multip(10000);
4377 size_t res_cnt;
4378 rd_kafka_resp_err_t err;
4379 char errstr[512];
4380 test_timing_t t_create;
4381
4382 if (!(rk = use_rk))
4383 rk = test_create_producer();
4384
4385 rkqu = rd_kafka_queue_new(rk);
4386
4387 newt[0] = rd_kafka_NewTopic_new(topicname, partition_cnt,
4388 replication_factor,
4389 errstr, sizeof(errstr));
4390 TEST_ASSERT(newt[0] != NULL, "%s", errstr);
4391
4392 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
4393 err = rd_kafka_AdminOptions_set_operation_timeout(options, timeout_ms,
4394 errstr,
4395 sizeof(errstr));
4396 TEST_ASSERT(!err, "%s", errstr);
4397
4398 TEST_SAY("Creating topic \"%s\" "
4399 "(partitions=%d, replication_factor=%d, timeout=%d)\n",
4400 topicname, partition_cnt, replication_factor, timeout_ms);
4401
4402 TIMING_START(&t_create, "CreateTopics");
4403 rd_kafka_CreateTopics(rk, newt, newt_cnt, options, rkqu);
4404
4405 /* Wait for result */
4406 rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
4407 TEST_ASSERT(rkev, "Timed out waiting for CreateTopics result");
4408
4409 TIMING_STOP(&t_create);
4410
4411 TEST_ASSERT(!rd_kafka_event_error(rkev),
4412 "CreateTopics failed: %s",
4413 rd_kafka_event_error_string(rkev));
4414
4415 res = rd_kafka_event_CreateTopics_result(rkev);
4416 TEST_ASSERT(res, "Expected CreateTopics_result, not %s",
4417 rd_kafka_event_name(rkev));
4418
4419 terr = rd_kafka_CreateTopics_result_topics(res, &res_cnt);
4420 TEST_ASSERT(terr, "CreateTopics_result_topics returned NULL");
4421 TEST_ASSERT(res_cnt == newt_cnt,
4422 "CreateTopics_result_topics returned %"PRIusz" topics, "
4423 "not the expected %"PRIusz,
4424 res_cnt, newt_cnt);
4425
4426 TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]) ||
4427 rd_kafka_topic_result_error(terr[0]) ==
4428 RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
4429 "Topic %s result error: %s",
4430 rd_kafka_topic_result_name(terr[0]),
4431 rd_kafka_topic_result_error_string(terr[0]));
4432
4433 rd_kafka_event_destroy(rkev);
4434
4435 rd_kafka_queue_destroy(rkqu);
4436
4437 rd_kafka_AdminOptions_destroy(options);
4438
4439 rd_kafka_NewTopic_destroy(newt[0]);
4440
4441 if (!use_rk)
4442 rd_kafka_destroy(rk);
4443 }
4444
4445
4446
4447
4448 /**
4449 * @brief Create topic using kafka-topics.sh --create
4450 */
test_create_topic_sh(const char * topicname,int partition_cnt,int replication_factor)4451 static void test_create_topic_sh (const char *topicname, int partition_cnt,
4452 int replication_factor) {
4453 test_kafka_topics("--create --topic \"%s\" "
4454 "--replication-factor %d --partitions %d",
4455 topicname, replication_factor, partition_cnt);
4456 }
4457
4458
4459 /**
4460 * @brief Create topic
4461 */
test_create_topic(rd_kafka_t * use_rk,const char * topicname,int partition_cnt,int replication_factor)4462 void test_create_topic (rd_kafka_t *use_rk,
4463 const char *topicname, int partition_cnt,
4464 int replication_factor) {
4465 if (test_broker_version < TEST_BRKVER(0,10,2,0))
4466 test_create_topic_sh(topicname, partition_cnt,
4467 replication_factor);
4468 else
4469 test_admin_create_topic(use_rk, topicname, partition_cnt,
4470 replication_factor);
4471 }
4472
4473
4474 /**
4475 * @brief Create topic using kafka-topics.sh --delete
4476 */
test_delete_topic_sh(const char * topicname)4477 static void test_delete_topic_sh (const char *topicname) {
4478 test_kafka_topics("--delete --topic \"%s\" ", topicname);
4479 }
4480
4481
4482 /**
4483 * @brief Delete topic using Topic Admin API
4484 */
test_admin_delete_topic(rd_kafka_t * use_rk,const char * topicname)4485 static void test_admin_delete_topic (rd_kafka_t *use_rk,
4486 const char *topicname) {
4487 rd_kafka_t *rk;
4488 rd_kafka_DeleteTopic_t *delt[1];
4489 const size_t delt_cnt = 1;
4490 rd_kafka_AdminOptions_t *options;
4491 rd_kafka_queue_t *rkqu;
4492 rd_kafka_event_t *rkev;
4493 const rd_kafka_DeleteTopics_result_t *res;
4494 const rd_kafka_topic_result_t **terr;
4495 int timeout_ms = tmout_multip(10000);
4496 size_t res_cnt;
4497 rd_kafka_resp_err_t err;
4498 char errstr[512];
4499 test_timing_t t_create;
4500
4501 if (!(rk = use_rk))
4502 rk = test_create_producer();
4503
4504 rkqu = rd_kafka_queue_new(rk);
4505
4506 delt[0] = rd_kafka_DeleteTopic_new(topicname);
4507
4508 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
4509 err = rd_kafka_AdminOptions_set_operation_timeout(options, timeout_ms,
4510 errstr,
4511 sizeof(errstr));
4512 TEST_ASSERT(!err, "%s", errstr);
4513
4514 TEST_SAY("Deleting topic \"%s\" "
4515 "(timeout=%d)\n",
4516 topicname, timeout_ms);
4517
4518 TIMING_START(&t_create, "DeleteTopics");
4519 rd_kafka_DeleteTopics(rk, delt, delt_cnt, options, rkqu);
4520
4521 /* Wait for result */
4522 rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
4523 TEST_ASSERT(rkev, "Timed out waiting for DeleteTopics result");
4524
4525 TIMING_STOP(&t_create);
4526
4527 res = rd_kafka_event_DeleteTopics_result(rkev);
4528 TEST_ASSERT(res, "Expected DeleteTopics_result, not %s",
4529 rd_kafka_event_name(rkev));
4530
4531 terr = rd_kafka_DeleteTopics_result_topics(res, &res_cnt);
4532 TEST_ASSERT(terr, "DeleteTopics_result_topics returned NULL");
4533 TEST_ASSERT(res_cnt == delt_cnt,
4534 "DeleteTopics_result_topics returned %"PRIusz" topics, "
4535 "not the expected %"PRIusz,
4536 res_cnt, delt_cnt);
4537
4538 TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]),
4539 "Topic %s result error: %s",
4540 rd_kafka_topic_result_name(terr[0]),
4541 rd_kafka_topic_result_error_string(terr[0]));
4542
4543 rd_kafka_event_destroy(rkev);
4544
4545 rd_kafka_queue_destroy(rkqu);
4546
4547 rd_kafka_AdminOptions_destroy(options);
4548
4549 rd_kafka_DeleteTopic_destroy(delt[0]);
4550
4551 if (!use_rk)
4552 rd_kafka_destroy(rk);
4553 }
4554
4555
4556 /**
4557 * @brief Delete a topic
4558 */
test_delete_topic(rd_kafka_t * use_rk,const char * topicname)4559 void test_delete_topic (rd_kafka_t *use_rk, const char *topicname) {
4560 if (test_broker_version < TEST_BRKVER(0,10,2,0))
4561 test_delete_topic_sh(topicname);
4562 else
4563 test_admin_delete_topic(use_rk, topicname);
4564 }
4565
4566
4567 /**
4568 * @brief Create additional partitions for a topic using Admin API
4569 */
test_admin_create_partitions(rd_kafka_t * use_rk,const char * topicname,int new_partition_cnt)4570 static void test_admin_create_partitions (rd_kafka_t *use_rk,
4571 const char *topicname,
4572 int new_partition_cnt) {
4573 rd_kafka_t *rk;
4574 rd_kafka_NewPartitions_t *newp[1];
4575 const size_t newp_cnt = 1;
4576 rd_kafka_AdminOptions_t *options;
4577 rd_kafka_queue_t *rkqu;
4578 rd_kafka_event_t *rkev;
4579 const rd_kafka_CreatePartitions_result_t *res;
4580 const rd_kafka_topic_result_t **terr;
4581 int timeout_ms = tmout_multip(10000);
4582 size_t res_cnt;
4583 rd_kafka_resp_err_t err;
4584 char errstr[512];
4585 test_timing_t t_create;
4586
4587 if (!(rk = use_rk))
4588 rk = test_create_producer();
4589
4590 rkqu = rd_kafka_queue_new(rk);
4591
4592 newp[0] = rd_kafka_NewPartitions_new(topicname, new_partition_cnt,
4593 errstr, sizeof(errstr));
4594 TEST_ASSERT(newp[0] != NULL, "%s", errstr);
4595
4596 options = rd_kafka_AdminOptions_new(rk,
4597 RD_KAFKA_ADMIN_OP_CREATEPARTITIONS);
4598 err = rd_kafka_AdminOptions_set_operation_timeout(options, timeout_ms,
4599 errstr,
4600 sizeof(errstr));
4601 TEST_ASSERT(!err, "%s", errstr);
4602
4603 TEST_SAY("Creating %d (total) partitions for topic \"%s\"\n",
4604 new_partition_cnt, topicname);
4605
4606 TIMING_START(&t_create, "CreatePartitions");
4607 rd_kafka_CreatePartitions(rk, newp, newp_cnt, options, rkqu);
4608
4609 /* Wait for result */
4610 rkev = rd_kafka_queue_poll(rkqu, timeout_ms + 2000);
4611 TEST_ASSERT(rkev, "Timed out waiting for CreatePartitions result");
4612
4613 TIMING_STOP(&t_create);
4614
4615 res = rd_kafka_event_CreatePartitions_result(rkev);
4616 TEST_ASSERT(res, "Expected CreatePartitions_result, not %s",
4617 rd_kafka_event_name(rkev));
4618
4619 terr = rd_kafka_CreatePartitions_result_topics(res, &res_cnt);
4620 TEST_ASSERT(terr, "CreatePartitions_result_topics returned NULL");
4621 TEST_ASSERT(res_cnt == newp_cnt,
4622 "CreatePartitions_result_topics returned %"PRIusz
4623 " topics, not the expected %"PRIusz,
4624 res_cnt, newp_cnt);
4625
4626 TEST_ASSERT(!rd_kafka_topic_result_error(terr[0]),
4627 "Topic %s result error: %s",
4628 rd_kafka_topic_result_name(terr[0]),
4629 rd_kafka_topic_result_error_string(terr[0]));
4630
4631 rd_kafka_event_destroy(rkev);
4632
4633 rd_kafka_queue_destroy(rkqu);
4634
4635 rd_kafka_AdminOptions_destroy(options);
4636
4637 rd_kafka_NewPartitions_destroy(newp[0]);
4638
4639 if (!use_rk)
4640 rd_kafka_destroy(rk);
4641 }
4642
4643
4644 /**
4645 * @brief Create partitions for topic
4646 */
test_create_partitions(rd_kafka_t * use_rk,const char * topicname,int new_partition_cnt)4647 void test_create_partitions (rd_kafka_t *use_rk,
4648 const char *topicname, int new_partition_cnt) {
4649 if (test_broker_version < TEST_BRKVER(0,10,2,0))
4650 test_kafka_topics("--alter --topic %s --partitions %d",
4651 topicname, new_partition_cnt);
4652 else
4653 test_admin_create_partitions(use_rk, topicname,
4654 new_partition_cnt);
4655 }
4656
4657
test_get_partition_count(rd_kafka_t * rk,const char * topicname,int timeout_ms)4658 int test_get_partition_count (rd_kafka_t *rk, const char *topicname,
4659 int timeout_ms) {
4660 rd_kafka_t *use_rk;
4661 rd_kafka_resp_err_t err;
4662 rd_kafka_topic_t *rkt;
4663 int64_t abs_timeout = test_clock() + (timeout_ms * 1000);
4664 int ret = -1;
4665
4666 if (!rk)
4667 use_rk = test_create_producer();
4668 else
4669 use_rk = rk;
4670
4671 rkt = rd_kafka_topic_new(use_rk, topicname, NULL);
4672
4673 do {
4674 const struct rd_kafka_metadata *metadata;
4675
4676 err = rd_kafka_metadata(use_rk, 0, rkt, &metadata,
4677 tmout_multip(15000));
4678 if (err)
4679 TEST_WARN("metadata() for %s failed: %s\n",
4680 rkt ? rd_kafka_topic_name(rkt) :
4681 "(all-local)",
4682 rd_kafka_err2str(err));
4683 else {
4684 if (metadata->topic_cnt == 1) {
4685 if (metadata->topics[0].err == 0 ||
4686 metadata->topics[0].partition_cnt > 0) {
4687 int32_t cnt;
4688 cnt = metadata->topics[0].partition_cnt;
4689 rd_kafka_metadata_destroy(metadata);
4690 ret = (int)cnt;
4691 break;
4692 }
4693 TEST_SAY("metadata(%s) returned %s: retrying\n",
4694 rd_kafka_topic_name(rkt),
4695 rd_kafka_err2str(metadata->
4696 topics[0].err));
4697 }
4698 rd_kafka_metadata_destroy(metadata);
4699 rd_sleep(1);
4700 }
4701 } while (test_clock() < abs_timeout);
4702
4703 rd_kafka_topic_destroy(rkt);
4704
4705 if (!rk)
4706 rd_kafka_destroy(use_rk);
4707
4708 return ret;
4709 }
4710
4711 /**
4712 * @brief Let the broker auto-create the topic for us.
4713 */
test_auto_create_topic_rkt(rd_kafka_t * rk,rd_kafka_topic_t * rkt,int timeout_ms)4714 rd_kafka_resp_err_t test_auto_create_topic_rkt (rd_kafka_t *rk,
4715 rd_kafka_topic_t *rkt,
4716 int timeout_ms) {
4717 const struct rd_kafka_metadata *metadata;
4718 rd_kafka_resp_err_t err;
4719 test_timing_t t;
4720 int64_t abs_timeout = test_clock() + (timeout_ms * 1000);
4721
4722 do {
4723 TIMING_START(&t, "auto_create_topic");
4724 err = rd_kafka_metadata(rk, 0, rkt, &metadata,
4725 tmout_multip(15000));
4726 TIMING_STOP(&t);
4727 if (err)
4728 TEST_WARN("metadata() for %s failed: %s\n",
4729 rkt ? rd_kafka_topic_name(rkt) :
4730 "(all-local)",
4731 rd_kafka_err2str(err));
4732 else {
4733 if (metadata->topic_cnt == 1) {
4734 if (metadata->topics[0].err == 0 ||
4735 metadata->topics[0].partition_cnt > 0) {
4736 rd_kafka_metadata_destroy(metadata);
4737 return 0;
4738 }
4739 TEST_SAY("metadata(%s) returned %s: retrying\n",
4740 rd_kafka_topic_name(rkt),
4741 rd_kafka_err2str(metadata->
4742 topics[0].err));
4743 }
4744 rd_kafka_metadata_destroy(metadata);
4745 rd_sleep(1);
4746 }
4747 } while (test_clock() < abs_timeout);
4748
4749 return err;
4750 }
4751
test_auto_create_topic(rd_kafka_t * rk,const char * name,int timeout_ms)4752 rd_kafka_resp_err_t test_auto_create_topic (rd_kafka_t *rk, const char *name,
4753 int timeout_ms) {
4754 rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, name, NULL);
4755 rd_kafka_resp_err_t err;
4756 if (!rkt)
4757 return rd_kafka_last_error();
4758 err = test_auto_create_topic_rkt(rk, rkt, timeout_ms);
4759 rd_kafka_topic_destroy(rkt);
4760 return err;
4761 }
4762
4763
4764 /**
4765 * @brief Check if topic auto creation works.
4766 * @returns 1 if it does, else 0.
4767 */
test_check_auto_create_topic(void)4768 int test_check_auto_create_topic (void) {
4769 rd_kafka_t *rk;
4770 rd_kafka_conf_t *conf;
4771 rd_kafka_resp_err_t err;
4772 const char *topic = test_mk_topic_name("autocreatetest", 1);
4773
4774 test_conf_init(&conf, NULL, 0);
4775 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
4776 err = test_auto_create_topic(rk, topic, tmout_multip(5000));
4777 if (err)
4778 TEST_SAY("Auto topic creation of \"%s\" failed: %s\n",
4779 topic, rd_kafka_err2str(err));
4780 rd_kafka_destroy(rk);
4781
4782 return err ? 0 : 1;
4783 }
4784
4785
4786 /**
4787 * @brief Builds and runs a Java application from the java/ directory.
4788 *
4789 * The application is started in the background, use
4790 * test_waitpid() to await its demise.
4791 *
4792 * @param cls The app class to run using java/run-class.sh
4793 *
4794 * @returns -1 if the application could not be started, else the pid.
4795 */
test_run_java(const char * cls,const char ** argv)4796 int test_run_java (const char *cls, const char **argv) {
4797 #ifdef _WIN32
4798 TEST_WARN("%s(%s) not supported Windows, yet",
4799 __FUNCTION__, cls);
4800 return -1;
4801 #else
4802 int r;
4803 const char *kpath;
4804 pid_t pid;
4805 const char **full_argv, **p;
4806 int cnt;
4807 extern char **environ;
4808
4809 kpath = test_getenv("KAFKA_PATH", NULL);
4810
4811 if (!kpath) {
4812 TEST_WARN("%s(%s): KAFKA_PATH must be set\n",
4813 __FUNCTION__, cls);
4814 return -1;
4815 }
4816
4817 /* Build */
4818 r = system("make -s java");
4819
4820 if (r == -1 || WIFSIGNALED(r) || WEXITSTATUS(r)) {
4821 TEST_WARN("%s(%s): failed to build java class (code %d)\n",
4822 __FUNCTION__, cls, r);
4823 return -1;
4824 }
4825
4826 /* For child process and run cls */
4827 pid = fork();
4828 if (pid == -1) {
4829 TEST_WARN("%s(%s): failed to fork: %s\n",
4830 __FUNCTION__, cls, strerror(errno));
4831 return -1;
4832 }
4833
4834 if (pid > 0)
4835 return (int)pid; /* In parent process */
4836
4837 /* In child process */
4838
4839 /* Reconstruct argv to contain run-class.sh and the cls */
4840 for (cnt = 0 ; argv[cnt] ; cnt++)
4841 ;
4842
4843 cnt += 3; /* run-class.sh, cls, .., NULL */
4844 full_argv = malloc(sizeof(*full_argv) * cnt);
4845 full_argv[0] = "java/run-class.sh";
4846 full_argv[1] = (const char *)cls;
4847
4848 /* Copy arguments */
4849 for (p = &full_argv[2] ; *argv ; p++, argv++)
4850 *p = *argv;
4851 *p = NULL;
4852
4853 /* Run */
4854 r = execve(full_argv[0], (char *const*)full_argv, environ);
4855
4856 TEST_WARN("%s(%s): failed to execute run-class.sh: %s\n",
4857 __FUNCTION__, cls, strerror(errno));
4858 exit(2);
4859
4860 return -1; /* NOTREACHED */
4861 #endif
4862 }
4863
4864
4865 /**
4866 * @brief Wait for child-process \p pid to exit.
4867 *
4868 * @returns -1 if the child process exited successfully, else -1.
4869 */
test_waitpid(int pid)4870 int test_waitpid (int pid) {
4871 #ifdef _WIN32
4872 TEST_WARN("%s() not supported Windows, yet",
4873 __FUNCTION__);
4874 return -1;
4875 #else
4876 pid_t r;
4877 int status = 0;
4878
4879 r = waitpid((pid_t)pid, &status, 0);
4880
4881 if (r == -1) {
4882 TEST_WARN("waitpid(%d) failed: %s\n",
4883 pid, strerror(errno));
4884 return -1;
4885 }
4886
4887 if (WIFSIGNALED(status)) {
4888 TEST_WARN("Process %d terminated by signal %d\n", pid,
4889 WTERMSIG(status));
4890 return -1;
4891 } else if (WEXITSTATUS(status)) {
4892 TEST_WARN("Process %d exited with status %d\n",
4893 pid, WEXITSTATUS(status));
4894 return -1;
4895 }
4896
4897 return 0;
4898 #endif
4899 }
4900
4901
4902 /**
4903 * @brief Check if \p feature is builtin to librdkafka.
4904 * @returns returns 1 if feature is built in, else 0.
4905 */
test_check_builtin(const char * feature)4906 int test_check_builtin (const char *feature) {
4907 rd_kafka_conf_t *conf;
4908 char errstr[128];
4909 int r;
4910
4911 conf = rd_kafka_conf_new();
4912 if (rd_kafka_conf_set(conf, "builtin.features", feature,
4913 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
4914 TEST_SAY("Feature \"%s\" not built-in: %s\n",
4915 feature, errstr);
4916 r = 0;
4917 } else {
4918 TEST_SAY("Feature \"%s\" is built-in\n", feature);
4919 r = 1;
4920 }
4921
4922 rd_kafka_conf_destroy(conf);
4923 return r;
4924 }
4925
4926
tsprintf(const char * fmt,...)4927 char *tsprintf (const char *fmt, ...) {
4928 static RD_TLS char ret[8][512];
4929 static RD_TLS int i;
4930 va_list ap;
4931
4932
4933 i = (i + 1) % 8;
4934
4935 va_start(ap, fmt);
4936 rd_vsnprintf(ret[i], sizeof(ret[i]), fmt, ap);
4937 va_end(ap);
4938
4939 return ret[i];
4940 }
4941
4942
4943 /**
4944 * @brief Add a test report JSON object.
4945 * These will be written as a JSON array to the test report file.
4946 */
test_report_add(struct test * test,const char * fmt,...)4947 void test_report_add (struct test *test, const char *fmt, ...) {
4948 va_list ap;
4949 char buf[512];
4950
4951 va_start(ap, fmt);
4952 vsnprintf(buf, sizeof(buf), fmt, ap);
4953 va_end(ap);
4954
4955 if (test->report_cnt == test->report_size) {
4956 if (test->report_size == 0)
4957 test->report_size = 8;
4958 else
4959 test->report_size *= 2;
4960
4961 test->report_arr = realloc(test->report_arr,
4962 sizeof(*test->report_arr) *
4963 test->report_size);
4964 }
4965
4966 test->report_arr[test->report_cnt++] = rd_strdup(buf);
4967
4968 TEST_SAYL(1, "Report #%d: %s\n", test->report_cnt-1, buf);
4969 }
4970
4971 /**
4972 * Returns 1 if KAFKA_PATH and ZK_ADDRESS is set to se we can use the
4973 * kafka-topics.sh script to manually create topics.
4974 *
4975 * If \p skip is set TEST_SKIP() will be called with a helpful message.
4976 */
test_can_create_topics(int skip)4977 int test_can_create_topics (int skip) {
4978 /* Has AdminAPI */
4979 if (test_broker_version >= TEST_BRKVER(0,10,2,0))
4980 return 1;
4981
4982 #ifdef _WIN32
4983 if (skip)
4984 TEST_SKIP("Cannot create topics on Win32\n");
4985 return 0;
4986 #else
4987
4988 if (!test_getenv("KAFKA_PATH", NULL) ||
4989 !test_getenv("ZK_ADDRESS", NULL)) {
4990 if (skip)
4991 TEST_SKIP("Cannot create topics "
4992 "(set KAFKA_PATH and ZK_ADDRESS)\n");
4993 return 0;
4994 }
4995
4996
4997 return 1;
4998 #endif
4999 }
5000
5001
5002 /**
5003 * Wait for \p event_type, discarding all other events prior to it.
5004 */
test_wait_event(rd_kafka_queue_t * eventq,rd_kafka_event_type_t event_type,int timeout_ms)5005 rd_kafka_event_t *test_wait_event (rd_kafka_queue_t *eventq,
5006 rd_kafka_event_type_t event_type,
5007 int timeout_ms) {
5008 test_timing_t t_w;
5009 int64_t abs_timeout = test_clock() + (timeout_ms * 1000);
5010
5011 TIMING_START(&t_w, "wait_event");
5012 while (test_clock() < abs_timeout) {
5013 rd_kafka_event_t *rkev;
5014
5015 rkev = rd_kafka_queue_poll(eventq,
5016 (int)(abs_timeout - test_clock())/
5017 1000);
5018
5019 if (rd_kafka_event_type(rkev) == event_type) {
5020 TIMING_STOP(&t_w);
5021 return rkev;
5022 }
5023
5024 if (!rkev)
5025 continue;
5026
5027 if (rd_kafka_event_error(rkev))
5028 TEST_SAY("discarding ignored event %s: %s\n",
5029 rd_kafka_event_name(rkev),
5030 rd_kafka_event_error_string(rkev));
5031 else
5032 TEST_SAY("discarding ignored event %s\n",
5033 rd_kafka_event_name(rkev));
5034 rd_kafka_event_destroy(rkev);
5035
5036 }
5037 TIMING_STOP(&t_w);
5038
5039 return NULL;
5040 }
5041
5042
test_SAY(const char * file,int line,int level,const char * str)5043 void test_SAY (const char *file, int line, int level, const char *str) {
5044 TEST_SAYL(level, "%s", str);
5045 }
5046
test_SKIP(const char * file,int line,const char * str)5047 void test_SKIP (const char *file, int line, const char *str) {
5048 TEST_WARN("SKIPPING TEST: %s", str);
5049 TEST_LOCK();
5050 test_curr->state = TEST_SKIPPED;
5051 if (!*test_curr->failstr) {
5052 rd_snprintf(test_curr->failstr,
5053 sizeof(test_curr->failstr), "%s", str);
5054 rtrim(test_curr->failstr);
5055 }
5056 TEST_UNLOCK();
5057 }
5058
test_curr_name(void)5059 const char *test_curr_name (void) {
5060 return test_curr->name;
5061 }
5062
5063
5064 /**
5065 * @brief Dump/print message haders
5066 */
test_headers_dump(const char * what,int lvl,const rd_kafka_headers_t * hdrs)5067 void test_headers_dump (const char *what, int lvl,
5068 const rd_kafka_headers_t *hdrs) {
5069 size_t idx = 0;
5070 const char *name, *value;
5071 size_t size;
5072
5073 while (!rd_kafka_header_get_all(hdrs, idx++, &name,
5074 (const void **)&value, &size))
5075 TEST_SAYL(lvl, "%s: Header #%"PRIusz": %s='%s'\n",
5076 what, idx-1, name,
5077 value ? value : "(NULL)");
5078 }
5079
5080
5081 /**
5082 * @brief Retrieve and return the list of broker ids in the cluster.
5083 *
5084 * @param rk Optional instance to use.
5085 * @param cntp Will be updated to the number of brokers returned.
5086 *
5087 * @returns a malloc:ed list of int32_t broker ids.
5088 */
test_get_broker_ids(rd_kafka_t * use_rk,size_t * cntp)5089 int32_t *test_get_broker_ids (rd_kafka_t *use_rk, size_t *cntp) {
5090 int32_t *ids;
5091 rd_kafka_t *rk;
5092 const rd_kafka_metadata_t *md;
5093 rd_kafka_resp_err_t err;
5094 size_t i;
5095
5096 if (!(rk = use_rk))
5097 rk = test_create_producer();
5098
5099 err = rd_kafka_metadata(rk, 0, NULL, &md, tmout_multip(5000));
5100 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
5101 TEST_ASSERT(md->broker_cnt > 0,
5102 "%d brokers, expected > 0", md->broker_cnt);
5103
5104 ids = malloc(sizeof(*ids) * md->broker_cnt);
5105
5106 for (i = 0 ; i < (size_t)md->broker_cnt ; i++)
5107 ids[i] = md->brokers[i].id;
5108
5109 *cntp = md->broker_cnt;
5110
5111 rd_kafka_metadata_destroy(md);
5112
5113 if (!use_rk)
5114 rd_kafka_destroy(rk);
5115
5116 return ids;
5117 }
5118
5119
5120
5121 /**
5122 * @brief Verify that all topics in \p topics are reported in metadata,
5123 * and that none of the topics in \p not_topics are reported.
5124 *
5125 * @returns the number of failures (but does not FAIL).
5126 */
verify_topics_in_metadata(rd_kafka_t * rk,rd_kafka_metadata_topic_t * topics,size_t topic_cnt,rd_kafka_metadata_topic_t * not_topics,size_t not_topic_cnt)5127 static int verify_topics_in_metadata (rd_kafka_t *rk,
5128 rd_kafka_metadata_topic_t *topics,
5129 size_t topic_cnt,
5130 rd_kafka_metadata_topic_t *not_topics,
5131 size_t not_topic_cnt) {
5132 const rd_kafka_metadata_t *md;
5133 rd_kafka_resp_err_t err;
5134 int ti;
5135 size_t i;
5136 int fails = 0;
5137
5138 /* Mark topics with dummy error which is overwritten
5139 * when topic is found in metadata, allowing us to check
5140 * for missed topics. */
5141 for (i = 0 ; i < topic_cnt ; i++)
5142 topics[i].err = 12345;
5143
5144 err = rd_kafka_metadata(rk, 1/*all_topics*/, NULL, &md,
5145 tmout_multip(5000));
5146 TEST_ASSERT(!err, "metadata failed: %s", rd_kafka_err2str(err));
5147
5148 for (ti = 0 ; ti < md->topic_cnt ; ti++) {
5149 const rd_kafka_metadata_topic_t *mdt = &md->topics[ti];
5150
5151 for (i = 0 ; i < topic_cnt ; i++) {
5152 int pi;
5153 rd_kafka_metadata_topic_t *exp_mdt;
5154
5155 if (strcmp(topics[i].topic, mdt->topic))
5156 continue;
5157
5158 exp_mdt = &topics[i];
5159
5160 exp_mdt->err = mdt->err; /* indicate found */
5161 if (mdt->err) {
5162 TEST_SAY("metadata: "
5163 "Topic %s has error %s\n",
5164 mdt->topic,
5165 rd_kafka_err2str(mdt->err));
5166 fails++;
5167 }
5168
5169 if (exp_mdt->partition_cnt > 0 &&
5170 mdt->partition_cnt != exp_mdt->partition_cnt) {
5171 TEST_SAY("metadata: "
5172 "Topic %s, expected %d partitions"
5173 ", not %d\n",
5174 mdt->topic,
5175 exp_mdt->partition_cnt,
5176 mdt->partition_cnt);
5177 fails++;
5178 continue;
5179 }
5180
5181 /* Verify per-partition values */
5182 for (pi = 0 ; exp_mdt->partitions &&
5183 pi < exp_mdt->partition_cnt ; pi++) {
5184 const rd_kafka_metadata_partition_t *mdp =
5185 &mdt->partitions[pi];
5186 const rd_kafka_metadata_partition_t *exp_mdp =
5187 &exp_mdt->partitions[pi];
5188
5189 if (mdp->id != exp_mdp->id) {
5190 TEST_SAY("metadata: "
5191 "Topic %s, "
5192 "partition %d, "
5193 "partition list out of order,"
5194 " expected %d, not %d\n",
5195 mdt->topic, pi,
5196 exp_mdp->id, mdp->id);
5197 fails++;
5198 continue;
5199 }
5200
5201 if (exp_mdp->replicas) {
5202 if (mdp->replica_cnt !=
5203 exp_mdp->replica_cnt) {
5204 TEST_SAY("metadata: "
5205 "Topic %s, "
5206 "partition %d, "
5207 "expected %d replicas,"
5208 " not %d\n",
5209 mdt->topic, pi,
5210 exp_mdp->replica_cnt,
5211 mdp->replica_cnt);
5212 fails++;
5213 } else if (memcmp(mdp->replicas,
5214 exp_mdp->replicas,
5215 mdp->replica_cnt *
5216 sizeof(*mdp->replicas))) {
5217 int ri;
5218
5219 TEST_SAY("metadata: "
5220 "Topic %s, "
5221 "partition %d, "
5222 "replica mismatch:\n",
5223 mdt->topic, pi);
5224
5225 for (ri = 0 ;
5226 ri < mdp->replica_cnt ;
5227 ri++) {
5228 TEST_SAY(" #%d: "
5229 "expected "
5230 "replica %d, "
5231 "not %d\n",
5232 ri,
5233 exp_mdp->
5234 replicas[ri],
5235 mdp->
5236 replicas[ri]);
5237 }
5238
5239 fails++;
5240 }
5241
5242 }
5243 }
5244 }
5245
5246 for (i = 0 ; i < not_topic_cnt ; i++) {
5247 if (strcmp(not_topics[i].topic, mdt->topic))
5248 continue;
5249
5250 TEST_SAY("metadata: "
5251 "Topic %s found in metadata, unexpected\n",
5252 mdt->topic);
5253 fails++;
5254 }
5255
5256 }
5257
5258 for (i = 0 ; i < topic_cnt ; i++) {
5259 if ((int)topics[i].err == 12345) {
5260 TEST_SAY("metadata: "
5261 "Topic %s not seen in metadata\n",
5262 topics[i].topic);
5263 fails++;
5264 }
5265 }
5266
5267 if (fails > 0)
5268 TEST_SAY("Metadata verification for %"PRIusz" topics failed "
5269 "with %d errors (see above)\n",
5270 topic_cnt, fails);
5271 else
5272 TEST_SAY("Metadata verification succeeded: "
5273 "%"PRIusz" desired topics seen, "
5274 "%"PRIusz" undesired topics not seen\n",
5275 topic_cnt, not_topic_cnt);
5276
5277 rd_kafka_metadata_destroy(md);
5278
5279 return fails;
5280 }
5281
5282
5283
5284 /**
5285 * @brief Wait for metadata to reflect expected and not expected topics
5286 */
test_wait_metadata_update(rd_kafka_t * rk,rd_kafka_metadata_topic_t * topics,size_t topic_cnt,rd_kafka_metadata_topic_t * not_topics,size_t not_topic_cnt,int tmout)5287 void test_wait_metadata_update (rd_kafka_t *rk,
5288 rd_kafka_metadata_topic_t *topics,
5289 size_t topic_cnt,
5290 rd_kafka_metadata_topic_t *not_topics,
5291 size_t not_topic_cnt,
5292 int tmout) {
5293 int64_t abs_timeout;
5294 test_timing_t t_md;
5295 rd_kafka_t *our_rk = NULL;
5296
5297 if (!rk)
5298 rk = our_rk = test_create_handle(RD_KAFKA_PRODUCER, NULL);
5299
5300 abs_timeout = test_clock() + (tmout * 1000);
5301
5302 TEST_SAY("Waiting for up to %dms for metadata update\n", tmout);
5303
5304 TIMING_START(&t_md, "METADATA.WAIT");
5305 do {
5306 int md_fails;
5307
5308 md_fails = verify_topics_in_metadata(
5309 rk,
5310 topics, topic_cnt,
5311 not_topics, not_topic_cnt);
5312
5313 if (!md_fails) {
5314 TEST_SAY("All expected topics (not?) "
5315 "seen in metadata\n");
5316 abs_timeout = 0;
5317 break;
5318 }
5319
5320 rd_sleep(1);
5321 } while (test_clock() < abs_timeout);
5322 TIMING_STOP(&t_md);
5323
5324 if (our_rk)
5325 rd_kafka_destroy(our_rk);
5326
5327 if (abs_timeout)
5328 TEST_FAIL("Expected topics not seen in given time.");
5329 }
5330
5331 /**
5332 * @brief Wait for topic to be available in metadata
5333 */
test_wait_topic_exists(rd_kafka_t * rk,const char * topic,int tmout)5334 void test_wait_topic_exists (rd_kafka_t *rk, const char *topic, int tmout) {
5335 rd_kafka_metadata_topic_t topics = { .topic = (char *)topic };
5336
5337 test_wait_metadata_update(rk, &topics, 1, NULL, 0, tmout);
5338
5339 /* Wait an additional second for the topic to propagate in
5340 * the cluster. This is not perfect but a cheap workaround for
5341 * the asynchronous nature of topic creations in Kafka. */
5342 rd_sleep(1);
5343 }
5344
5345
5346
5347 /**
5348 * @brief Wait for up to \p tmout for any type of admin result.
5349 * @returns the event
5350 */
5351 rd_kafka_event_t *
test_wait_admin_result(rd_kafka_queue_t * q,rd_kafka_event_type_t evtype,int tmout)5352 test_wait_admin_result (rd_kafka_queue_t *q,
5353 rd_kafka_event_type_t evtype,
5354 int tmout) {
5355 rd_kafka_event_t *rkev;
5356
5357 while (1) {
5358 rkev = rd_kafka_queue_poll(q, tmout);
5359 if (!rkev)
5360 TEST_FAIL("Timed out waiting for admin result (%d)\n",
5361 evtype);
5362
5363 if (rd_kafka_event_type(rkev) == evtype)
5364 return rkev;
5365
5366
5367 if (rd_kafka_event_type(rkev) == RD_KAFKA_EVENT_ERROR) {
5368 TEST_WARN("Received error event while waiting for %d: "
5369 "%s: ignoring",
5370 evtype, rd_kafka_event_error_string(rkev));
5371 continue;
5372 }
5373
5374
5375 TEST_ASSERT(rd_kafka_event_type(rkev) == evtype,
5376 "Expected event type %d, got %d (%s)",
5377 evtype,
5378 rd_kafka_event_type(rkev),
5379 rd_kafka_event_name(rkev));
5380 }
5381
5382 return NULL;
5383 }
5384
5385
5386
5387 /**
5388 * @brief Wait for up to \p tmout for an admin API result and return the
5389 * distilled error code.
5390 *
5391 * Supported APIs:
5392 * - AlterConfigs
5393 * - CreatePartitions
5394 * - CreateTopics
5395 * - DeleteGroups
5396 * - DeleteRecords
5397 * - DeleteTopics
5398 * * DeleteConsumerGroupOffsets
5399 * - DescribeConfigs
5400 */
5401 rd_kafka_resp_err_t
test_wait_topic_admin_result(rd_kafka_queue_t * q,rd_kafka_event_type_t evtype,rd_kafka_event_t ** retevent,int tmout)5402 test_wait_topic_admin_result (rd_kafka_queue_t *q,
5403 rd_kafka_event_type_t evtype,
5404 rd_kafka_event_t **retevent,
5405 int tmout) {
5406 rd_kafka_event_t *rkev;
5407 size_t i;
5408 const rd_kafka_topic_result_t **terr = NULL;
5409 size_t terr_cnt = 0;
5410 const rd_kafka_ConfigResource_t **cres = NULL;
5411 size_t cres_cnt = 0;
5412 int errcnt = 0;
5413 rd_kafka_resp_err_t err;
5414 const rd_kafka_group_result_t **gres = NULL;
5415 size_t gres_cnt = 0;
5416 const rd_kafka_topic_partition_list_t *offsets = NULL;
5417
5418 rkev = test_wait_admin_result(q, evtype, tmout);
5419
5420 if ((err = rd_kafka_event_error(rkev))) {
5421 TEST_WARN("%s failed: %s\n",
5422 rd_kafka_event_name(rkev),
5423 rd_kafka_event_error_string(rkev));
5424 rd_kafka_event_destroy(rkev);
5425 return err;
5426 }
5427
5428 if (evtype == RD_KAFKA_EVENT_CREATETOPICS_RESULT) {
5429 const rd_kafka_CreateTopics_result_t *res;
5430 if (!(res = rd_kafka_event_CreateTopics_result(rkev)))
5431 TEST_FAIL("Expected a CreateTopics result, not %s",
5432 rd_kafka_event_name(rkev));
5433
5434 terr = rd_kafka_CreateTopics_result_topics(res, &terr_cnt);
5435
5436 } else if (evtype == RD_KAFKA_EVENT_DELETETOPICS_RESULT) {
5437 const rd_kafka_DeleteTopics_result_t *res;
5438 if (!(res = rd_kafka_event_DeleteTopics_result(rkev)))
5439 TEST_FAIL("Expected a DeleteTopics result, not %s",
5440 rd_kafka_event_name(rkev));
5441
5442 terr = rd_kafka_DeleteTopics_result_topics(res, &terr_cnt);
5443
5444 } else if (evtype == RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT) {
5445 const rd_kafka_CreatePartitions_result_t *res;
5446 if (!(res = rd_kafka_event_CreatePartitions_result(rkev)))
5447 TEST_FAIL("Expected a CreatePartitions result, not %s",
5448 rd_kafka_event_name(rkev));
5449
5450 terr = rd_kafka_CreatePartitions_result_topics(res, &terr_cnt);
5451
5452 } else if (evtype == RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT) {
5453 const rd_kafka_DescribeConfigs_result_t *res;
5454
5455 if (!(res = rd_kafka_event_DescribeConfigs_result(rkev)))
5456 TEST_FAIL("Expected a DescribeConfigs result, not %s",
5457 rd_kafka_event_name(rkev));
5458
5459 cres = rd_kafka_DescribeConfigs_result_resources(res,
5460 &cres_cnt);
5461
5462 } else if (evtype == RD_KAFKA_EVENT_ALTERCONFIGS_RESULT) {
5463 const rd_kafka_AlterConfigs_result_t *res;
5464
5465 if (!(res = rd_kafka_event_AlterConfigs_result(rkev)))
5466 TEST_FAIL("Expected a AlterConfigs result, not %s",
5467 rd_kafka_event_name(rkev));
5468
5469 cres = rd_kafka_AlterConfigs_result_resources(res, &cres_cnt);
5470
5471 } else if (evtype == RD_KAFKA_EVENT_DELETEGROUPS_RESULT) {
5472 const rd_kafka_DeleteGroups_result_t *res;
5473 if (!(res = rd_kafka_event_DeleteGroups_result(rkev)))
5474 TEST_FAIL("Expected a DeleteGroups result, not %s",
5475 rd_kafka_event_name(rkev));
5476
5477 gres = rd_kafka_DeleteGroups_result_groups(res, &gres_cnt);
5478
5479 } else if (evtype == RD_KAFKA_EVENT_DELETERECORDS_RESULT) {
5480 const rd_kafka_DeleteRecords_result_t *res;
5481 if (!(res = rd_kafka_event_DeleteRecords_result(rkev)))
5482 TEST_FAIL("Expected a DeleteRecords result, not %s",
5483 rd_kafka_event_name(rkev));
5484
5485 offsets = rd_kafka_DeleteRecords_result_offsets(res);
5486
5487 } else if (evtype == RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT) {
5488 const rd_kafka_DeleteConsumerGroupOffsets_result_t *res;
5489 if (!(res =
5490 rd_kafka_event_DeleteConsumerGroupOffsets_result(rkev)))
5491 TEST_FAIL("Expected a DeleteConsumerGroupOffsets "
5492 "result, not %s",
5493 rd_kafka_event_name(rkev));
5494
5495 gres = rd_kafka_DeleteConsumerGroupOffsets_result_groups(
5496 rkev, &gres_cnt);
5497
5498 } else {
5499 TEST_FAIL("Bad evtype: %d", evtype);
5500 RD_NOTREACHED();
5501 }
5502
5503 /* Check topic errors */
5504 for (i = 0 ; i < terr_cnt ; i++) {
5505 if (rd_kafka_topic_result_error(terr[i])) {
5506 TEST_WARN("..Topics result: %s: error: %s\n",
5507 rd_kafka_topic_result_name(terr[i]),
5508 rd_kafka_topic_result_error_string(terr[i]));
5509 if (!(errcnt++))
5510 err = rd_kafka_topic_result_error(terr[i]);
5511 }
5512 }
5513
5514 /* Check resource errors */
5515 for (i = 0 ; i < cres_cnt ; i++) {
5516 if (rd_kafka_ConfigResource_error(cres[i])) {
5517 TEST_WARN("ConfigResource result: %d,%s: error: %s\n",
5518 rd_kafka_ConfigResource_type(cres[i]),
5519 rd_kafka_ConfigResource_name(cres[i]),
5520 rd_kafka_ConfigResource_error_string(cres[i]));
5521 if (!(errcnt++))
5522 err = rd_kafka_ConfigResource_error(cres[i]);
5523 }
5524 }
5525
5526 /* Check group errors */
5527 for (i = 0 ; i < gres_cnt ; i++) {
5528 const rd_kafka_topic_partition_list_t *parts;
5529
5530 if (rd_kafka_group_result_error(gres[i])) {
5531
5532 TEST_WARN("%s result: %s: error: %s\n",
5533 rd_kafka_event_name(rkev),
5534 rd_kafka_group_result_name(gres[i]),
5535 rd_kafka_error_string(rd_kafka_group_result_error(gres[i])));
5536 if (!(errcnt++))
5537 err = rd_kafka_error_code(rd_kafka_group_result_error(gres[i]));
5538 }
5539
5540 parts = rd_kafka_group_result_partitions(gres[i]);
5541 if (parts) {
5542 int j;
5543 for (j = 0 ; j < parts->cnt ; i++) {
5544 if (!parts->elems[j].err)
5545 continue;
5546
5547 TEST_WARN("%s result: %s: "
5548 "%s [%"PRId32"] error: %s\n",
5549 rd_kafka_event_name(rkev),
5550 rd_kafka_group_result_name(gres[i]),
5551 parts->elems[j].topic,
5552 parts->elems[j].partition,
5553 rd_kafka_err2str(
5554 parts->elems[j].err));
5555 errcnt++;
5556 }
5557 }
5558 }
5559
5560 /* Check offset errors */
5561 for (i = 0 ; (offsets && i < (size_t)offsets->cnt) ; i++) {
5562 if (offsets->elems[i].err) {
5563 TEST_WARN("DeleteRecords result: %s [%d]: error: %s\n",
5564 offsets->elems[i].topic, offsets->elems[i].partition,
5565 rd_kafka_err2str(offsets->elems[i].err));
5566 if (!(errcnt++))
5567 err = offsets->elems[i].err;
5568 }
5569 }
5570
5571 if (!err && retevent)
5572 *retevent = rkev;
5573 else
5574 rd_kafka_event_destroy(rkev);
5575
5576 return err;
5577 }
5578
5579
5580
5581 /**
5582 * @brief Topic Admin API helpers
5583 *
5584 * @param useq Makes the call async and posts the response in this queue.
5585 * If NULL this call will be synchronous and return the error
5586 * result.
5587 *
5588 * @remark Fails the current test on failure.
5589 */
5590
5591 rd_kafka_resp_err_t
test_CreateTopics_simple(rd_kafka_t * rk,rd_kafka_queue_t * useq,char ** topics,size_t topic_cnt,int num_partitions,void * opaque)5592 test_CreateTopics_simple (rd_kafka_t *rk,
5593 rd_kafka_queue_t *useq,
5594 char **topics, size_t topic_cnt,
5595 int num_partitions,
5596 void *opaque) {
5597 rd_kafka_NewTopic_t **new_topics;
5598 rd_kafka_AdminOptions_t *options;
5599 rd_kafka_queue_t *q;
5600 size_t i;
5601 const int tmout = 30 * 1000;
5602 rd_kafka_resp_err_t err;
5603
5604 new_topics = malloc(sizeof(*new_topics) * topic_cnt);
5605
5606 for (i = 0 ; i < topic_cnt ; i++) {
5607 char errstr[512];
5608 new_topics[i] = rd_kafka_NewTopic_new(topics[i],
5609 num_partitions, 1,
5610 errstr, sizeof(errstr));
5611 TEST_ASSERT(new_topics[i],
5612 "Failed to NewTopic(\"%s\", %d) #%"PRIusz": %s",
5613 topics[i], num_partitions, i, errstr);
5614 }
5615
5616 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
5617 rd_kafka_AdminOptions_set_opaque(options, opaque);
5618
5619 if (!useq) {
5620 char errstr[512];
5621
5622 err = rd_kafka_AdminOptions_set_request_timeout(options,
5623 tmout,
5624 errstr,
5625 sizeof(errstr));
5626 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5627 err = rd_kafka_AdminOptions_set_operation_timeout(options,
5628 tmout-5000,
5629 errstr,
5630 sizeof(errstr));
5631 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5632
5633 q = rd_kafka_queue_new(rk);
5634 } else {
5635 q = useq;
5636 }
5637
5638 TEST_SAY("Creating %"PRIusz" topics\n", topic_cnt);
5639
5640 rd_kafka_CreateTopics(rk, new_topics, topic_cnt, options, q);
5641
5642 rd_kafka_AdminOptions_destroy(options);
5643
5644 rd_kafka_NewTopic_destroy_array(new_topics, topic_cnt);
5645 free(new_topics);
5646
5647 if (useq)
5648 return RD_KAFKA_RESP_ERR_NO_ERROR;
5649
5650
5651 err = test_wait_topic_admin_result(q,
5652 RD_KAFKA_EVENT_CREATETOPICS_RESULT,
5653 NULL, tmout+5000);
5654
5655 rd_kafka_queue_destroy(q);
5656
5657 if (err)
5658 TEST_FAIL("Failed to create %d topic(s): %s",
5659 (int)topic_cnt, rd_kafka_err2str(err));
5660
5661 return err;
5662 }
5663
5664
5665 rd_kafka_resp_err_t
test_CreatePartitions_simple(rd_kafka_t * rk,rd_kafka_queue_t * useq,const char * topic,size_t total_part_cnt,void * opaque)5666 test_CreatePartitions_simple (rd_kafka_t *rk,
5667 rd_kafka_queue_t *useq,
5668 const char *topic,
5669 size_t total_part_cnt,
5670 void *opaque) {
5671 rd_kafka_NewPartitions_t *newp[1];
5672 rd_kafka_AdminOptions_t *options;
5673 rd_kafka_queue_t *q;
5674 const int tmout = 30 * 1000;
5675 rd_kafka_resp_err_t err;
5676 char errstr[512];
5677
5678 newp[0] = rd_kafka_NewPartitions_new(topic, total_part_cnt, errstr,
5679 sizeof(errstr));
5680 TEST_ASSERT(newp[0],
5681 "Failed to NewPartitions(\"%s\", %"PRIusz"): %s",
5682 topic, total_part_cnt, errstr);
5683
5684 options = rd_kafka_AdminOptions_new(rk,
5685 RD_KAFKA_ADMIN_OP_CREATEPARTITIONS);
5686 rd_kafka_AdminOptions_set_opaque(options, opaque);
5687
5688 if (!useq) {
5689 err = rd_kafka_AdminOptions_set_request_timeout(options,
5690 tmout,
5691 errstr,
5692 sizeof(errstr));
5693 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5694 err = rd_kafka_AdminOptions_set_operation_timeout(options,
5695 tmout-5000,
5696 errstr,
5697 sizeof(errstr));
5698 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5699
5700 q = rd_kafka_queue_new(rk);
5701 } else {
5702 q = useq;
5703 }
5704
5705 TEST_SAY("Creating (up to) %"PRIusz" partitions for topic \"%s\"\n",
5706 total_part_cnt, topic);
5707
5708 rd_kafka_CreatePartitions(rk, newp, 1, options, q);
5709
5710 rd_kafka_AdminOptions_destroy(options);
5711
5712 rd_kafka_NewPartitions_destroy(newp[0]);
5713
5714 if (useq)
5715 return RD_KAFKA_RESP_ERR_NO_ERROR;
5716
5717
5718 err = test_wait_topic_admin_result(
5719 q, RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT, NULL, tmout+5000);
5720
5721 rd_kafka_queue_destroy(q);
5722
5723 if (err)
5724 TEST_FAIL("Failed to create partitions: %s",
5725 rd_kafka_err2str(err));
5726
5727 return err;
5728 }
5729
5730
5731 rd_kafka_resp_err_t
test_DeleteTopics_simple(rd_kafka_t * rk,rd_kafka_queue_t * useq,char ** topics,size_t topic_cnt,void * opaque)5732 test_DeleteTopics_simple (rd_kafka_t *rk,
5733 rd_kafka_queue_t *useq,
5734 char **topics, size_t topic_cnt,
5735 void *opaque) {
5736 rd_kafka_queue_t *q;
5737 rd_kafka_DeleteTopic_t **del_topics;
5738 rd_kafka_AdminOptions_t *options;
5739 size_t i;
5740 rd_kafka_resp_err_t err;
5741 const int tmout = 30*1000;
5742
5743 del_topics = malloc(sizeof(*del_topics) * topic_cnt);
5744
5745 for (i = 0 ; i < topic_cnt ; i++) {
5746 del_topics[i] = rd_kafka_DeleteTopic_new(topics[i]);
5747 TEST_ASSERT(del_topics[i]);
5748 }
5749
5750 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
5751 rd_kafka_AdminOptions_set_opaque(options, opaque);
5752
5753 if (!useq) {
5754 char errstr[512];
5755
5756 err = rd_kafka_AdminOptions_set_request_timeout(options,
5757 tmout,
5758 errstr,
5759 sizeof(errstr));
5760 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5761 err = rd_kafka_AdminOptions_set_operation_timeout(options,
5762 tmout-5000,
5763 errstr,
5764 sizeof(errstr));
5765 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5766
5767 q = rd_kafka_queue_new(rk);
5768 } else {
5769 q = useq;
5770 }
5771
5772 TEST_SAY("Deleting %"PRIusz" topics\n", topic_cnt);
5773
5774 rd_kafka_DeleteTopics(rk, del_topics, topic_cnt, options, useq);
5775
5776 rd_kafka_AdminOptions_destroy(options);
5777
5778 rd_kafka_DeleteTopic_destroy_array(del_topics, topic_cnt);
5779
5780 free(del_topics);
5781
5782 if (useq)
5783 return RD_KAFKA_RESP_ERR_NO_ERROR;
5784
5785 err = test_wait_topic_admin_result(q,
5786 RD_KAFKA_EVENT_DELETETOPICS_RESULT,
5787 NULL, tmout+5000);
5788
5789 rd_kafka_queue_destroy(q);
5790
5791 if (err)
5792 TEST_FAIL("Failed to delete topics: %s",
5793 rd_kafka_err2str(err));
5794
5795 return err;
5796 }
5797
5798 rd_kafka_resp_err_t
test_DeleteGroups_simple(rd_kafka_t * rk,rd_kafka_queue_t * useq,char ** groups,size_t group_cnt,void * opaque)5799 test_DeleteGroups_simple (rd_kafka_t *rk,
5800 rd_kafka_queue_t *useq,
5801 char **groups, size_t group_cnt,
5802 void *opaque) {
5803 rd_kafka_queue_t *q;
5804 rd_kafka_DeleteGroup_t **del_groups;
5805 rd_kafka_AdminOptions_t *options;
5806 size_t i;
5807 rd_kafka_resp_err_t err;
5808 const int tmout = 30*1000;
5809
5810 del_groups = malloc(sizeof(*del_groups) * group_cnt);
5811
5812 for (i = 0 ; i < group_cnt ; i++) {
5813 del_groups[i] = rd_kafka_DeleteGroup_new(groups[i]);
5814 TEST_ASSERT(del_groups[i]);
5815 }
5816
5817 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETEGROUPS);
5818 rd_kafka_AdminOptions_set_opaque(options, opaque);
5819
5820 if (!useq) {
5821 char errstr[512];
5822
5823 err = rd_kafka_AdminOptions_set_request_timeout(options,
5824 tmout,
5825 errstr,
5826 sizeof(errstr));
5827 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5828
5829 q = rd_kafka_queue_new(rk);
5830 } else {
5831 q = useq;
5832 }
5833
5834 TEST_SAY("Deleting %"PRIusz" groups\n", group_cnt);
5835
5836 rd_kafka_DeleteGroups(rk, del_groups, group_cnt, options, useq);
5837
5838 rd_kafka_AdminOptions_destroy(options);
5839
5840 rd_kafka_DeleteGroup_destroy_array(del_groups, group_cnt);
5841 free(del_groups);
5842
5843 if (useq)
5844 return RD_KAFKA_RESP_ERR_NO_ERROR;
5845
5846 err = test_wait_topic_admin_result(q,
5847 RD_KAFKA_EVENT_DELETEGROUPS_RESULT,
5848 NULL, tmout+5000);
5849
5850 rd_kafka_queue_destroy(q);
5851
5852 rd_kafka_DeleteGroup_destroy_array(del_groups, group_cnt);
5853
5854 if (err)
5855 TEST_FAIL("Failed to delete groups: %s",
5856 rd_kafka_err2str(err));
5857
5858 return err;
5859 }
5860
5861 rd_kafka_resp_err_t
test_DeleteRecords_simple(rd_kafka_t * rk,rd_kafka_queue_t * useq,const rd_kafka_topic_partition_list_t * offsets,void * opaque)5862 test_DeleteRecords_simple (rd_kafka_t *rk,
5863 rd_kafka_queue_t *useq,
5864 const rd_kafka_topic_partition_list_t *offsets,
5865 void *opaque) {
5866 rd_kafka_queue_t *q;
5867 rd_kafka_AdminOptions_t *options;
5868 rd_kafka_resp_err_t err;
5869 rd_kafka_DeleteRecords_t *del_records =
5870 rd_kafka_DeleteRecords_new(offsets);
5871 const int tmout = 30*1000;
5872
5873 options = rd_kafka_AdminOptions_new(rk,
5874 RD_KAFKA_ADMIN_OP_DELETERECORDS);
5875 rd_kafka_AdminOptions_set_opaque(options, opaque);
5876
5877 if (!useq) {
5878 char errstr[512];
5879
5880 err = rd_kafka_AdminOptions_set_request_timeout(options,
5881 tmout,
5882 errstr,
5883 sizeof(errstr));
5884 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5885 err = rd_kafka_AdminOptions_set_operation_timeout(
5886 options,
5887 tmout-5000,
5888 errstr,
5889 sizeof(errstr));
5890 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5891
5892 q = rd_kafka_queue_new(rk);
5893 } else {
5894 q = useq;
5895 }
5896
5897 TEST_SAY("Deleting offsets from %d partitions\n", offsets->cnt);
5898
5899 rd_kafka_DeleteRecords(rk, &del_records, 1, options, q);
5900
5901 rd_kafka_DeleteRecords_destroy(del_records);
5902
5903 rd_kafka_AdminOptions_destroy(options);
5904
5905 if (useq)
5906 return RD_KAFKA_RESP_ERR_NO_ERROR;
5907
5908 err = test_wait_topic_admin_result(q,
5909 RD_KAFKA_EVENT_DELETERECORDS_RESULT,
5910 NULL, tmout+5000);
5911
5912 rd_kafka_queue_destroy(q);
5913
5914 if (err)
5915 TEST_FAIL("Failed to delete records: %s",
5916 rd_kafka_err2str(err));
5917
5918 return err;
5919 }
5920
5921 rd_kafka_resp_err_t
test_DeleteConsumerGroupOffsets_simple(rd_kafka_t * rk,rd_kafka_queue_t * useq,const char * group_id,const rd_kafka_topic_partition_list_t * offsets,void * opaque)5922 test_DeleteConsumerGroupOffsets_simple (
5923 rd_kafka_t *rk,
5924 rd_kafka_queue_t *useq,
5925 const char *group_id,
5926 const rd_kafka_topic_partition_list_t *offsets,
5927 void *opaque) {
5928 rd_kafka_queue_t *q;
5929 rd_kafka_AdminOptions_t *options;
5930 rd_kafka_resp_err_t err;
5931 const int tmout = 30*1000;
5932 rd_kafka_DeleteConsumerGroupOffsets_t *cgoffsets;
5933
5934 options = rd_kafka_AdminOptions_new(
5935 rk, RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS);
5936 rd_kafka_AdminOptions_set_opaque(options, opaque);
5937
5938 if (!useq) {
5939 char errstr[512];
5940
5941 err = rd_kafka_AdminOptions_set_request_timeout(options,
5942 tmout,
5943 errstr,
5944 sizeof(errstr));
5945 TEST_ASSERT(!err, "set_request_timeout: %s", errstr);
5946 err = rd_kafka_AdminOptions_set_operation_timeout(
5947 options,
5948 tmout-5000,
5949 errstr,
5950 sizeof(errstr));
5951 TEST_ASSERT(!err, "set_operation_timeout: %s", errstr);
5952
5953 q = rd_kafka_queue_new(rk);
5954 } else {
5955 q = useq;
5956 }
5957
5958 if (offsets) {
5959 TEST_SAY("Deleting committed offsets for group %s and "
5960 "%d partitions\n",
5961 group_id, offsets->cnt);
5962
5963 cgoffsets = rd_kafka_DeleteConsumerGroupOffsets_new(group_id,
5964 offsets);
5965 } else {
5966 TEST_SAY("Provoking invalid DeleteConsumerGroupOffsets call\n");
5967 cgoffsets = NULL;
5968 }
5969
5970 rd_kafka_DeleteConsumerGroupOffsets(rk, &cgoffsets,
5971 cgoffsets ? 1 : 0,
5972 options, useq);
5973
5974 if (cgoffsets)
5975 rd_kafka_DeleteConsumerGroupOffsets_destroy(cgoffsets);
5976
5977 rd_kafka_AdminOptions_destroy(options);
5978
5979 if (useq)
5980 return RD_KAFKA_RESP_ERR_NO_ERROR;
5981
5982 err = test_wait_topic_admin_result(
5983 q,
5984 RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT,
5985 NULL, tmout+5000);
5986
5987 rd_kafka_queue_destroy(q);
5988
5989 if (err)
5990 TEST_FAIL("Failed to delete committed offsets: %s",
5991 rd_kafka_err2str(err));
5992
5993 return err;
5994 }
5995
5996 /**
5997 * @brief Delta Alter configuration for the given resource,
5998 * overwriting/setting the configs provided in \p configs.
5999 * Existing configuration remains intact.
6000 *
6001 * @param configs 'const char *name, const char *value' tuples
6002 * @param config_cnt is the number of tuples in \p configs
6003 */
6004 rd_kafka_resp_err_t
test_AlterConfigs_simple(rd_kafka_t * rk,rd_kafka_ResourceType_t restype,const char * resname,const char ** configs,size_t config_cnt)6005 test_AlterConfigs_simple (rd_kafka_t *rk,
6006 rd_kafka_ResourceType_t restype,
6007 const char *resname,
6008 const char **configs, size_t config_cnt) {
6009 rd_kafka_queue_t *q;
6010 rd_kafka_ConfigResource_t *confres;
6011 rd_kafka_event_t *rkev;
6012 size_t i;
6013 rd_kafka_resp_err_t err;
6014 const rd_kafka_ConfigResource_t **results;
6015 size_t result_cnt;
6016 const rd_kafka_ConfigEntry_t **configents;
6017 size_t configent_cnt;
6018
6019
6020 q = rd_kafka_queue_new(rk);
6021
6022 TEST_SAY("Getting configuration for %d %s\n", restype, resname);
6023
6024 confres = rd_kafka_ConfigResource_new(restype, resname);
6025 rd_kafka_DescribeConfigs(rk, &confres, 1, NULL, q);
6026
6027 err = test_wait_topic_admin_result(
6028 q, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, &rkev, 15*1000);
6029 if (err) {
6030 rd_kafka_queue_destroy(q);
6031 rd_kafka_ConfigResource_destroy(confres);
6032 return err;
6033 }
6034
6035 results = rd_kafka_DescribeConfigs_result_resources(
6036 rd_kafka_event_DescribeConfigs_result(rkev), &result_cnt);
6037 TEST_ASSERT(result_cnt == 1,
6038 "expected 1 DescribeConfigs result, not %"PRIusz,
6039 result_cnt);
6040
6041 configents = rd_kafka_ConfigResource_configs(results[0],
6042 &configent_cnt);
6043 TEST_ASSERT(configent_cnt > 0,
6044 "expected > 0 ConfigEntry:s, not %"PRIusz, configent_cnt);
6045
6046 TEST_SAY("Altering configuration for %d %s\n", restype, resname);
6047
6048 /* Apply all existing configuration entries to resource object that
6049 * will later be passed to AlterConfigs. */
6050 for (i = 0 ; i < configent_cnt ; i++) {
6051 err = rd_kafka_ConfigResource_set_config(
6052 confres,
6053 rd_kafka_ConfigEntry_name(configents[i]),
6054 rd_kafka_ConfigEntry_value(configents[i]));
6055 TEST_ASSERT(!err, "Failed to set read-back config %s=%s "
6056 "on local resource object",
6057 rd_kafka_ConfigEntry_name(configents[i]),
6058 rd_kafka_ConfigEntry_value(configents[i]));
6059 }
6060
6061 rd_kafka_event_destroy(rkev);
6062
6063 /* Then apply the configuration to change. */
6064 for (i = 0 ; i < config_cnt ; i += 2) {
6065 err = rd_kafka_ConfigResource_set_config(confres,
6066 configs[i],
6067 configs[i+1]);
6068 TEST_ASSERT(!err, "Failed to set config %s=%s on "
6069 "local resource object",
6070 configs[i], configs[i+1]);
6071 }
6072
6073 rd_kafka_AlterConfigs(rk, &confres, 1, NULL, q);
6074
6075 rd_kafka_ConfigResource_destroy(confres);
6076
6077 err = test_wait_topic_admin_result(
6078 q, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT, NULL, 15*1000);
6079
6080 rd_kafka_queue_destroy(q);
6081
6082 return err;
6083 }
6084
6085
6086
test_free_string_array(char ** strs,size_t cnt)6087 static void test_free_string_array (char **strs, size_t cnt) {
6088 size_t i;
6089 for (i = 0 ; i < cnt ; i++)
6090 free(strs[i]);
6091 free(strs);
6092 }
6093
6094
6095 /**
6096 * @return an array of all topics in the cluster matching our the
6097 * rdkafka test prefix.
6098 */
6099 static rd_kafka_resp_err_t
test_get_all_test_topics(rd_kafka_t * rk,char *** topicsp,size_t * topic_cntp)6100 test_get_all_test_topics (rd_kafka_t *rk, char ***topicsp, size_t *topic_cntp) {
6101 size_t test_topic_prefix_len = strlen(test_topic_prefix);
6102 const rd_kafka_metadata_t *md;
6103 char **topics = NULL;
6104 size_t topic_cnt = 0;
6105 int i;
6106 rd_kafka_resp_err_t err;
6107
6108 *topic_cntp = 0;
6109 if (topicsp)
6110 *topicsp = NULL;
6111
6112 /* Retrieve list of topics */
6113 err = rd_kafka_metadata(rk, 1/*all topics*/, NULL, &md,
6114 tmout_multip(10000));
6115 if (err) {
6116 TEST_WARN("%s: Failed to acquire metadata: %s: "
6117 "not deleting any topics\n",
6118 __FUNCTION__, rd_kafka_err2str(err));
6119 return err;
6120 }
6121
6122 if (md->topic_cnt == 0) {
6123 TEST_WARN("%s: No topics in cluster\n", __FUNCTION__);
6124 rd_kafka_metadata_destroy(md);
6125 return RD_KAFKA_RESP_ERR_NO_ERROR;
6126 }
6127
6128 if (topicsp)
6129 topics = malloc(sizeof(*topics) * md->topic_cnt);
6130
6131 for (i = 0 ; i < md->topic_cnt ; i++) {
6132 if (strlen(md->topics[i].topic) >= test_topic_prefix_len &&
6133 !strncmp(md->topics[i].topic,
6134 test_topic_prefix, test_topic_prefix_len)) {
6135 if (topicsp)
6136 topics[topic_cnt++] =
6137 rd_strdup(md->topics[i].topic);
6138 else
6139 topic_cnt++;
6140 }
6141 }
6142
6143 if (topic_cnt == 0) {
6144 TEST_SAY("%s: No topics (out of %d) matching our "
6145 "test prefix (%s)\n",
6146 __FUNCTION__, md->topic_cnt, test_topic_prefix);
6147 rd_kafka_metadata_destroy(md);
6148 if (topics)
6149 test_free_string_array(topics, topic_cnt);
6150 return RD_KAFKA_RESP_ERR_NO_ERROR;
6151 }
6152
6153 rd_kafka_metadata_destroy(md);
6154
6155 if (topicsp)
6156 *topicsp = topics;
6157 *topic_cntp = topic_cnt;
6158
6159 return RD_KAFKA_RESP_ERR_NO_ERROR;
6160 }
6161
6162 /**
6163 * @brief Delete all test topics using the Kafka Admin API.
6164 */
test_delete_all_test_topics(int timeout_ms)6165 rd_kafka_resp_err_t test_delete_all_test_topics (int timeout_ms) {
6166 rd_kafka_t *rk;
6167 char **topics;
6168 size_t topic_cnt = 0;
6169 rd_kafka_resp_err_t err;
6170 int i;
6171 rd_kafka_AdminOptions_t *options;
6172 rd_kafka_queue_t *q;
6173 char errstr[256];
6174 int64_t abs_timeout = test_clock() + (timeout_ms * 1000);
6175
6176 rk = test_create_producer();
6177
6178 err = test_get_all_test_topics(rk, &topics, &topic_cnt);
6179 if (err) {
6180 /* Error already reported by test_get_all_test_topics() */
6181 rd_kafka_destroy(rk);
6182 return err;
6183 }
6184
6185 if (topic_cnt == 0) {
6186 rd_kafka_destroy(rk);
6187 return RD_KAFKA_RESP_ERR_NO_ERROR;
6188 }
6189
6190 q = rd_kafka_queue_get_main(rk);
6191
6192 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
6193 if (rd_kafka_AdminOptions_set_operation_timeout(options, 2*60*1000,
6194 errstr,
6195 sizeof(errstr)))
6196 TEST_SAY(_C_YEL "Failed to set DeleteTopics timeout: %s: "
6197 "ignoring\n",
6198 errstr);
6199
6200 TEST_SAY(_C_MAG "====> Deleting all test topics with <===="
6201 "a timeout of 2 minutes\n");
6202
6203 test_DeleteTopics_simple(rk, q, topics, topic_cnt, options);
6204
6205 rd_kafka_AdminOptions_destroy(options);
6206
6207 while (1) {
6208 rd_kafka_event_t *rkev;
6209 const rd_kafka_DeleteTopics_result_t *res;
6210
6211 rkev = rd_kafka_queue_poll(q, -1);
6212
6213 res = rd_kafka_event_DeleteTopics_result(rkev);
6214 if (!res) {
6215 TEST_SAY("%s: Ignoring event: %s: %s\n",
6216 __FUNCTION__, rd_kafka_event_name(rkev),
6217 rd_kafka_event_error_string(rkev));
6218 rd_kafka_event_destroy(rkev);
6219 continue;
6220 }
6221
6222 if (rd_kafka_event_error(rkev)) {
6223 TEST_WARN("%s: DeleteTopics for %"PRIusz" topics "
6224 "failed: %s\n",
6225 __FUNCTION__, topic_cnt,
6226 rd_kafka_event_error_string(rkev));
6227 err = rd_kafka_event_error(rkev);
6228 } else {
6229 const rd_kafka_topic_result_t **terr;
6230 size_t tcnt;
6231 int okcnt = 0;
6232
6233 terr = rd_kafka_DeleteTopics_result_topics(res, &tcnt);
6234
6235 for(i = 0 ; i < (int)tcnt ; i++) {
6236 if (!rd_kafka_topic_result_error(terr[i])) {
6237 okcnt++;
6238 continue;
6239 }
6240
6241 TEST_WARN("%s: Failed to delete topic %s: %s\n",
6242 __FUNCTION__,
6243 rd_kafka_topic_result_name(terr[i]),
6244 rd_kafka_topic_result_error_string(
6245 terr[i]));
6246 }
6247
6248 TEST_SAY("%s: DeleteTopics "
6249 "succeeded for %d/%"PRIusz" topics\n",
6250 __FUNCTION__, okcnt, topic_cnt);
6251 err = RD_KAFKA_RESP_ERR_NO_ERROR;
6252 }
6253
6254 rd_kafka_event_destroy(rkev);
6255 break;
6256 }
6257
6258 rd_kafka_queue_destroy(q);
6259
6260 test_free_string_array(topics, topic_cnt);
6261
6262 /* Wait for topics to be fully deleted */
6263 while (1) {
6264 err = test_get_all_test_topics(rk, NULL, &topic_cnt);
6265
6266 if (!err && topic_cnt == 0)
6267 break;
6268
6269 if (abs_timeout < test_clock()) {
6270 TEST_WARN("%s: Timed out waiting for "
6271 "remaining %"PRIusz" deleted topics "
6272 "to disappear from cluster metadata\n",
6273 __FUNCTION__, topic_cnt);
6274 break;
6275 }
6276
6277 TEST_SAY("Waiting for remaining %"PRIusz" delete topics "
6278 "to disappear from cluster metadata\n", topic_cnt);
6279
6280 rd_sleep(1);
6281 }
6282
6283 rd_kafka_destroy(rk);
6284
6285 return err;
6286 }
6287
6288
6289
test_fail0(const char * file,int line,const char * function,int do_lock,int fail_now,const char * fmt,...)6290 void test_fail0 (const char *file, int line, const char *function,
6291 int do_lock, int fail_now, const char *fmt, ...) {
6292 char buf[512];
6293 int is_thrd = 0;
6294 size_t of;
6295 va_list ap;
6296 char *t;
6297 char timestr[32];
6298 time_t tnow = time(NULL);
6299
6300 #ifdef __MINGW32__
6301 strftime(timestr, sizeof(timestr), "%a %b %d %H:%M:%S %Y", localtime(&tnow));
6302 #elif defined(_WIN32)
6303 ctime_s(timestr, sizeof(timestr), &tnow);
6304 #else
6305 ctime_r(&tnow, timestr);
6306 #endif
6307 t = strchr(timestr, '\n');
6308 if (t)
6309 *t = '\0';
6310
6311 of = rd_snprintf(buf, sizeof(buf), "%s%s%s():%i: ",
6312 test_curr->subtest, *test_curr->subtest ? ": " : "",
6313 function, line);
6314 rd_assert(of < sizeof(buf));
6315
6316 va_start(ap, fmt);
6317 rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap);
6318 va_end(ap);
6319
6320 /* Remove trailing newline */
6321 if ((t = strchr(buf, '\n')) && !*(t+1))
6322 *t = '\0';
6323
6324 TEST_SAYL(0, "TEST FAILURE\n");
6325 fprintf(stderr,
6326 "\033[31m### Test \"%s%s%s%s\" failed at %s:%i:%s() at %s: "
6327 "###\n"
6328 "%s\n",
6329 test_curr->name,
6330 *test_curr->subtest ? " (" : "",
6331 test_curr->subtest,
6332 *test_curr->subtest ? ")" : "",
6333 file, line, function, timestr, buf+of);
6334 if (do_lock)
6335 TEST_LOCK();
6336 test_curr->state = TEST_FAILED;
6337 test_curr->failcnt += 1;
6338 test_curr->is_fatal_cb = NULL;
6339
6340 if (!*test_curr->failstr) {
6341 strncpy(test_curr->failstr, buf, sizeof(test_curr->failstr));
6342 test_curr->failstr[sizeof(test_curr->failstr)-1] = '\0';
6343 }
6344 if (fail_now && test_curr->mainfunc) {
6345 tests_running_cnt--;
6346 is_thrd = 1;
6347 }
6348 if (do_lock)
6349 TEST_UNLOCK();
6350 if (!fail_now)
6351 return;
6352 if (test_assert_on_fail || !is_thrd)
6353 assert(0);
6354 else
6355 thrd_exit(0);
6356 }
6357
6358
6359 /**
6360 * @brief Destroy a mock cluster and its underlying rd_kafka_t handle
6361 */
test_mock_cluster_destroy(rd_kafka_mock_cluster_t * mcluster)6362 void test_mock_cluster_destroy (rd_kafka_mock_cluster_t *mcluster) {
6363 rd_kafka_t *rk = rd_kafka_mock_cluster_handle(mcluster);
6364 rd_kafka_mock_cluster_destroy(mcluster);
6365 rd_kafka_destroy(rk);
6366 }
6367
6368
6369
6370 /**
6371 * @brief Create a standalone mock cluster that can be used by multiple
6372 * rd_kafka_t instances.
6373 */
test_mock_cluster_new(int broker_cnt,const char ** bootstraps)6374 rd_kafka_mock_cluster_t *test_mock_cluster_new (int broker_cnt,
6375 const char **bootstraps) {
6376 rd_kafka_t *rk;
6377 rd_kafka_conf_t *conf = rd_kafka_conf_new();
6378 rd_kafka_mock_cluster_t *mcluster;
6379 char errstr[256];
6380
6381 test_conf_common_init(conf, 0);
6382
6383 test_conf_set(conf, "client.id", "MOCK");
6384
6385 rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
6386 TEST_ASSERT(rk, "Failed to create mock cluster rd_kafka_t: %s", errstr);
6387
6388 mcluster = rd_kafka_mock_cluster_new(rk, broker_cnt);
6389 TEST_ASSERT(mcluster, "Failed to acquire mock cluster");
6390
6391 if (bootstraps)
6392 *bootstraps = rd_kafka_mock_cluster_bootstraps(mcluster);
6393
6394 return mcluster;
6395 }
6396
6397
6398
6399 /**
6400 * @name Sub-tests
6401 */
6402
6403
6404 /**
6405 * @brief Start a sub-test. \p fmt is optional and allows additional
6406 * sub-test info to be displayed, e.g., test parameters.
6407 *
6408 * @returns 0 if sub-test should not be run, else 1.
6409 */
test_sub_start(const char * func,int line,int is_quick,const char * fmt,...)6410 int test_sub_start (const char *func, int line, int is_quick,
6411 const char *fmt, ...) {
6412
6413 if (!is_quick && test_quick)
6414 return 0;
6415
6416 if (subtests_to_run && !strstr(func, subtests_to_run))
6417 return 0;
6418
6419 if (fmt && *fmt) {
6420 va_list ap;
6421 char buf[256];
6422
6423 va_start(ap, fmt);
6424 rd_vsnprintf(buf, sizeof(buf), fmt, ap);
6425 va_end(ap);
6426
6427 rd_snprintf(test_curr->subtest, sizeof(test_curr->subtest),
6428 "%s:%d: %s", func, line, buf);
6429 } else {
6430 rd_snprintf(test_curr->subtest, sizeof(test_curr->subtest),
6431 "%s:%d", func, line);
6432 }
6433
6434 TIMING_START(&test_curr->subtest_duration, "SUBTEST");
6435
6436 TEST_SAY(_C_MAG "[ %s ]\n", test_curr->subtest);
6437
6438 return 1;
6439 }
6440
6441
6442 /**
6443 * @brief Reset the current subtest state.
6444 */
test_sub_reset(void)6445 static void test_sub_reset (void) {
6446 *test_curr->subtest = '\0';
6447 test_curr->is_fatal_cb = NULL;
6448 test_curr->ignore_dr_err = rd_false;
6449 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
6450 test_curr->dr_mv = NULL;
6451 }
6452
6453 /**
6454 * @brief Sub-test has passed.
6455 */
test_sub_pass(void)6456 void test_sub_pass (void) {
6457
6458 TEST_ASSERT(*test_curr->subtest);
6459
6460 TEST_SAYL(1, _C_GRN "[ %s: PASS (%.02fs) ]\n", test_curr->subtest,
6461 (float)(TIMING_DURATION(&test_curr->subtest_duration) /
6462 1000000.0f));
6463
6464 test_sub_reset();
6465 }
6466
6467
6468 /**
6469 * @brief Skip sub-test (must have been started with SUB_TEST*()).
6470 */
test_sub_skip(const char * fmt,...)6471 void test_sub_skip (const char *fmt, ...) {
6472 va_list ap;
6473 char buf[256];
6474
6475 TEST_ASSERT(*test_curr->subtest);
6476
6477 va_start(ap, fmt);
6478 rd_vsnprintf(buf, sizeof(buf), fmt, ap);
6479 va_end(ap);
6480
6481 TEST_SAYL(1, _C_YEL "[ %s: SKIP: %s ]\n", test_curr->subtest, buf);
6482
6483 test_sub_reset();
6484 }
6485