1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30 #include "rdkafka.h"
31
32 /**
33 * @brief Admin API local dry-run unit-tests.
34 */
35
/* Socket timeout used by the test clients: once as an integer (milliseconds,
 * used to compute the expected request timeouts) and once as the string
 * value set for the "socket.timeout.ms" property.
 * The two definitions must be kept in sync. */
#define MY_SOCKET_TIMEOUT_MS 100
#define MY_SOCKET_TIMEOUT_MS_STR "100"



/* State shared between background_event_cb() and wait_background_event_cb():
 * last_event_lock guards last_event, last_event_cnd is broadcast when
 * a new event has been stored. */
static mtx_t last_event_lock;
static cnd_t last_event_cnd;
/* Event most recently stored by background_event_cb() and not yet taken
 * by wait_background_event_cb(); NULL when there is none. */
static rd_kafka_event_t *last_event = NULL;
44
/**
 * @brief The background event callback is called automatically
 *        by librdkafka from a background thread.
 *
 * Stores \p rkev in \c last_event (under \c last_event_lock) and wakes up
 * any thread waiting on \c last_event_cnd, i.e. wait_background_event_cb().
 * The event is not destroyed here. The test fails if an earlier event is
 * still stored, that is, has not been taken by the waiter yet.
 *
 * @param rk     Client instance (not used by this callback).
 * @param rkev   Event delivered to the callback.
 * @param opaque Not used by this callback.
 */
