1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30 #include "rdkafka.h"
31
32 /**
33 * @brief Admin API local dry-run unit-tests.
34 */
35
36 #define MY_SOCKET_TIMEOUT_MS 100
37 #define MY_SOCKET_TIMEOUT_MS_STR "100"
38
39
40
/* Serializes access to last_event; taken by both background_event_cb()
 * and wait_background_event_cb(). */
static mtx_t last_event_lock;
/* Broadcast by background_event_cb() after it has stored an event in
 * last_event; waited on by wait_background_event_cb(). */
static cnd_t last_event_cnd;
/* Event most recently stored by background_event_cb(), reset to NULL by
 * wait_background_event_cb() when it takes the event. */
static rd_kafka_event_t *last_event = NULL;
44
/**
 * @brief The background event callback is called automatically
 *        by librdkafka from a background thread.
 *
 * Stores \p rkev in last_event (under last_event_lock) and wakes any
 * thread blocked in wait_background_event_cb(). The test fails if a
 * previously stored event has not been taken yet.
 *
 * @param rk     Client instance (unused here).
 * @param rkev   The event being delivered; it is handed over to
 *               whoever calls wait_background_event_cb().
 * @param opaque Unused here.
 */
static void background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
                                 void *opaque) {
        mtx_lock(&last_event_lock);
        /* Only one outstanding event is supported: the previous one must
         * already have been consumed by wait_background_event_cb(). */
        TEST_ASSERT(!last_event, "Multiple events seen in background_event_cb "
                    "(existing %s, new %s)",
                    rd_kafka_event_name(last_event), rd_kafka_event_name(rkev));
        last_event = rkev;
        mtx_unlock(&last_event_lock);
        /* Wake the waiter now that the event has been published. */
        cnd_broadcast(&last_event_cnd);
        /* NOTE(review): presumably keeps the callback busy for a while
         * after the hand-over - confirm the intent of this sleep. */
        rd_sleep(1);
}
60
wait_background_event_cb(void)61 static rd_kafka_event_t *wait_background_event_cb (void) {
62 rd_kafka_event_t *rkev;
63 mtx_lock(&last_event_lock);
64 while (!(rkev = last_event))
65 cnd_wait(&last_event_cnd, &last_event_lock);
66 last_event = NULL;
67 mtx_unlock(&last_event_lock);
68
69 return rkev;
70 }
71
72
73 /**
74 * @brief CreateTopics tests
75 *
76 *
77 *
78 */
do_test_CreateTopics(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int with_background_event_cb,int with_options)79 static void do_test_CreateTopics (const char *what,
80 rd_kafka_t *rk, rd_kafka_queue_t *useq,
81 int with_background_event_cb,
82 int with_options) {
83 rd_kafka_queue_t *q;
84 #define MY_NEW_TOPICS_CNT 6
85 rd_kafka_NewTopic_t *new_topics[MY_NEW_TOPICS_CNT];
86 rd_kafka_AdminOptions_t *options = NULL;
87 int exp_timeout = MY_SOCKET_TIMEOUT_MS;
88 int i;
89 char errstr[512];
90 const char *errstr2;
91 rd_kafka_resp_err_t err;
92 test_timing_t timing;
93 rd_kafka_event_t *rkev;
94 const rd_kafka_CreateTopics_result_t *res;
95 const rd_kafka_topic_result_t **restopics;
96 size_t restopic_cnt;
97 void *my_opaque = NULL, *opaque;
98
99 SUB_TEST_QUICK("%s CreateTopics with %s, timeout %dms",
100 rd_kafka_name(rk), what, exp_timeout);
101
102 q = useq ? useq : rd_kafka_queue_new(rk);
103
104 /**
105 * Construct NewTopic array with different properties for
106 * different partitions.
107 */
108 for (i = 0 ; i < MY_NEW_TOPICS_CNT ; i++) {
109 const char *topic = test_mk_topic_name(__FUNCTION__, 1);
110 int num_parts = i * 51 + 1;
111 int num_replicas = jitter(1, MY_NEW_TOPICS_CNT-1);
112 int set_config = (i & 2);
113 int set_replicas = !(i % 1);
114
115 new_topics[i] = rd_kafka_NewTopic_new(topic,
116 num_parts,
117 set_replicas ? -1 :
118 num_replicas,
119 NULL, 0);
120
121 if (set_config) {
122 /*
123 * Add various (unverified) configuration properties
124 */
125 err = rd_kafka_NewTopic_set_config(new_topics[i],
126 "dummy.doesntexist",
127 "butThere'sNothing "
128 "to verify that");
129 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
130
131 err = rd_kafka_NewTopic_set_config(new_topics[i],
132 "try.a.null.value",
133 NULL);
134 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
135
136 err = rd_kafka_NewTopic_set_config(new_topics[i],
137 "or.empty", "");
138 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
139 }
140
141
142 if (set_replicas) {
143 int32_t p;
144 int32_t replicas[MY_NEW_TOPICS_CNT];
145 int j;
146
147 for (j = 0 ; j < num_replicas ; j++)
148 replicas[j] = j;
149
150 /*
151 * Set valid replica assignments
152 */
153 for (p = 0 ; p < num_parts ; p++) {
154 /* Try adding an existing out of order,
155 * should fail */
156 if (p == 1) {
157 err = rd_kafka_NewTopic_set_replica_assignment(
158 new_topics[i], p+1,
159 replicas, num_replicas,
160 errstr, sizeof(errstr));
161 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
162 "%s", rd_kafka_err2str(err));
163 }
164
165 err = rd_kafka_NewTopic_set_replica_assignment(
166 new_topics[i], p,
167 replicas, num_replicas,
168 errstr, sizeof(errstr));
169 TEST_ASSERT(!err, "%s", errstr);
170 }
171
172 /* Try to add an existing partition, should fail */
173 err = rd_kafka_NewTopic_set_replica_assignment(
174 new_topics[i], 0,
175 replicas, num_replicas, NULL, 0);
176 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
177 "%s", rd_kafka_err2str(err));
178
179 } else {
180 int32_t dummy_replicas[1] = {1};
181
182 /* Test invalid partition */
183 err = rd_kafka_NewTopic_set_replica_assignment(
184 new_topics[i], num_parts+1, dummy_replicas, 1,
185 errstr, sizeof(errstr));
186 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
187 "%s: %s", rd_kafka_err2str(err),
188 err == RD_KAFKA_RESP_ERR_NO_ERROR ?
189 "" : errstr);
190
191 /* Setting replicas with with default replicas != -1
192 * is an error. */
193 err = rd_kafka_NewTopic_set_replica_assignment(
194 new_topics[i], 0, dummy_replicas, 1,
195 errstr, sizeof(errstr));
196 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
197 "%s: %s", rd_kafka_err2str(err),
198 err == RD_KAFKA_RESP_ERR_NO_ERROR ?
199 "" : errstr);
200 }
201 }
202
203 if (with_options) {
204 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
205
206 exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
207 err = rd_kafka_AdminOptions_set_request_timeout(
208 options, exp_timeout, errstr, sizeof(errstr));
209 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
210
211 my_opaque = (void *)123;
212 rd_kafka_AdminOptions_set_opaque(options, my_opaque);
213 }
214
215 TIMING_START(&timing, "CreateTopics");
216 TEST_SAY("Call CreateTopics, timeout is %dms\n", exp_timeout);
217 rd_kafka_CreateTopics(rk, new_topics, MY_NEW_TOPICS_CNT,
218 options, q);
219 TIMING_ASSERT_LATER(&timing, 0, 50);
220
221 if (with_background_event_cb) {
222 /* Result event will be triggered by callback from
223 * librdkafka background queue thread. */
224 TIMING_START(&timing, "CreateTopics.wait_background_event_cb");
225 rkev = wait_background_event_cb();
226 } else {
227 /* Poll result queue */
228 TIMING_START(&timing, "CreateTopics.queue_poll");
229 rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
230 }
231
232 TIMING_ASSERT_LATER(&timing, exp_timeout-100, exp_timeout+100);
233 TEST_ASSERT(rkev != NULL, "expected result in %dms",
234 exp_timeout);
235 TEST_SAY("CreateTopics: got %s in %.3fs\n",
236 rd_kafka_event_name(rkev),
237 TIMING_DURATION(&timing) / 1000.0f);
238
239 /* Convert event to proper result */
240 res = rd_kafka_event_CreateTopics_result(rkev);
241 TEST_ASSERT(res, "expected CreateTopics_result, not %s",
242 rd_kafka_event_name(rkev));
243
244 opaque = rd_kafka_event_opaque(rkev);
245 TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
246 my_opaque, opaque);
247
248 /* Expecting error */
249 err = rd_kafka_event_error(rkev);
250 errstr2 = rd_kafka_event_error_string(rkev);
251 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
252 "expected CreateTopics to return error %s, not %s (%s)",
253 rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
254 rd_kafka_err2str(err),
255 err ? errstr2 : "n/a");
256
257 /* Attempt to extract topics anyway, should return NULL. */
258 restopics = rd_kafka_CreateTopics_result_topics(res, &restopic_cnt);
259 TEST_ASSERT(!restopics && restopic_cnt == 0,
260 "expected no result_topics, got %p cnt %"PRIusz,
261 restopics, restopic_cnt);
262
263 rd_kafka_event_destroy(rkev);
264
265 rd_kafka_NewTopic_destroy_array(new_topics, MY_NEW_TOPICS_CNT);
266
267 if (options)
268 rd_kafka_AdminOptions_destroy(options);
269
270 if (!useq)
271 rd_kafka_queue_destroy(q);
272
273 SUB_TEST_PASS();
274 }
275
276
277
278
279
280
281 /**
282 * @brief DeleteTopics tests
283 *
284 *
285 *
286 */
do_test_DeleteTopics(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int with_options)287 static void do_test_DeleteTopics (const char *what,
288 rd_kafka_t *rk, rd_kafka_queue_t *useq,
289 int with_options) {
290 rd_kafka_queue_t *q;
291 #define MY_DEL_TOPICS_CNT 4
292 rd_kafka_DeleteTopic_t *del_topics[MY_DEL_TOPICS_CNT];
293 rd_kafka_AdminOptions_t *options = NULL;
294 int exp_timeout = MY_SOCKET_TIMEOUT_MS;
295 int i;
296 char errstr[512];
297 const char *errstr2;
298 rd_kafka_resp_err_t err;
299 test_timing_t timing;
300 rd_kafka_event_t *rkev;
301 const rd_kafka_DeleteTopics_result_t *res;
302 const rd_kafka_topic_result_t **restopics;
303 size_t restopic_cnt;
304 void *my_opaque = NULL, *opaque;
305
306 SUB_TEST_QUICK("%s DeleteTopics with %s, timeout %dms",
307 rd_kafka_name(rk), what, exp_timeout);
308
309 q = useq ? useq : rd_kafka_queue_new(rk);
310
311 for (i = 0 ; i < MY_DEL_TOPICS_CNT ; i++)
312 del_topics[i] = rd_kafka_DeleteTopic_new(test_mk_topic_name(__FUNCTION__, 1));
313
314 if (with_options) {
315 options = rd_kafka_AdminOptions_new(
316 rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
317
318 exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
319 err = rd_kafka_AdminOptions_set_request_timeout(
320 options, exp_timeout, errstr, sizeof(errstr));
321 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
322
323 if (useq) {
324 my_opaque = (void *)456;
325 rd_kafka_AdminOptions_set_opaque(options, my_opaque);
326 }
327 }
328
329 TIMING_START(&timing, "DeleteTopics");
330 TEST_SAY("Call DeleteTopics, timeout is %dms\n", exp_timeout);
331 rd_kafka_DeleteTopics(rk, del_topics, MY_DEL_TOPICS_CNT,
332 options, q);
333 TIMING_ASSERT_LATER(&timing, 0, 50);
334
335 /* Poll result queue */
336 TIMING_START(&timing, "DeleteTopics.queue_poll");
337 rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
338 TIMING_ASSERT_LATER(&timing, exp_timeout-100, exp_timeout+100);
339 TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
340 TEST_SAY("DeleteTopics: got %s in %.3fs\n",
341 rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
342
343 /* Convert event to proper result */
344 res = rd_kafka_event_DeleteTopics_result(rkev);
345 TEST_ASSERT(res, "expected DeleteTopics_result, not %s",
346 rd_kafka_event_name(rkev));
347
348 opaque = rd_kafka_event_opaque(rkev);
349 TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
350 my_opaque, opaque);
351
352 /* Expecting error */
353 err = rd_kafka_event_error(rkev);
354 errstr2 = rd_kafka_event_error_string(rkev);
355 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
356 "expected DeleteTopics to return error %s, not %s (%s)",
357 rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
358 rd_kafka_err2str(err),
359 err ? errstr2 : "n/a");
360
361 /* Attempt to extract topics anyway, should return NULL. */
362 restopics = rd_kafka_DeleteTopics_result_topics(res, &restopic_cnt);
363 TEST_ASSERT(!restopics && restopic_cnt == 0,
364 "expected no result_topics, got %p cnt %"PRIusz,
365 restopics, restopic_cnt);
366
367 rd_kafka_event_destroy(rkev);
368
369 rd_kafka_DeleteTopic_destroy_array(del_topics, MY_DEL_TOPICS_CNT);
370
371 if (options)
372 rd_kafka_AdminOptions_destroy(options);
373
374 if (!useq)
375 rd_kafka_queue_destroy(q);
376 #undef MY_DEL_TOPICS_CNT
377
378 SUB_TEST_QUICK();
379 }
380
381 /**
382 * @brief DeleteGroups tests
383 *
384 *
385 *
386 */
do_test_DeleteGroups(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int with_options,rd_bool_t destroy)387 static void do_test_DeleteGroups (const char *what,
388 rd_kafka_t *rk, rd_kafka_queue_t *useq,
389 int with_options,
390 rd_bool_t destroy) {
391 rd_kafka_queue_t *q;
392 #define MY_DEL_GROUPS_CNT 4
393 char *group_names[MY_DEL_GROUPS_CNT];
394 rd_kafka_DeleteGroup_t *del_groups[MY_DEL_GROUPS_CNT];
395 rd_kafka_AdminOptions_t *options = NULL;
396 int exp_timeout = MY_SOCKET_TIMEOUT_MS;
397 int i;
398 char errstr[512];
399 const char *errstr2;
400 rd_kafka_resp_err_t err;
401 test_timing_t timing;
402 rd_kafka_event_t *rkev;
403 const rd_kafka_DeleteGroups_result_t *res;
404 const rd_kafka_group_result_t **resgroups;
405 size_t resgroup_cnt;
406 void *my_opaque = NULL, *opaque;
407
408 SUB_TEST_QUICK("%s DeleteGroups with %s, timeout %dms",
409 rd_kafka_name(rk), what, exp_timeout);
410
411 q = useq ? useq : rd_kafka_queue_new(rk);
412
413 for (i = 0 ; i < MY_DEL_GROUPS_CNT ; i++) {
414 group_names[i] = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
415 del_groups[i] = rd_kafka_DeleteGroup_new(group_names[i]);
416 }
417
418 if (with_options) {
419 options = rd_kafka_AdminOptions_new(
420 rk, RD_KAFKA_ADMIN_OP_DELETEGROUPS);
421
422 exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
423 err = rd_kafka_AdminOptions_set_request_timeout(
424 options, exp_timeout, errstr, sizeof(errstr));
425 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
426
427 if (useq) {
428 my_opaque = (void *)456;
429 rd_kafka_AdminOptions_set_opaque(options, my_opaque);
430 }
431 }
432
433 TIMING_START(&timing, "DeleteGroups");
434 TEST_SAY("Call DeleteGroups, timeout is %dms\n", exp_timeout);
435 rd_kafka_DeleteGroups(rk, del_groups, MY_DEL_GROUPS_CNT,
436 options, q);
437 TIMING_ASSERT_LATER(&timing, 0, 50);
438
439 if (destroy)
440 goto destroy;
441
442 /* Poll result queue */
443 TIMING_START(&timing, "DeleteGroups.queue_poll");
444 rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
445 TIMING_ASSERT_LATER(&timing, exp_timeout-100, exp_timeout+100);
446 TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
447 TEST_SAY("DeleteGroups: got %s in %.3fs\n",
448 rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
449
450 /* Convert event to proper result */
451 res = rd_kafka_event_DeleteGroups_result(rkev);
452 TEST_ASSERT(res, "expected DeleteGroups_result, not %s",
453 rd_kafka_event_name(rkev));
454
455 opaque = rd_kafka_event_opaque(rkev);
456 TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
457 my_opaque, opaque);
458
459 /* Expecting no error (errors will be per-group) */
460 err = rd_kafka_event_error(rkev);
461 errstr2 = rd_kafka_event_error_string(rkev);
462 TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR,
463 "expected DeleteGroups to return error %s, not %s (%s)",
464 rd_kafka_err2str(RD_KAFKA_RESP_ERR_NO_ERROR),
465 rd_kafka_err2str(err),
466 err ? errstr2 : "n/a");
467
468 /* Extract groups, should return MY_DEL_GROUPS_CNT groups. */
469 resgroups = rd_kafka_DeleteGroups_result_groups(res, &resgroup_cnt);
470 TEST_ASSERT(resgroups && resgroup_cnt == MY_DEL_GROUPS_CNT,
471 "expected %d result_groups, got %p cnt %"PRIusz,
472 MY_DEL_GROUPS_CNT, resgroups, resgroup_cnt);
473
474 /* The returned groups should be in the original order, and
475 * should all have timed out. */
476 for (i = 0; i < MY_DEL_GROUPS_CNT; i++) {
477 TEST_ASSERT(!strcmp(group_names[i],
478 rd_kafka_group_result_name(resgroups[i])),
479 "expected group '%s' at position %d, not '%s'",
480 group_names[i], i,
481 rd_kafka_group_result_name(resgroups[i]));
482 TEST_ASSERT(rd_kafka_error_code(rd_kafka_group_result_error(
483 resgroups[i])) ==
484 RD_KAFKA_RESP_ERR__TIMED_OUT,
485 "expected group '%s' to have timed out, got %s",
486 group_names[i],
487 rd_kafka_error_string(
488 rd_kafka_group_result_error(resgroups[i])));
489 }
490
491 rd_kafka_event_destroy(rkev);
492
493 destroy:
494 for (i = 0; i < MY_DEL_GROUPS_CNT; i++) {
495 rd_kafka_DeleteGroup_destroy(del_groups[i]);
496 rd_free(group_names[i]);
497 }
498
499 if (options)
500 rd_kafka_AdminOptions_destroy(options);
501
502 if (!useq)
503 rd_kafka_queue_destroy(q);
504 #undef MY_DEL_GROUPS_CNT
505
506 SUB_TEST_QUICK();
507 }
508
/**
 * @brief DeleteRecords tests
 *
 * Issues a single DeleteRecords request covering MY_DEL_RECORDS_CNT
 * topic partitions (offset RD_KAFKA_OFFSET_END) and, unless \p destroy is
 * set, verifies that the result event arrives around the expected
 * timeout and carries an error.
 *
 * @param what Free-text description included in the sub-test name.
 * @param rk Client instance the request is issued on.
 * @param useq Result queue to use, or NULL to create (and destroy)
 *             a temporary queue.
 * @param with_options If non-zero, AdminOptions with a request timeout
 *             of twice MY_SOCKET_TIMEOUT_MS are passed; an opaque is
 *             additionally set when \p useq is non-NULL.
 * @param destroy If true, do not wait for the result: go straight to
 *             cleanup after the request has been issued.
 */
static void do_test_DeleteRecords (const char *what,
                                   rd_kafka_t *rk, rd_kafka_queue_t *useq,
                                   int with_options, rd_bool_t destroy) {
#define MY_DEL_RECORDS_CNT 4
        char *topics[MY_DEL_RECORDS_CNT];
        rd_kafka_queue_t *q;
        rd_kafka_AdminOptions_t *options = NULL;
        rd_kafka_topic_partition_list_t *offsets = NULL;
        rd_kafka_DeleteRecords_t *del_records;
        int exp_timeout = MY_SOCKET_TIMEOUT_MS;
        void *my_opaque = NULL;
        char errstr[512];
        rd_kafka_resp_err_t err;
        test_timing_t timing;
        int idx;

        SUB_TEST_QUICK("%s DeleteRecords with %s, timeout %dms",
                       rd_kafka_name(rk), what, exp_timeout);

        if (useq)
                q = useq;
        else
                q = rd_kafka_queue_new(rk);

        for (idx = 0 ; idx < MY_DEL_RECORDS_CNT ; idx++)
                topics[idx] = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));

        if (with_options) {
                options = rd_kafka_AdminOptions_new(
                        rk, RD_KAFKA_ADMIN_OP_DELETERECORDS);

                exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;

                err = rd_kafka_AdminOptions_set_request_timeout(
                        options, exp_timeout, errstr, sizeof(errstr));
                TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));

                if (useq) {
                        my_opaque = (void *)4567;
                        rd_kafka_AdminOptions_set_opaque(options, my_opaque);
                }
        }

        /* One partition per topic: topic idx gets partition idx. */
        offsets = rd_kafka_topic_partition_list_new(MY_DEL_RECORDS_CNT);
        for (idx = 0 ; idx < MY_DEL_RECORDS_CNT ; idx++) {
                rd_kafka_topic_partition_t *rktpar =
                        rd_kafka_topic_partition_list_add(offsets,
                                                          topics[idx], idx);
                rktpar->offset = RD_KAFKA_OFFSET_END;
        }

        /* The list is destroyed right after the DeleteRecords object
         * has been created from it. */
        del_records = rd_kafka_DeleteRecords_new(offsets);
        rd_kafka_topic_partition_list_destroy(offsets);

        TIMING_START(&timing, "DeleteRecords");
        TEST_SAY("Call DeleteRecords, timeout is %dms\n", exp_timeout);
        rd_kafka_DeleteRecords(rk, &del_records, 1, options, q);
        TIMING_ASSERT_LATER(&timing, 0, 10);

        rd_kafka_DeleteRecords_destroy(del_records);

        if (!destroy) {
                rd_kafka_event_t *rkev;
                const rd_kafka_DeleteRecords_result_t *res;
                void *opaque;

                /* Wait for the result on the queue. */
                TIMING_START(&timing, "DeleteRecords.queue_poll");
                rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
                TIMING_ASSERT(&timing, exp_timeout-100, exp_timeout+100);
                TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
                TEST_SAY("DeleteRecords: got %s in %.3fs\n",
                         rd_kafka_event_name(rkev),
                         TIMING_DURATION(&timing) / 1000.0f);

                /* The event must be a DeleteRecords result. */
                res = rd_kafka_event_DeleteRecords_result(rkev);
                TEST_ASSERT(res, "expected DeleteRecords_result, not %s",
                            rd_kafka_event_name(rkev));

                opaque = rd_kafka_event_opaque(rkev);
                TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
                            my_opaque, opaque);

                /* An error is expected (pre-fanout leader_req will fail) */
                err = rd_kafka_event_error(rkev);
                TEST_ASSERT(err, "expected DeleteRecords to fail");

                rd_kafka_event_destroy(rkev);
        }

        if (options)
                rd_kafka_AdminOptions_destroy(options);

        /* Only destroy the queue if it was created by this function. */
        if (!useq)
                rd_kafka_queue_destroy(q);

        for (idx = 0 ; idx < MY_DEL_RECORDS_CNT ; idx++)
                rd_free(topics[idx]);