static void background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
                                 void *opaque) {
        mtx_lock(&last_event_lock);
        /* Only one outstanding event is supported at a time. */
        TEST_ASSERT(!last_event, "Multiple events seen in background_event_cb "
                    "(existing %s, new %s)",
                    rd_kafka_event_name(last_event), rd_kafka_event_name(rkev));
        last_event = rkev;
        mtx_unlock(&last_event_lock);
        cnd_broadcast(&last_event_cnd);
        /* NOTE(review): the reason for this delay is not stated in the
         * code; presumably it keeps the background thread occupied for
         * a while after the hand-over - confirm. */
        rd_sleep(1);
}
60
wait_background_event_cb(void)61 static rd_kafka_event_t *wait_background_event_cb (void) {
62 rd_kafka_event_t *rkev;
63 mtx_lock(&last_event_lock);
64 while (!(rkev = last_event))
65 cnd_wait(&last_event_cnd, &last_event_lock);
66 last_event = NULL;
67 mtx_unlock(&last_event_lock);
68
69 return rkev;
70 }
71
72
73 /**
74 * @brief CreateTopics tests
75 *
76 *
77 *
78 */
do_test_CreateTopics(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int with_background_event_cb,int with_options)79 static void do_test_CreateTopics (const char *what,
80 rd_kafka_t *rk, rd_kafka_queue_t *useq,
81 int with_background_event_cb,
82 int with_options) {
83 rd_kafka_queue_t *q = useq ? useq : rd_kafka_queue_new(rk);
84 #define MY_NEW_TOPICS_CNT 6
85 rd_kafka_NewTopic_t *new_topics[MY_NEW_TOPICS_CNT];
86 rd_kafka_AdminOptions_t *options = NULL;
87 int exp_timeout = MY_SOCKET_TIMEOUT_MS;
88 int i;
89 char errstr[512];
90 const char *errstr2;
91 rd_kafka_resp_err_t err;
92 test_timing_t timing;
93 rd_kafka_event_t *rkev;
94 const rd_kafka_CreateTopics_result_t *res;
95 const rd_kafka_topic_result_t **restopics;
96 size_t restopic_cnt;
97 void *my_opaque = NULL, *opaque;
98
99 TEST_SAY(_C_MAG "[ %s CreateTopics with %s, timeout %dms ]\n",
100 rd_kafka_name(rk), what, exp_timeout);
101
102 /**
103 * Construct NewTopic array with different properties for
104 * different partitions.
105 */
106 for (i = 0 ; i < MY_NEW_TOPICS_CNT ; i++) {
107 const char *topic = test_mk_topic_name(__FUNCTION__, 1);
108 int num_parts = i * 51 + 1;
109 int num_replicas = jitter(1, MY_NEW_TOPICS_CNT-1);
110 int set_config = (i & 2);
111 int set_replicas = !(i % 1);
112
113 new_topics[i] = rd_kafka_NewTopic_new(topic,
114 num_parts,
115 set_replicas ? -1 :
116 num_replicas,
117 NULL, 0);
118
119 if (set_config) {
120 /*
121 * Add various (unverified) configuration properties
122 */
123 err = rd_kafka_NewTopic_set_config(new_topics[i],
124 "dummy.doesntexist",
125 "butThere'sNothing "
126 "to verify that");
127 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
128
129 err = rd_kafka_NewTopic_set_config(new_topics[i],
130 "try.a.null.value",
131 NULL);
132 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
133
134 err = rd_kafka_NewTopic_set_config(new_topics[i],
135 "or.empty", "");
136 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
137 }
138
139
140 if (set_replicas) {
141 int32_t p;
142 int32_t replicas[MY_NEW_TOPICS_CNT];
143 int j;
144
145 for (j = 0 ; j < num_replicas ; j++)
146 replicas[j] = j;
147
148 /*
149 * Set valid replica assignments
150 */
151 for (p = 0 ; p < num_parts ; p++) {
152 /* Try adding an existing out of order,
153 * should fail */
154 if (p == 1) {
155 err = rd_kafka_NewTopic_set_replica_assignment(
156 new_topics[i], p+1,
157 replicas, num_replicas,
158 errstr, sizeof(errstr));
159 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
160 "%s", rd_kafka_err2str(err));
161 }
162
163 err = rd_kafka_NewTopic_set_replica_assignment(
164 new_topics[i], p,
165 replicas, num_replicas,
166 errstr, sizeof(errstr));
167 TEST_ASSERT(!err, "%s", errstr);
168 }
169
170 /* Try to add an existing partition, should fail */
171 err = rd_kafka_NewTopic_set_replica_assignment(
172 new_topics[i], 0,
173 replicas, num_replicas, NULL, 0);
174 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
175 "%s", rd_kafka_err2str(err));
176
177 } else {
178 int32_t dummy_replicas[1] = {1};
179
180 /* Test invalid partition */
181 err = rd_kafka_NewTopic_set_replica_assignment(
182 new_topics[i], num_parts+1, dummy_replicas, 1,
183 errstr, sizeof(errstr));
184 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
185 "%s: %s", rd_kafka_err2str(err),
186 err == RD_KAFKA_RESP_ERR_NO_ERROR ?
187 "" : errstr);
188
189 /* Setting replicas with with default replicas != -1
190 * is an error. */
191 err = rd_kafka_NewTopic_set_replica_assignment(
192 new_topics[i], 0, dummy_replicas, 1,
193 errstr, sizeof(errstr));
194 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
195 "%s: %s", rd_kafka_err2str(err),
196 err == RD_KAFKA_RESP_ERR_NO_ERROR ?
197 "" : errstr);
198 }
199 }
200
201 if (with_options) {
202 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
203
204 exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
205 err = rd_kafka_AdminOptions_set_request_timeout(
206 options, exp_timeout, errstr, sizeof(errstr));
207 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
208
209 my_opaque = (void *)123;
210 rd_kafka_AdminOptions_set_opaque(options, my_opaque);
211 }
212
213 TIMING_START(&timing, "CreateTopics");
214 TEST_SAY("Call CreateTopics, timeout is %dms\n", exp_timeout);
215 rd_kafka_CreateTopics(rk, new_topics, MY_NEW_TOPICS_CNT,
216 options, q);
217 TIMING_ASSERT_LATER(&timing, 0, 50);
218
219 if (with_background_event_cb) {
220 /* Result event will be triggered by callback from
221 * librdkafka background queue thread. */
222 TIMING_START(&timing, "CreateTopics.wait_background_event_cb");
223 rkev = wait_background_event_cb();
224 } else {
225 /* Poll result queue */
226 TIMING_START(&timing, "CreateTopics.queue_poll");
227 rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
228 }
229
230 TIMING_ASSERT_LATER(&timing, exp_timeout-100, exp_timeout+100);
231 TEST_ASSERT(rkev != NULL, "expected result in %dms",
232 exp_timeout);
233 TEST_SAY("CreateTopics: got %s in %.3fs\n",
234 rd_kafka_event_name(rkev),
235 TIMING_DURATION(&timing) / 1000.0f);
236
237 /* Convert event to proper result */
238 res = rd_kafka_event_CreateTopics_result(rkev);
239 TEST_ASSERT(res, "expected CreateTopics_result, not %s",
240 rd_kafka_event_name(rkev));
241
242 opaque = rd_kafka_event_opaque(rkev);
243 TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
244 my_opaque, opaque);
245
246 /* Expecting error */
247 err = rd_kafka_event_error(rkev);
248 errstr2 = rd_kafka_event_error_string(rkev);
249 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
250 "expected CreateTopics to return error %s, not %s (%s)",
251 rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
252 rd_kafka_err2str(err),
253 err ? errstr2 : "n/a");
254
255 /* Attempt to extract topics anyway, should return NULL. */
256 restopics = rd_kafka_CreateTopics_result_topics(res, &restopic_cnt);
257 TEST_ASSERT(!restopics && restopic_cnt == 0,
258 "expected no result_topics, got %p cnt %"PRIusz,
259 restopics, restopic_cnt);
260
261 rd_kafka_event_destroy(rkev);
262
263 rd_kafka_NewTopic_destroy_array(new_topics, MY_NEW_TOPICS_CNT);
264
265 if (options)
266 rd_kafka_AdminOptions_destroy(options);
267
268 if (!useq)
269 rd_kafka_queue_destroy(q);
270 }
271
272
273
274
275
276
277 /**
278 * @brief DeleteTopics tests
279 *
280 *
281 *
282 */
do_test_DeleteTopics(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int with_options)283 static void do_test_DeleteTopics (const char *what,
284 rd_kafka_t *rk, rd_kafka_queue_t *useq,
285 int with_options) {
286 rd_kafka_queue_t *q = useq ? useq : rd_kafka_queue_new(rk);
287 #define MY_DEL_TOPICS_CNT 4
288 rd_kafka_DeleteTopic_t *del_topics[MY_DEL_TOPICS_CNT];
289 rd_kafka_AdminOptions_t *options = NULL;
290 int exp_timeout = MY_SOCKET_TIMEOUT_MS;
291 int i;
292 char errstr[512];
293 const char *errstr2;
294 rd_kafka_resp_err_t err;
295 test_timing_t timing;
296 rd_kafka_event_t *rkev;
297 const rd_kafka_DeleteTopics_result_t *res;
298 const rd_kafka_topic_result_t **restopics;
299 size_t restopic_cnt;
300 void *my_opaque = NULL, *opaque;
301
302 TEST_SAY(_C_MAG "[ %s DeleteTopics with %s, timeout %dms ]\n",
303 rd_kafka_name(rk), what, exp_timeout);
304
305 for (i = 0 ; i < MY_DEL_TOPICS_CNT ; i++)
306 del_topics[i] = rd_kafka_DeleteTopic_new(test_mk_topic_name(__FUNCTION__, 1));
307
308 if (with_options) {
309 options = rd_kafka_AdminOptions_new(
310 rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
311
312 exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
313 err = rd_kafka_AdminOptions_set_request_timeout(
314 options, exp_timeout, errstr, sizeof(errstr));
315 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
316
317 if (useq) {
318 my_opaque = (void *)456;
319 rd_kafka_AdminOptions_set_opaque(options, my_opaque);
320 }
321 }
322
323 TIMING_START(&timing, "DeleteTopics");
324 TEST_SAY("Call DeleteTopics, timeout is %dms\n", exp_timeout);
325 rd_kafka_DeleteTopics(rk, del_topics, MY_DEL_TOPICS_CNT,
326 options, q);
327 TIMING_ASSERT_LATER(&timing, 0, 50);
328
329 /* Poll result queue */
330 TIMING_START(&timing, "DeleteTopics.queue_poll");
331 rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
332 TIMING_ASSERT_LATER(&timing, exp_timeout-100, exp_timeout+100);
333 TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
334 TEST_SAY("DeleteTopics: got %s in %.3fs\n",
335 rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
336
337 /* Convert event to proper result */
338 res = rd_kafka_event_DeleteTopics_result(rkev);
339 TEST_ASSERT(res, "expected DeleteTopics_result, not %s",
340 rd_kafka_event_name(rkev));
341
342 opaque = rd_kafka_event_opaque(rkev);
343 TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
344 my_opaque, opaque);
345
346 /* Expecting error */
347 err = rd_kafka_event_error(rkev);
348 errstr2 = rd_kafka_event_error_string(rkev);
349 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
350 "expected DeleteTopics to return error %s, not %s (%s)",
351 rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
352 rd_kafka_err2str(err),
353 err ? errstr2 : "n/a");
354
355 /* Attempt to extract topics anyway, should return NULL. */
356 restopics = rd_kafka_DeleteTopics_result_topics(res, &restopic_cnt);
357 TEST_ASSERT(!restopics && restopic_cnt == 0,
358 "expected no result_topics, got %p cnt %"PRIusz,
359 restopics, restopic_cnt);
360
361 rd_kafka_event_destroy(rkev);
362
363 rd_kafka_DeleteTopic_destroy_array(del_topics, MY_DEL_TOPICS_CNT);
364
365 if (options)
366 rd_kafka_AdminOptions_destroy(options);
367
368 if (!useq)
369 rd_kafka_queue_destroy(q);
370 }
371
372
373 /**
374 * @brief Test a mix of APIs using the same replyq.
375 *
376 * - Create topics A,B
377 * - Delete topic B
378 * - Create topic C
379 * - Create extra partitions for topic D
380 */
do_test_mix(rd_kafka_t * rk,rd_kafka_queue_t * rkqu)381 static void do_test_mix (rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
382 char *topics[] = { "topicA", "topicB", "topicC" };
383 int cnt = 0;
384 struct waiting {
385 rd_kafka_event_type_t evtype;
386 int seen;
387 };
388 struct waiting id1 = {RD_KAFKA_EVENT_CREATETOPICS_RESULT};
389 struct waiting id2 = {RD_KAFKA_EVENT_DELETETOPICS_RESULT};
390 struct waiting id3 = {RD_KAFKA_EVENT_CREATETOPICS_RESULT};
391 struct waiting id4 = {RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT};
392
393 TEST_SAY(_C_MAG "[ Mixed mode test on %s]\n", rd_kafka_name(rk));
394
395 test_CreateTopics_simple(rk, rkqu, topics, 2, 1, &id1);
396 test_DeleteTopics_simple(rk, rkqu, &topics[1], 1, &id2);
397 test_CreateTopics_simple(rk, rkqu, &topics[2], 1, 1, &id3);
398 test_CreatePartitions_simple(rk, rkqu, "topicD", 15, &id4);
399
400 while (cnt < 4) {
401 rd_kafka_event_t *rkev;
402 struct waiting *w;
403
404 rkev = rd_kafka_queue_poll(rkqu, -1);
405 TEST_ASSERT(rkev);
406
407 TEST_SAY("Got event %s: %s\n",
408 rd_kafka_event_name(rkev),
409 rd_kafka_event_error_string(rkev));
410
411 w = rd_kafka_event_opaque(rkev);
412 TEST_ASSERT(w);
413
414 TEST_ASSERT(w->evtype == rd_kafka_event_type(rkev),
415 "Expected evtype %d, not %d (%s)",
416 w->evtype, rd_kafka_event_type(rkev),
417 rd_kafka_event_name(rkev));
418
419 TEST_ASSERT(w->seen == 0, "Duplicate results");
420
421 w->seen++;
422 cnt++;
423
424 rd_kafka_event_destroy(rkev);
425 }
426 }
427
428
429 /**
430 * @brief Test AlterConfigs and DescribeConfigs
431 */
do_test_configs(rd_kafka_t * rk,rd_kafka_queue_t * rkqu)432 static void do_test_configs (rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
433 #define MY_CONFRES_CNT RD_KAFKA_RESOURCE__CNT + 2
434 rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
435 rd_kafka_AdminOptions_t *options;
436 rd_kafka_event_t *rkev;
437 rd_kafka_resp_err_t err;
438 const rd_kafka_AlterConfigs_result_t *res;
439 const rd_kafka_ConfigResource_t **rconfigs;
440 size_t rconfig_cnt;
441 char errstr[128];
442 int i;
443
444 /* Check invalids */
445 configs[0] = rd_kafka_ConfigResource_new(
446 (rd_kafka_ResourceType_t)-1, "something");
447 TEST_ASSERT(!configs[0]);
448
449 configs[0] = rd_kafka_ConfigResource_new(
450 (rd_kafka_ResourceType_t)0, NULL);
451 TEST_ASSERT(!configs[0]);
452
453
454 for (i = 0 ; i < MY_CONFRES_CNT ; i++) {
455 int set_config = !(i % 2);
456
457 /* librdkafka shall not limit the use of illogical
458 * or unknown settings, they are enforced by the broker. */
459 configs[i] = rd_kafka_ConfigResource_new(
460 (rd_kafka_ResourceType_t)i, "3");
461 TEST_ASSERT(configs[i] != NULL);
462
463 if (set_config) {
464 rd_kafka_ConfigResource_set_config(configs[i],
465 "some.conf",
466 "which remains "
467 "unchecked");
468 rd_kafka_ConfigResource_set_config(configs[i],
469 "some.conf.null",
470 NULL);
471 }
472 }
473
474
475 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
476 err = rd_kafka_AdminOptions_set_request_timeout(options, 1000, errstr,
477 sizeof(errstr));
478 TEST_ASSERT(!err, "%s", errstr);
479
480 /* AlterConfigs */
481 rd_kafka_AlterConfigs(rk, configs, MY_CONFRES_CNT,
482 options, rkqu);
483
484 rkev = test_wait_admin_result(rkqu, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT,
485 2000);
486
487 TEST_ASSERT(rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__TIMED_OUT,
488 "Expected timeout, not %s",
489 rd_kafka_event_error_string(rkev));
490
491 res = rd_kafka_event_AlterConfigs_result(rkev);
492 TEST_ASSERT(res);
493
494 rconfigs = rd_kafka_AlterConfigs_result_resources(res, &rconfig_cnt);
495 TEST_ASSERT(!rconfigs && !rconfig_cnt,
496 "Expected no result resources, got %"PRIusz,
497 rconfig_cnt);
498
499 rd_kafka_event_destroy(rkev);
500
501 /* DescribeConfigs: reuse same configs and options */
502 rd_kafka_DescribeConfigs(rk, configs, MY_CONFRES_CNT,
503 options, rkqu);
504
505 rd_kafka_AdminOptions_destroy(options);
506 rd_kafka_ConfigResource_destroy_array(configs, MY_CONFRES_CNT);
507
508 rkev = test_wait_admin_result(rkqu,
509 RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT,
510 2000);
511
512 TEST_ASSERT(rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__TIMED_OUT,
513 "Expected timeout, not %s",
514 rd_kafka_event_error_string(rkev));
515
516 res = rd_kafka_event_DescribeConfigs_result(rkev);
517 TEST_ASSERT(res);
518
519 rconfigs = rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt);
520 TEST_ASSERT(!rconfigs && !rconfig_cnt,
521 "Expected no result resources, got %"PRIusz,
522 rconfig_cnt);
523
524 rd_kafka_event_destroy(rkev);
525 }
526
527
528 /**
529 * @brief Verify that an unclean rd_kafka_destroy() does not hang.
530 */
do_test_unclean_destroy(rd_kafka_type_t cltype,int with_mainq)531 static void do_test_unclean_destroy (rd_kafka_type_t cltype, int with_mainq) {
532 rd_kafka_t *rk;
533 char errstr[512];
534 rd_kafka_conf_t *conf;
535 rd_kafka_queue_t *q;
536 rd_kafka_event_t *rkev;
537 rd_kafka_DeleteTopic_t *topic;
538 test_timing_t t_destroy;
539
540 test_conf_init(&conf, NULL, 0);
541 /* Remove brokers, if any, since this is a local test and we
542 * rely on the controller not being found. */
543 test_conf_set(conf, "bootstrap.servers", "");
544 test_conf_set(conf, "socket.timeout.ms", "60000");
545
546 rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr));
547 TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr);
548
549 TEST_SAY(_C_MAG "[ Test unclean destroy for %s using %s]\n", rd_kafka_name(rk),
550 with_mainq ? "mainq" : "tempq");
551
552 if (with_mainq)
553 q = rd_kafka_queue_get_main(rk);
554 else
555 q = rd_kafka_queue_new(rk);
556
557 topic = rd_kafka_DeleteTopic_new("test");
558 rd_kafka_DeleteTopics(rk, &topic, 1, NULL, q);
559 rd_kafka_DeleteTopic_destroy(topic);
560
561 /* We're not expecting a result yet since DeleteTopics will attempt
562 * to look up the controller for socket.timeout.ms (1 minute). */
563 rkev = rd_kafka_queue_poll(q, 100);
564 TEST_ASSERT(!rkev, "Did not expect result: %s", rd_kafka_event_name(rkev));
565
566 rd_kafka_queue_destroy(q);
567
568 TEST_SAY("Giving rd_kafka_destroy() 5s to finish, "
569 "despite Admin API request being processed\n");
570 test_timeout_set(5);
571 TIMING_START(&t_destroy, "rd_kafka_destroy()");
572 rd_kafka_destroy(rk);
573 TIMING_STOP(&t_destroy);
574
575 /* Restore timeout */
576 test_timeout_set(60);
577 }
578
579
580 /**
581 * @brief Test AdminOptions
582 */
do_test_options(rd_kafka_t * rk)583 static void do_test_options (rd_kafka_t *rk) {
584 #define _all_apis { RD_KAFKA_ADMIN_OP_CREATETOPICS, \
585 RD_KAFKA_ADMIN_OP_DELETETOPICS, \
586 RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, \
587 RD_KAFKA_ADMIN_OP_ALTERCONFIGS, \
588 RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, \
589 RD_KAFKA_ADMIN_OP_ANY /* Must be last */}
590 struct {
591 const char *setter;
592 const rd_kafka_admin_op_t valid_apis[8];
593 } matrix[] = {
594 { "request_timeout", _all_apis },
595 { "operation_timeout", { RD_KAFKA_ADMIN_OP_CREATETOPICS,
596 RD_KAFKA_ADMIN_OP_DELETETOPICS,
597 RD_KAFKA_ADMIN_OP_CREATEPARTITIONS } },
598 { "validate_only", { RD_KAFKA_ADMIN_OP_CREATETOPICS,
599 RD_KAFKA_ADMIN_OP_CREATEPARTITIONS,
600 RD_KAFKA_ADMIN_OP_ALTERCONFIGS } },
601 { "broker", _all_apis },
602 { "opaque", _all_apis },
603 { NULL },
604 };
605 int i;
606 rd_kafka_AdminOptions_t *options;
607
608
609 for (i = 0 ; matrix[i].setter ; i++) {
610 static const rd_kafka_admin_op_t all_apis[] = _all_apis;
611 const rd_kafka_admin_op_t *for_api;
612
613 for (for_api = all_apis ; ; for_api++) {
614 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
615 rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
616 char errstr[512];
617 int fi;
618
619 options = rd_kafka_AdminOptions_new(rk, *for_api);
620 TEST_ASSERT(options,
621 "AdminOptions_new(%d) failed", *for_api);
622
623 if (!strcmp(matrix[i].setter, "request_timeout"))
624 err = rd_kafka_AdminOptions_set_request_timeout(
625 options, 1234, errstr, sizeof(errstr));
626 else if (!strcmp(matrix[i].setter, "operation_timeout"))
627 err = rd_kafka_AdminOptions_set_operation_timeout(
628 options, 12345, errstr, sizeof(errstr));
629 else if (!strcmp(matrix[i].setter, "validate_only"))
630 err = rd_kafka_AdminOptions_set_validate_only(
631 options, 1, errstr, sizeof(errstr));
632 else if (!strcmp(matrix[i].setter, "broker"))
633 err = rd_kafka_AdminOptions_set_broker(
634 options, 5, errstr, sizeof(errstr));
635 else if (!strcmp(matrix[i].setter, "opaque")) {
636 rd_kafka_AdminOptions_set_opaque(
637 options, (void *)options);
638 err = RD_KAFKA_RESP_ERR_NO_ERROR;
639 } else
640 TEST_FAIL("Invalid setter: %s",
641 matrix[i].setter);
642
643
644 TEST_SAYL(3, "AdminOptions_set_%s on "
645 "RD_KAFKA_ADMIN_OP_%d options "
646 "returned %s: %s\n",
647 matrix[i].setter,
648 *for_api,
649 rd_kafka_err2name(err),
650 err ? errstr : "success");
651
652 /* Scan matrix valid_apis to see if this
653 * setter should be accepted or not. */
654 if (exp_err) {
655 /* An expected error is already set */
656 } else if (*for_api != RD_KAFKA_ADMIN_OP_ANY) {
657 exp_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
658
659 for (fi = 0 ; matrix[i].valid_apis[fi] ; fi++) {
660 if (matrix[i].valid_apis[fi] ==
661 *for_api)
662 exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
663 }
664 } else {
665 exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
666 }
667
668 if (err != exp_err)
669 TEST_FAIL_LATER("Expected AdminOptions_set_%s "
670 "for RD_KAFKA_ADMIN_OP_%d "
671 "options to return %s, "
672 "not %s",
673 matrix[i].setter,
674 *for_api,
675 rd_kafka_err2name(exp_err),
676 rd_kafka_err2name(err));
677
678 rd_kafka_AdminOptions_destroy(options);
679
680 if (*for_api == RD_KAFKA_ADMIN_OP_ANY)
681 break; /* This was the last one */
682 }
683 }
684
685 /* Try an invalid for_api */
686 options = rd_kafka_AdminOptions_new(rk, (rd_kafka_admin_op_t)1234);
687 TEST_ASSERT(!options, "Expectred AdminOptions_new() to fail "
688 "with an invalid for_api, didn't.");
689
690 TEST_LATER_CHECK();
691 }
692
693
/**
 * @brief Run all local Admin API tests for the given client type.
 *
 * The unclean-destroy tests create their own client instances; all other
 * tests share one instance that has no brokers configured, a short socket
 * timeout and background_event_cb() registered as background event callback.
 *
 * @param cltype  Client type to run the tests with.
 */