#undef MY_DEL_RECORDS_CNT

        SUB_TEST_PASS();
}
609
610
/**
 * @brief DeleteConsumerGroupOffsets tests
 *
 * Issues DeleteConsumerGroupOffsets for group "mygroup" with three topic
 * partitions and verifies that the result event arrives around the
 * expected timeout and carries an error.
 *
 * @param what Free-text description included in the sub-test name.
 * @param rk Client instance the request is issued on.
 * @param useq Result queue to use, or NULL to create (and destroy)
 *             a temporary queue.
 * @param with_options If non-zero, AdminOptions with a request timeout
 *             of twice MY_SOCKET_TIMEOUT_MS are passed; an opaque is
 *             additionally set when \p useq is non-NULL.
 */
static void do_test_DeleteConsumerGroupOffsets (const char *what,
                                                rd_kafka_t *rk,
                                                rd_kafka_queue_t *useq,
                                                int with_options) {
        rd_kafka_queue_t *q;
#define MY_DEL_CGRPOFFS_CNT 1
        rd_kafka_AdminOptions_t *options = NULL;
        const rd_kafka_DeleteConsumerGroupOffsets_result_t *res;
        rd_kafka_DeleteConsumerGroupOffsets_t *cgoffsets[MY_DEL_CGRPOFFS_CNT];
        int exp_timeout = MY_SOCKET_TIMEOUT_MS;
        int i;
        char errstr[512];
        rd_kafka_resp_err_t err;
        test_timing_t timing;
        rd_kafka_event_t *rkev;
        void *my_opaque = NULL, *opaque;

        SUB_TEST_QUICK("%s DeleteConsumerGroupOffsets with %s, timeout %dms",
                       rd_kafka_name(rk), what, exp_timeout);

        q = useq ? useq : rd_kafka_queue_new(rk);

        for (i = 0 ; i < MY_DEL_CGRPOFFS_CNT ; i++) {
                rd_kafka_topic_partition_list_t *partitions =
                        rd_kafka_topic_partition_list_new(3);
                rd_kafka_topic_partition_list_add(partitions, "topic1", 9);
                rd_kafka_topic_partition_list_add(partitions, "topic3", 15);
                rd_kafka_topic_partition_list_add(partitions, "topic1", 1);
                cgoffsets[i] = rd_kafka_DeleteConsumerGroupOffsets_new(
                        "mygroup", partitions);
                /* The list is destroyed right after the request object
                 * has been created from it. */
                rd_kafka_topic_partition_list_destroy(partitions);
        }

        if (with_options) {
                options = rd_kafka_AdminOptions_new(
                        rk, RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS);

                exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;

                err = rd_kafka_AdminOptions_set_request_timeout(
                        options, exp_timeout, errstr, sizeof(errstr));
                TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));

                if (useq) {
                        my_opaque = (void *)99981;
                        rd_kafka_AdminOptions_set_opaque(options, my_opaque);
                }
        }

        TIMING_START(&timing, "DeleteConsumerGroupOffsets");
        TEST_SAY("Call DeleteConsumerGroupOffsets, timeout is %dms\n",
                 exp_timeout);
        rd_kafka_DeleteConsumerGroupOffsets(rk, cgoffsets,
                                            MY_DEL_CGRPOFFS_CNT,
                                            options, q);
        TIMING_ASSERT_LATER(&timing, 0, 10);

        /* Poll result queue */
        TIMING_START(&timing, "DeleteConsumerGroupOffsets.queue_poll");
        rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
        TIMING_ASSERT(&timing, exp_timeout-100, exp_timeout+100);
        TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
        TEST_SAY("DeleteConsumerGroupOffsets: got %s in %.3fs\n",
                 rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);

        /* Convert event to proper result */
        res = rd_kafka_event_DeleteConsumerGroupOffsets_result(rkev);
        TEST_ASSERT(res, "expected DeleteConsumerGroupOffsets_result, not %s",
                    rd_kafka_event_name(rkev));

        opaque = rd_kafka_event_opaque(rkev);
        TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
                    my_opaque, opaque);

        /* Expecting error */
        err = rd_kafka_event_error(rkev);
        TEST_ASSERT(err, "expected DeleteConsumerGroupOffsets to fail");

        rd_kafka_event_destroy(rkev);

        if (options)
                rd_kafka_AdminOptions_destroy(options);

        /* Only destroy the queue if it was created by this function. */
        if (!useq)
                rd_kafka_queue_destroy(q);

        rd_kafka_DeleteConsumerGroupOffsets_destroy_array(
                cgoffsets, MY_DEL_CGRPOFFS_CNT);

        /* Must name the macro defined above; the previous #undef used
         * MY_DEL_CGRPOFFSETS_CNT and therefore had no effect. */
#undef MY_DEL_CGRPOFFS_CNT

        SUB_TEST_PASS();
}
704
705
706
707 /**
708 * @brief Test a mix of APIs using the same replyq.
709 *
710 * - Create topics A,B
711 * - Delete topic B
712 * - Create topic C
713 * - Delete groups A,B,C
714 * - Delete records from A,B,C
715 * - Create extra partitions for topic D
716 */
do_test_mix(rd_kafka_t * rk,rd_kafka_queue_t * rkqu)717 static void do_test_mix (rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
718 char *topics[] = { "topicA", "topicB", "topicC" };
719 int cnt = 0;
720 struct waiting {
721 rd_kafka_event_type_t evtype;
722 int seen;
723 };
724 struct waiting id1 = {RD_KAFKA_EVENT_CREATETOPICS_RESULT};
725 struct waiting id2 = {RD_KAFKA_EVENT_DELETETOPICS_RESULT};
726 struct waiting id3 = {RD_KAFKA_EVENT_CREATETOPICS_RESULT};
727 struct waiting id4 = {RD_KAFKA_EVENT_DELETEGROUPS_RESULT};
728 struct waiting id5 = {RD_KAFKA_EVENT_DELETERECORDS_RESULT};
729 struct waiting id6 = {RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT};
730 struct waiting id7 = {RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT};
731 struct waiting id8 = {RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT};
732 struct waiting id9 = {RD_KAFKA_EVENT_CREATETOPICS_RESULT};
733 rd_kafka_topic_partition_list_t *offsets;
734
735
736 SUB_TEST_QUICK();
737
738 offsets = rd_kafka_topic_partition_list_new(3);
739 rd_kafka_topic_partition_list_add(offsets, topics[0], 0)->offset =
740 RD_KAFKA_OFFSET_END;
741 rd_kafka_topic_partition_list_add(offsets, topics[1], 0)->offset =
742 RD_KAFKA_OFFSET_END;
743 rd_kafka_topic_partition_list_add(offsets, topics[2], 0)->offset =
744 RD_KAFKA_OFFSET_END;
745
746 test_CreateTopics_simple(rk, rkqu, topics, 2, 1, &id1);
747 test_DeleteTopics_simple(rk, rkqu, &topics[1], 1, &id2);
748 test_CreateTopics_simple(rk, rkqu, &topics[2], 1, 1, &id3);
749 test_DeleteGroups_simple(rk, rkqu, topics, 3, &id4);
750 test_DeleteRecords_simple(rk, rkqu, offsets, &id5);
751 test_CreatePartitions_simple(rk, rkqu, "topicD", 15, &id6);
752 test_DeleteConsumerGroupOffsets_simple(rk, rkqu, "mygroup", offsets,
753 &id7);
754 test_DeleteConsumerGroupOffsets_simple(rk, rkqu, NULL, NULL, &id8);
755 /* Use broker-side defaults for partition count */
756 test_CreateTopics_simple(rk, rkqu, topics, 2, -1, &id9);
757
758 rd_kafka_topic_partition_list_destroy(offsets);
759
760 while (cnt < 9) {
761 rd_kafka_event_t *rkev;
762 struct waiting *w;
763
764 rkev = rd_kafka_queue_poll(rkqu, -1);
765 TEST_ASSERT(rkev);
766
767 TEST_SAY("Got event %s: %s\n",
768 rd_kafka_event_name(rkev),
769 rd_kafka_event_error_string(rkev));
770
771 w = rd_kafka_event_opaque(rkev);
772 TEST_ASSERT(w);
773
774 TEST_ASSERT(w->evtype == rd_kafka_event_type(rkev),
775 "Expected evtype %d, not %d (%s)",
776 w->evtype, rd_kafka_event_type(rkev),
777 rd_kafka_event_name(rkev));
778
779 TEST_ASSERT(w->seen == 0, "Duplicate results");
780
781 w->seen++;
782 cnt++;
783
784 rd_kafka_event_destroy(rkev);
785 }
786
787 SUB_TEST_PASS();
788 }
789
790
791 /**
792 * @brief Test AlterConfigs and DescribeConfigs
793 */
do_test_configs(rd_kafka_t * rk,rd_kafka_queue_t * rkqu)794 static void do_test_configs (rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
795 #define MY_CONFRES_CNT RD_KAFKA_RESOURCE__CNT + 2
796 rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
797 rd_kafka_AdminOptions_t *options;
798 rd_kafka_event_t *rkev;
799 rd_kafka_resp_err_t err;
800 const rd_kafka_AlterConfigs_result_t *res;
801 const rd_kafka_ConfigResource_t **rconfigs;
802 size_t rconfig_cnt;
803 char errstr[128];
804 int i;
805
806 SUB_TEST_QUICK();
807
808 /* Check invalids */
809 configs[0] = rd_kafka_ConfigResource_new(
810 (rd_kafka_ResourceType_t)-1, "something");
811 TEST_ASSERT(!configs[0]);
812
813 configs[0] = rd_kafka_ConfigResource_new(
814 (rd_kafka_ResourceType_t)0, NULL);
815 TEST_ASSERT(!configs[0]);
816
817
818 for (i = 0 ; i < MY_CONFRES_CNT ; i++) {
819 int set_config = !(i % 2);
820
821 /* librdkafka shall not limit the use of illogical
822 * or unknown settings, they are enforced by the broker. */
823 configs[i] = rd_kafka_ConfigResource_new(
824 (rd_kafka_ResourceType_t)i, "3");
825 TEST_ASSERT(configs[i] != NULL);
826
827 if (set_config) {
828 rd_kafka_ConfigResource_set_config(configs[i],
829 "some.conf",
830 "which remains "
831 "unchecked");
832 rd_kafka_ConfigResource_set_config(configs[i],
833 "some.conf.null",
834 NULL);
835 }
836 }
837
838
839 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
840 err = rd_kafka_AdminOptions_set_request_timeout(options, 1000, errstr,
841 sizeof(errstr));
842 TEST_ASSERT(!err, "%s", errstr);
843
844 /* AlterConfigs */
845 rd_kafka_AlterConfigs(rk, configs, MY_CONFRES_CNT,
846 options, rkqu);
847
848 rkev = test_wait_admin_result(rkqu, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT,
849 2000);
850
851 TEST_ASSERT(rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__TIMED_OUT,
852 "Expected timeout, not %s",
853 rd_kafka_event_error_string(rkev));
854
855 res = rd_kafka_event_AlterConfigs_result(rkev);
856 TEST_ASSERT(res);
857
858 rconfigs = rd_kafka_AlterConfigs_result_resources(res, &rconfig_cnt);
859 TEST_ASSERT(!rconfigs && !rconfig_cnt,
860 "Expected no result resources, got %"PRIusz,
861 rconfig_cnt);
862
863 rd_kafka_event_destroy(rkev);
864
865 /* DescribeConfigs: reuse same configs and options */
866 rd_kafka_DescribeConfigs(rk, configs, MY_CONFRES_CNT,
867 options, rkqu);
868
869 rd_kafka_AdminOptions_destroy(options);
870 rd_kafka_ConfigResource_destroy_array(configs, MY_CONFRES_CNT);
871
872 rkev = test_wait_admin_result(rkqu,
873 RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT,
874 2000);
875
876 TEST_ASSERT(rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__TIMED_OUT,
877 "Expected timeout, not %s",
878 rd_kafka_event_error_string(rkev));
879
880 res = rd_kafka_event_DescribeConfigs_result(rkev);
881 TEST_ASSERT(res);
882
883 rconfigs = rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt);
884 TEST_ASSERT(!rconfigs && !rconfig_cnt,
885 "Expected no result resources, got %"PRIusz,
886 rconfig_cnt);
887
888 rd_kafka_event_destroy(rkev);
889
890 SUB_TEST_PASS();
891 }
892
893
894 /**
895 * @brief Verify that an unclean rd_kafka_destroy() does not hang or crash.
896 */
do_test_unclean_destroy(rd_kafka_type_t cltype,int with_mainq)897 static void do_test_unclean_destroy (rd_kafka_type_t cltype, int with_mainq) {
898 rd_kafka_t *rk;
899 char errstr[512];
900 rd_kafka_conf_t *conf;
901 rd_kafka_queue_t *q;
902 rd_kafka_event_t *rkev;
903 rd_kafka_DeleteTopic_t *topic;
904 test_timing_t t_destroy;
905
906 SUB_TEST_QUICK("Test unclean destroy using %s",
907 with_mainq ? "mainq" : "tempq");
908
909 test_conf_init(&conf, NULL, 0);
910 /* Remove brokers, if any, since this is a local test and we
911 * rely on the controller not being found. */
912 test_conf_set(conf, "bootstrap.servers", "");
913 test_conf_set(conf, "socket.timeout.ms", "60000");
914
915 rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr));
916 TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr);
917
918 if (with_mainq)
919 q = rd_kafka_queue_get_main(rk);
920 else
921 q = rd_kafka_queue_new(rk);
922
923 topic = rd_kafka_DeleteTopic_new("test");
924 rd_kafka_DeleteTopics(rk, &topic, 1, NULL, q);
925 rd_kafka_DeleteTopic_destroy(topic);
926
927 /* We're not expecting a result yet since DeleteTopics will attempt
928 * to look up the controller for socket.timeout.ms (1 minute). */
929 rkev = rd_kafka_queue_poll(q, 100);
930 TEST_ASSERT(!rkev, "Did not expect result: %s",
931 rd_kafka_event_name(rkev));
932
933 rd_kafka_queue_destroy(q);
934
935 TEST_SAY("Giving rd_kafka_destroy() 5s to finish, "
936 "despite Admin API request being processed\n");
937 test_timeout_set(5);
938 TIMING_START(&t_destroy, "rd_kafka_destroy()");
939 rd_kafka_destroy(rk);
940 TIMING_STOP(&t_destroy);
941
942 SUB_TEST_PASS();
943
944 /* Restore timeout */
945 test_timeout_set(60);
946 }
947
948
949 /**
950 * @brief Test AdminOptions
951 */
do_test_options(rd_kafka_t * rk)952 static void do_test_options (rd_kafka_t *rk) {
953 #define _all_apis { RD_KAFKA_ADMIN_OP_CREATETOPICS, \
954 RD_KAFKA_ADMIN_OP_DELETETOPICS, \
955 RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, \
956 RD_KAFKA_ADMIN_OP_ALTERCONFIGS, \
957 RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, \
958 RD_KAFKA_ADMIN_OP_DELETEGROUPS, \
959 RD_KAFKA_ADMIN_OP_DELETERECORDS, \
960 RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS, \
961 RD_KAFKA_ADMIN_OP_ANY /* Must be last */}
962 struct {
963 const char *setter;
964 const rd_kafka_admin_op_t valid_apis[9];
965 } matrix[] = {
966 { "request_timeout", _all_apis },
967 { "operation_timeout", { RD_KAFKA_ADMIN_OP_CREATETOPICS,
968 RD_KAFKA_ADMIN_OP_DELETETOPICS,
969 RD_KAFKA_ADMIN_OP_CREATEPARTITIONS,
970 RD_KAFKA_ADMIN_OP_DELETERECORDS } },
971 { "validate_only", { RD_KAFKA_ADMIN_OP_CREATETOPICS,
972 RD_KAFKA_ADMIN_OP_CREATEPARTITIONS,
973 RD_KAFKA_ADMIN_OP_ALTERCONFIGS } },
974 { "broker", _all_apis },
975 { "opaque", _all_apis },
976 { NULL },
977 };
978 int i;
979 rd_kafka_AdminOptions_t *options;
980
981 SUB_TEST_QUICK();
982
983 for (i = 0 ; matrix[i].setter ; i++) {
984 static const rd_kafka_admin_op_t all_apis[] = _all_apis;
985 const rd_kafka_admin_op_t *for_api;
986
987 for (for_api = all_apis ; ; for_api++) {
988 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
989 rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
990 char errstr[512];
991 int fi;
992
993 options = rd_kafka_AdminOptions_new(rk, *for_api);
994 TEST_ASSERT(options,
995 "AdminOptions_new(%d) failed", *for_api);
996
997 if (!strcmp(matrix[i].setter, "request_timeout"))
998 err = rd_kafka_AdminOptions_set_request_timeout(
999 options, 1234, errstr, sizeof(errstr));
1000 else if (!strcmp(matrix[i].setter, "operation_timeout"))
1001 err = rd_kafka_AdminOptions_set_operation_timeout(
1002 options, 12345, errstr, sizeof(errstr));
1003 else if (!strcmp(matrix[i].setter, "validate_only"))
1004 err = rd_kafka_AdminOptions_set_validate_only(
1005 options, 1, errstr, sizeof(errstr));
1006 else if (!strcmp(matrix[i].setter, "broker"))
1007 err = rd_kafka_AdminOptions_set_broker(
1008 options, 5, errstr, sizeof(errstr));
1009 else if (!strcmp(matrix[i].setter, "opaque")) {
1010 rd_kafka_AdminOptions_set_opaque(
1011 options, (void *)options);
1012 err = RD_KAFKA_RESP_ERR_NO_ERROR;
1013 } else
1014 TEST_FAIL("Invalid setter: %s",
1015 matrix[i].setter);
1016
1017
1018 TEST_SAYL(3, "AdminOptions_set_%s on "
1019 "RD_KAFKA_ADMIN_OP_%d options "
1020 "returned %s: %s\n",
1021 matrix[i].setter,
1022 *for_api,
1023 rd_kafka_err2name(err),
1024 err ? errstr : "success");
1025
1026 /* Scan matrix valid_apis to see if this
1027 * setter should be accepted or not. */
1028 if (exp_err) {
1029 /* An expected error is already set */
1030 } else if (*for_api != RD_KAFKA_ADMIN_OP_ANY) {
1031 exp_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
1032
1033 for (fi = 0 ; matrix[i].valid_apis[fi] ; fi++) {
1034 if (matrix[i].valid_apis[fi] ==
1035 *for_api)
1036 exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
1037 }
1038 } else {
1039 exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
1040 }
1041
1042 if (err != exp_err)
1043 TEST_FAIL_LATER("Expected AdminOptions_set_%s "
1044 "for RD_KAFKA_ADMIN_OP_%d "
1045 "options to return %s, "
1046 "not %s",
1047 matrix[i].setter,
1048 *for_api,
1049 rd_kafka_err2name(exp_err),
1050 rd_kafka_err2name(err));
1051
1052 rd_kafka_AdminOptions_destroy(options);
1053
1054 if (*for_api == RD_KAFKA_ADMIN_OP_ANY)
1055 break; /* This was the last one */
1056 }
1057 }
1058
1059 /* Try an invalid for_api */
1060 options = rd_kafka_AdminOptions_new(rk, (rd_kafka_admin_op_t)1234);
1061 TEST_ASSERT(!options, "Expected AdminOptions_new() to fail "
1062 "with an invalid for_api, didn't.");
1063
1064 TEST_LATER_CHECK();
1065
1066 SUB_TEST_PASS();
1067 }
1068
1069
create_admin_client(rd_kafka_type_t cltype)1070 static rd_kafka_t *create_admin_client (rd_kafka_type_t cltype) {
1071 rd_kafka_t *rk;
1072 char errstr[512];
1073 rd_kafka_conf_t *conf;
1074
1075 test_conf_init(&conf, NULL, 0);
1076 /* Remove brokers, if any, since this is a local test and we
1077 * rely on the controller not being found. */
1078 test_conf_set(conf, "bootstrap.servers", "");
1079 test_conf_set(conf, "socket.timeout.ms", MY_SOCKET_TIMEOUT_MS_STR);
1080 /* For use with the background queue */
1081 rd_kafka_conf_set_background_event_cb(conf, background_event_cb);
1082
1083 rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr));
1084 TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr);
1085
1086 return rk;
1087 }
1088
1089
/**
 * @brief Run all local Admin API tests for one client type.
 *
 * Sets up the last_event lock and condition variable, runs the unclean
 * destroy tests, then runs the option and per-API tests on a shared client
 * instance, and finally runs the tests that each need a client instance of
 * their own.
 *
 * @param cltype Client type to create the test instances with.
 */