static void do_test_apis (rd_kafka_type_t cltype) {
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        rd_kafka_queue_t *mainq;
        rd_kafka_queue_t *backgroundq;
        char errstr[512];

        /* Synchronization primitives used by background_event_cb() */
        mtx_init(&last_event_lock, mtx_plain);
        cnd_init(&last_event_cnd);

        /* Unclean destroy, with a temporary queue and with the main queue */
        do_test_unclean_destroy(cltype, 0/*tempq*/);
        do_test_unclean_destroy(cltype, 1/*mainq*/);

        /*
         * Shared client instance for the remaining tests
         */
        test_conf_init(&conf, NULL, 0);
        /* Remove brokers, if any, since this is a local test and we
         * rely on the controller not being found. */
        test_conf_set(conf, "bootstrap.servers", "");
        test_conf_set(conf, "socket.timeout.ms", MY_SOCKET_TIMEOUT_MS_STR);
        /* For use with the background queue */
        rd_kafka_conf_set_background_event_cb(conf, background_event_cb);

        rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr));
        TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr);

        mainq       = rd_kafka_queue_get_main(rk);
        backgroundq = rd_kafka_queue_get_background(rk);

        /* AdminOptions */
        do_test_options(rk);

        /* CreateTopics */
        do_test_CreateTopics("temp queue, no options", rk, NULL, 0, 0);
        do_test_CreateTopics("temp queue, no options, background_event_cb",
                             rk, backgroundq, 1, 0);
        do_test_CreateTopics("temp queue, options", rk, NULL, 0, 1);
        do_test_CreateTopics("main queue, options", rk, mainq, 0, 1);

        /* DeleteTopics */
        do_test_DeleteTopics("temp queue, no options", rk, NULL, 0);
        do_test_DeleteTopics("temp queue, options", rk, NULL, 1);
        do_test_DeleteTopics("main queue, options", rk, mainq, 1);

        /* Mixed requests and config APIs, all on the main queue */
        do_test_mix(rk, mainq);
        do_test_configs(rk, mainq);

        /* Tear down: queue references first, then the client instance. */
        rd_kafka_queue_destroy(backgroundq);
        rd_kafka_queue_destroy(mainq);

        rd_kafka_destroy(rk);

        mtx_destroy(&last_event_lock);
        cnd_destroy(&last_event_cnd);
}
745
746
main_0080_admin_ut(int argc,char ** argv)747 int main_0080_admin_ut (int argc, char **argv) {
748 do_test_apis(RD_KAFKA_PRODUCER);
749 do_test_apis(RD_KAFKA_CONSUMER);
750 return 0;
751 }
752