static void do_test_apis (rd_kafka_type_t cltype) {
        /* Queue/options combinations shared by the Delete..() tests. */
        static const struct {
                const char *what; /* Sub-test description */
                int use_mainq;    /* 1: pass the main queue, 0: pass NULL */
                int with_options; /* Forwarded as the with_options argument */
        } combos[] = {
                { "temp queue, no options", 0, 0 },
                { "temp queue, options", 0, 1 },
                { "main queue, options", 1, 1 },
        };
        const int combo_cnt = (int)(sizeof(combos) / sizeof(*combos));
        rd_kafka_queue_t *main_q, *bg_q;
        rd_kafka_t *rk;
        int with_mainq;
        int c;

        mtx_init(&last_event_lock, mtx_plain);
        cnd_init(&last_event_cnd);

        /* First with a temporary queue (0), then with the main queue (1) */
        for (with_mainq = 0 ; with_mainq <= 1 ; with_mainq++)
                do_test_unclean_destroy(cltype, with_mainq);

        /*
         * Tests sharing a single client instance.
         */
        rk = create_admin_client(cltype);

        main_q = rd_kafka_queue_get_main(rk);
        bg_q = rd_kafka_queue_get_background(rk);

        do_test_options(rk);

        do_test_CreateTopics("temp queue, no options", rk, NULL, 0, 0);
        do_test_CreateTopics("temp queue, no options, background_event_cb",
                             rk, bg_q, 1, 0);
        do_test_CreateTopics("temp queue, options", rk, NULL, 0, 1);
        do_test_CreateTopics("main queue, options", rk, main_q, 0, 1);

        for (c = 0 ; c < combo_cnt ; c++)
                do_test_DeleteTopics(combos[c].what, rk,
                                     combos[c].use_mainq ? main_q : NULL,
                                     combos[c].with_options);

        for (c = 0 ; c < combo_cnt ; c++)
                do_test_DeleteGroups(combos[c].what, rk,
                                     combos[c].use_mainq ? main_q : NULL,
                                     combos[c].with_options, rd_false);

        for (c = 0 ; c < combo_cnt ; c++)
                do_test_DeleteRecords(combos[c].what, rk,
                                      combos[c].use_mainq ? main_q : NULL,
                                      combos[c].with_options, rd_false);

        for (c = 0 ; c < combo_cnt ; c++)
                do_test_DeleteConsumerGroupOffsets(
                        combos[c].what, rk,
                        combos[c].use_mainq ? main_q : NULL,
                        combos[c].with_options);

        do_test_mix(rk, main_q);

        do_test_configs(rk, main_q);

        rd_kafka_queue_destroy(bg_q);
        rd_kafka_queue_destroy(main_q);

        rd_kafka_destroy(rk);

        /*
         * Tests which require a unique unused client instance.
         */
        rk = create_admin_client(cltype);
        main_q = rd_kafka_queue_get_main(rk);
        do_test_DeleteRecords("main queue, options, destroy", rk, main_q, 1,
                              rd_true/*destroy instance before finishing*/);
        rd_kafka_queue_destroy(main_q);
        rd_kafka_destroy(rk);

        rk = create_admin_client(cltype);
        main_q = rd_kafka_queue_get_main(rk);
        do_test_DeleteGroups("main queue, options, destroy", rk, main_q, 1,
                             rd_true/*destroy instance before finishing*/);
        rd_kafka_queue_destroy(main_q);
        rd_kafka_destroy(rk);

        /* Tear down what was initialized at the top. */
        mtx_destroy(&last_event_lock);
        cnd_destroy(&last_event_cnd);
}
1161
1162
main_0080_admin_ut(int argc,char ** argv)1163 int main_0080_admin_ut (int argc, char **argv) {
1164 do_test_apis(RD_KAFKA_PRODUCER);
1165 do_test_apis(RD_KAFKA_CONSUMER);
1166 return 0;
1167 }
1168