1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 #include "rdkafka.h"
31 
32 /**
33  * @brief Admin API local dry-run unit-tests.
34  */
35 
36 #define MY_SOCKET_TIMEOUT_MS      100
37 #define MY_SOCKET_TIMEOUT_MS_STR "100"
38 
39 
40 
41 static mtx_t last_event_lock;
42 static cnd_t last_event_cnd;
43 static rd_kafka_event_t *last_event = NULL;
44 
/**
 * @brief The background event callback is called automatically
 *        by librdkafka from a background thread.
 *
 * Hands the event over to the test thread: \p rkev is stored in the
 * file-scope \c last_event slot under \c last_event_lock and all waiters
 * on \c last_event_cnd are woken (see wait_background_event_cb()).
 *
 * The slot must be empty when the callback fires; a still-pending
 * event is treated as a test failure.
 *
 * @param rk     Client instance (not used here).
 * @param rkev   The event to hand over; it is not destroyed here.
 * @param opaque Application opaque (not used here).
 */
static void background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
                                 void *opaque) {
        mtx_lock(&last_event_lock);
        /* Only one outstanding event is supported at a time. */
        TEST_ASSERT(!last_event, "Multiple events seen in background_event_cb "
                    "(existing %s, new %s)",
                    rd_kafka_event_name(last_event), rd_kafka_event_name(rkev));
        last_event = rkev;
        mtx_unlock(&last_event_lock);
        /* Wake up the waiter(s) after the lock has been released. */
        cnd_broadcast(&last_event_cnd);
        /* NOTE(review): the reason for this sleep is not evident from this
         * part of the file - presumably it keeps the background thread busy
         * for a while after the hand-over; confirm before changing. */
        rd_sleep(1);
}
60 
wait_background_event_cb(void)61 static rd_kafka_event_t *wait_background_event_cb (void) {
62         rd_kafka_event_t *rkev;
63         mtx_lock(&last_event_lock);
64         while (!(rkev = last_event))
65                 cnd_wait(&last_event_cnd, &last_event_lock);
66         last_event = NULL;
67         mtx_unlock(&last_event_lock);
68 
69         return rkev;
70 }
71 
72 
73 /**
74  * @brief CreateTopics tests
75  *
76  *
77  *
78  */
do_test_CreateTopics(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int with_background_event_cb,int with_options)79 static void do_test_CreateTopics (const char *what,
80                                   rd_kafka_t *rk, rd_kafka_queue_t *useq,
81                                   int with_background_event_cb,
82                                   int with_options) {
83         rd_kafka_queue_t *q;
84 #define MY_NEW_TOPICS_CNT 6
85         rd_kafka_NewTopic_t *new_topics[MY_NEW_TOPICS_CNT];
86         rd_kafka_AdminOptions_t *options = NULL;
87         int exp_timeout = MY_SOCKET_TIMEOUT_MS;
88         int i;
89         char errstr[512];
90         const char *errstr2;
91         rd_kafka_resp_err_t err;
92         test_timing_t timing;
93         rd_kafka_event_t *rkev;
94         const rd_kafka_CreateTopics_result_t *res;
95         const rd_kafka_topic_result_t **restopics;
96         size_t restopic_cnt;
97         void *my_opaque = NULL, *opaque;
98 
99         SUB_TEST_QUICK("%s CreateTopics with %s, timeout %dms",
100                        rd_kafka_name(rk), what, exp_timeout);
101 
102         q = useq ? useq : rd_kafka_queue_new(rk);
103 
104         /**
105          * Construct NewTopic array with different properties for
106          * different partitions.
107          */
108         for (i = 0 ; i < MY_NEW_TOPICS_CNT ; i++) {
109                 const char *topic = test_mk_topic_name(__FUNCTION__, 1);
110                 int num_parts = i * 51 + 1;
111                 int num_replicas = jitter(1, MY_NEW_TOPICS_CNT-1);
112                 int set_config = (i & 2);
113                 int set_replicas = !(i % 1);
114 
115                 new_topics[i] = rd_kafka_NewTopic_new(topic,
116                                                       num_parts,
117                                                       set_replicas ? -1 :
118                                                       num_replicas,
119                                                       NULL, 0);
120 
121                 if (set_config) {
122                         /*
123                          * Add various (unverified) configuration properties
124                          */
125                         err = rd_kafka_NewTopic_set_config(new_topics[i],
126                                                            "dummy.doesntexist",
127                                                            "butThere'sNothing "
128                                                            "to verify that");
129                         TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
130 
131                         err = rd_kafka_NewTopic_set_config(new_topics[i],
132                                                            "try.a.null.value",
133                                                            NULL);
134                         TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
135 
136                         err = rd_kafka_NewTopic_set_config(new_topics[i],
137                                                            "or.empty", "");
138                         TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
139                 }
140 
141 
142                 if (set_replicas) {
143                         int32_t p;
144                         int32_t replicas[MY_NEW_TOPICS_CNT];
145                         int j;
146 
147                         for (j = 0 ; j < num_replicas ; j++)
148                                 replicas[j] = j;
149 
150                         /*
151                          * Set valid replica assignments
152                          */
153                         for (p = 0 ; p < num_parts ; p++) {
154                                 /* Try adding an existing out of order,
155                                  * should fail */
156                                 if (p == 1) {
157                                         err = rd_kafka_NewTopic_set_replica_assignment(
158                                                 new_topics[i], p+1,
159                                                 replicas, num_replicas,
160                                                 errstr, sizeof(errstr));
161                                         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
162                                                     "%s", rd_kafka_err2str(err));
163                                 }
164 
165                                 err = rd_kafka_NewTopic_set_replica_assignment(
166                                         new_topics[i], p,
167                                         replicas, num_replicas,
168                                         errstr, sizeof(errstr));
169                                 TEST_ASSERT(!err, "%s", errstr);
170                         }
171 
172                         /* Try to add an existing partition, should fail */
173                         err = rd_kafka_NewTopic_set_replica_assignment(
174                                 new_topics[i], 0,
175                                 replicas, num_replicas, NULL, 0);
176                         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
177                                     "%s", rd_kafka_err2str(err));
178 
179                 } else {
180                         int32_t dummy_replicas[1] = {1};
181 
182                         /* Test invalid partition */
183                         err = rd_kafka_NewTopic_set_replica_assignment(
184                                 new_topics[i], num_parts+1, dummy_replicas, 1,
185                                 errstr, sizeof(errstr));
186                         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
187                                     "%s: %s", rd_kafka_err2str(err),
188                                     err == RD_KAFKA_RESP_ERR_NO_ERROR ?
189                                     "" : errstr);
190 
191                         /* Setting replicas with with default replicas != -1
192                          * is an error. */
193                         err = rd_kafka_NewTopic_set_replica_assignment(
194                                 new_topics[i], 0, dummy_replicas, 1,
195                                 errstr, sizeof(errstr));
196                         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
197                                     "%s: %s", rd_kafka_err2str(err),
198                                     err == RD_KAFKA_RESP_ERR_NO_ERROR ?
199                                     "" : errstr);
200                 }
201         }
202 
203         if (with_options) {
204                 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
205 
206                 exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
207                 err = rd_kafka_AdminOptions_set_request_timeout(
208                         options, exp_timeout, errstr, sizeof(errstr));
209                 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
210 
211                 my_opaque = (void *)123;
212                 rd_kafka_AdminOptions_set_opaque(options, my_opaque);
213         }
214 
215         TIMING_START(&timing, "CreateTopics");
216         TEST_SAY("Call CreateTopics, timeout is %dms\n", exp_timeout);
217         rd_kafka_CreateTopics(rk, new_topics, MY_NEW_TOPICS_CNT,
218                               options, q);
219         TIMING_ASSERT_LATER(&timing, 0, 50);
220 
221         if (with_background_event_cb) {
222                 /* Result event will be triggered by callback from
223                  * librdkafka background queue thread. */
224                 TIMING_START(&timing, "CreateTopics.wait_background_event_cb");
225                 rkev = wait_background_event_cb();
226         } else {
227                 /* Poll result queue */
228                 TIMING_START(&timing, "CreateTopics.queue_poll");
229                 rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
230         }
231 
232         TIMING_ASSERT_LATER(&timing, exp_timeout-100, exp_timeout+100);
233         TEST_ASSERT(rkev != NULL, "expected result in %dms",
234                     exp_timeout);
235         TEST_SAY("CreateTopics: got %s in %.3fs\n",
236                  rd_kafka_event_name(rkev),
237                  TIMING_DURATION(&timing) / 1000.0f);
238 
239         /* Convert event to proper result */
240         res = rd_kafka_event_CreateTopics_result(rkev);
241         TEST_ASSERT(res, "expected CreateTopics_result, not %s",
242                     rd_kafka_event_name(rkev));
243 
244         opaque = rd_kafka_event_opaque(rkev);
245         TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
246                     my_opaque, opaque);
247 
248         /* Expecting error */
249         err = rd_kafka_event_error(rkev);
250         errstr2 = rd_kafka_event_error_string(rkev);
251         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
252                     "expected CreateTopics to return error %s, not %s (%s)",
253                     rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
254                     rd_kafka_err2str(err),
255                     err ? errstr2 : "n/a");
256 
257         /* Attempt to extract topics anyway, should return NULL. */
258         restopics = rd_kafka_CreateTopics_result_topics(res, &restopic_cnt);
259         TEST_ASSERT(!restopics && restopic_cnt == 0,
260                     "expected no result_topics, got %p cnt %"PRIusz,
261                     restopics, restopic_cnt);
262 
263         rd_kafka_event_destroy(rkev);
264 
265         rd_kafka_NewTopic_destroy_array(new_topics, MY_NEW_TOPICS_CNT);
266 
267         if (options)
268                 rd_kafka_AdminOptions_destroy(options);
269 
270         if (!useq)
271                 rd_kafka_queue_destroy(q);
272 
273         SUB_TEST_PASS();
274 }
275 
276 
277 
278 
279 
280 
281 /**
282  * @brief DeleteTopics tests
283  *
284  *
285  *
286  */
do_test_DeleteTopics(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int with_options)287 static void do_test_DeleteTopics (const char *what,
288                                   rd_kafka_t *rk, rd_kafka_queue_t *useq,
289                                   int with_options) {
290         rd_kafka_queue_t *q;
291 #define MY_DEL_TOPICS_CNT 4
292         rd_kafka_DeleteTopic_t *del_topics[MY_DEL_TOPICS_CNT];
293         rd_kafka_AdminOptions_t *options = NULL;
294         int exp_timeout = MY_SOCKET_TIMEOUT_MS;
295         int i;
296         char errstr[512];
297         const char *errstr2;
298         rd_kafka_resp_err_t err;
299         test_timing_t timing;
300         rd_kafka_event_t *rkev;
301         const rd_kafka_DeleteTopics_result_t *res;
302         const rd_kafka_topic_result_t **restopics;
303         size_t restopic_cnt;
304         void *my_opaque = NULL, *opaque;
305 
306         SUB_TEST_QUICK("%s DeleteTopics with %s, timeout %dms",
307                        rd_kafka_name(rk), what, exp_timeout);
308 
309         q = useq ? useq : rd_kafka_queue_new(rk);
310 
311         for (i = 0 ; i < MY_DEL_TOPICS_CNT ; i++)
312                 del_topics[i] = rd_kafka_DeleteTopic_new(test_mk_topic_name(__FUNCTION__, 1));
313 
314         if (with_options) {
315                 options = rd_kafka_AdminOptions_new(
316                         rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
317 
318                 exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
319                 err = rd_kafka_AdminOptions_set_request_timeout(
320                         options, exp_timeout, errstr, sizeof(errstr));
321                 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
322 
323                 if (useq) {
324                         my_opaque = (void *)456;
325                         rd_kafka_AdminOptions_set_opaque(options, my_opaque);
326                 }
327         }
328 
329         TIMING_START(&timing, "DeleteTopics");
330         TEST_SAY("Call DeleteTopics, timeout is %dms\n", exp_timeout);
331         rd_kafka_DeleteTopics(rk, del_topics, MY_DEL_TOPICS_CNT,
332                               options, q);
333         TIMING_ASSERT_LATER(&timing, 0, 50);
334 
335         /* Poll result queue */
336         TIMING_START(&timing, "DeleteTopics.queue_poll");
337         rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
338         TIMING_ASSERT_LATER(&timing, exp_timeout-100, exp_timeout+100);
339         TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
340         TEST_SAY("DeleteTopics: got %s in %.3fs\n",
341                  rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
342 
343         /* Convert event to proper result */
344         res = rd_kafka_event_DeleteTopics_result(rkev);
345         TEST_ASSERT(res, "expected DeleteTopics_result, not %s",
346                     rd_kafka_event_name(rkev));
347 
348         opaque = rd_kafka_event_opaque(rkev);
349         TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
350                     my_opaque, opaque);
351 
352         /* Expecting error */
353         err = rd_kafka_event_error(rkev);
354         errstr2 = rd_kafka_event_error_string(rkev);
355         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
356                     "expected DeleteTopics to return error %s, not %s (%s)",
357                     rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
358                     rd_kafka_err2str(err),
359                     err ? errstr2 : "n/a");
360 
361         /* Attempt to extract topics anyway, should return NULL. */
362         restopics = rd_kafka_DeleteTopics_result_topics(res, &restopic_cnt);
363         TEST_ASSERT(!restopics && restopic_cnt == 0,
364                     "expected no result_topics, got %p cnt %"PRIusz,
365                     restopics, restopic_cnt);
366 
367         rd_kafka_event_destroy(rkev);
368 
369         rd_kafka_DeleteTopic_destroy_array(del_topics, MY_DEL_TOPICS_CNT);
370 
371         if (options)
372                 rd_kafka_AdminOptions_destroy(options);
373 
374         if (!useq)
375                 rd_kafka_queue_destroy(q);
376 #undef MY_DEL_TOPICS_CNT
377 
378         SUB_TEST_QUICK();
379 }
380 
381 /**
382  * @brief DeleteGroups tests
383  *
384  *
385  *
386  */
do_test_DeleteGroups(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int with_options,rd_bool_t destroy)387 static void do_test_DeleteGroups (const char *what,
388                                   rd_kafka_t *rk, rd_kafka_queue_t *useq,
389                                   int with_options,
390                                   rd_bool_t destroy) {
391         rd_kafka_queue_t *q;
392 #define MY_DEL_GROUPS_CNT 4
393         char *group_names[MY_DEL_GROUPS_CNT];
394         rd_kafka_DeleteGroup_t *del_groups[MY_DEL_GROUPS_CNT];
395         rd_kafka_AdminOptions_t *options = NULL;
396         int exp_timeout = MY_SOCKET_TIMEOUT_MS;
397         int i;
398         char errstr[512];
399         const char *errstr2;
400         rd_kafka_resp_err_t err;
401         test_timing_t timing;
402         rd_kafka_event_t *rkev;
403         const rd_kafka_DeleteGroups_result_t *res;
404         const rd_kafka_group_result_t **resgroups;
405         size_t resgroup_cnt;
406         void *my_opaque = NULL, *opaque;
407 
408         SUB_TEST_QUICK("%s DeleteGroups with %s, timeout %dms",
409                        rd_kafka_name(rk), what, exp_timeout);
410 
411         q = useq ? useq : rd_kafka_queue_new(rk);
412 
413         for (i = 0 ; i < MY_DEL_GROUPS_CNT ; i++) {
414                 group_names[i] = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
415                 del_groups[i] = rd_kafka_DeleteGroup_new(group_names[i]);
416         }
417 
418         if (with_options) {
419                 options = rd_kafka_AdminOptions_new(
420                         rk, RD_KAFKA_ADMIN_OP_DELETEGROUPS);
421 
422                 exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
423                 err = rd_kafka_AdminOptions_set_request_timeout(
424                         options, exp_timeout, errstr, sizeof(errstr));
425                 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
426 
427                 if (useq) {
428                         my_opaque = (void *)456;
429                         rd_kafka_AdminOptions_set_opaque(options, my_opaque);
430                 }
431         }
432 
433         TIMING_START(&timing, "DeleteGroups");
434         TEST_SAY("Call DeleteGroups, timeout is %dms\n", exp_timeout);
435         rd_kafka_DeleteGroups(rk, del_groups, MY_DEL_GROUPS_CNT,
436                               options, q);
437         TIMING_ASSERT_LATER(&timing, 0, 50);
438 
439         if (destroy)
440                 goto destroy;
441 
442         /* Poll result queue */
443         TIMING_START(&timing, "DeleteGroups.queue_poll");
444         rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
445         TIMING_ASSERT_LATER(&timing, exp_timeout-100, exp_timeout+100);
446         TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
447         TEST_SAY("DeleteGroups: got %s in %.3fs\n",
448                  rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
449 
450         /* Convert event to proper result */
451         res = rd_kafka_event_DeleteGroups_result(rkev);
452         TEST_ASSERT(res, "expected DeleteGroups_result, not %s",
453                     rd_kafka_event_name(rkev));
454 
455         opaque = rd_kafka_event_opaque(rkev);
456         TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
457                     my_opaque, opaque);
458 
459         /* Expecting no error (errors will be per-group) */
460         err = rd_kafka_event_error(rkev);
461         errstr2 = rd_kafka_event_error_string(rkev);
462         TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR,
463                     "expected DeleteGroups to return error %s, not %s (%s)",
464                     rd_kafka_err2str(RD_KAFKA_RESP_ERR_NO_ERROR),
465                     rd_kafka_err2str(err),
466                     err ? errstr2 : "n/a");
467 
468         /* Extract groups, should return MY_DEL_GROUPS_CNT groups. */
469         resgroups = rd_kafka_DeleteGroups_result_groups(res, &resgroup_cnt);
470         TEST_ASSERT(resgroups && resgroup_cnt == MY_DEL_GROUPS_CNT,
471                     "expected %d result_groups, got %p cnt %"PRIusz,
472                     MY_DEL_GROUPS_CNT, resgroups, resgroup_cnt);
473 
474         /* The returned groups should be in the original order, and
475          * should all have timed out. */
476         for (i = 0; i < MY_DEL_GROUPS_CNT; i++) {
477                 TEST_ASSERT(!strcmp(group_names[i],
478                                     rd_kafka_group_result_name(resgroups[i])),
479                             "expected group '%s' at position %d, not '%s'",
480                             group_names[i], i,
481                             rd_kafka_group_result_name(resgroups[i]));
482                 TEST_ASSERT(rd_kafka_error_code(rd_kafka_group_result_error(
483                                                         resgroups[i])) ==
484                             RD_KAFKA_RESP_ERR__TIMED_OUT,
485                             "expected group '%s' to have timed out, got %s",
486                             group_names[i],
487                             rd_kafka_error_string(
488                                     rd_kafka_group_result_error(resgroups[i])));
489         }
490 
491         rd_kafka_event_destroy(rkev);
492 
493  destroy:
494         for (i = 0; i < MY_DEL_GROUPS_CNT; i++) {
495                 rd_kafka_DeleteGroup_destroy(del_groups[i]);
496                 rd_free(group_names[i]);
497         }
498 
499         if (options)
500                 rd_kafka_AdminOptions_destroy(options);
501 
502         if (!useq)
503                 rd_kafka_queue_destroy(q);
504 #undef MY_DEL_GROUPS_CNT
505 
506         SUB_TEST_QUICK();
507 }
508 
/**
 * @brief DeleteRecords local dry-run test.
 *
 * Issues a single DeleteRecords request covering MY_DEL_RECORDS_CNT
 * generated topics (partition i of topic i, offset RD_KAFKA_OFFSET_END).
 * Unless \p destroy is set, the result event is expected within +-100ms
 * of the effective timeout and must carry an error.
 *
 * @param what Description of the queue setup, only used for logging.
 * @param rk Client instance to issue the request on.
 * @param useq Result queue to use, or NULL to create (and destroy) a
 *             temporary queue.
 * @param with_options If non-zero, AdminOptions with a request timeout of
 *        twice MY_SOCKET_TIMEOUT_MS are passed to the request. An opaque
 *        is additionally set when \p useq is non-NULL.
 * @param destroy If true, the result is not awaited: the function proceeds
 *        directly to clean-up after issuing the request.
 */
static void do_test_DeleteRecords (const char *what,
                                   rd_kafka_t *rk, rd_kafka_queue_t *useq,
                                   int with_options, rd_bool_t destroy) {
#define MY_DEL_RECORDS_CNT 4
        char *topic_names[MY_DEL_RECORDS_CNT];
        rd_kafka_topic_partition_list_t *del_offsets = NULL;
        rd_kafka_DeleteRecords_t *request;
        rd_kafka_AdminOptions_t *admin_options = NULL;
        rd_kafka_queue_t *replyq;
        const rd_kafka_DeleteRecords_result_t *res;
        rd_kafka_event_t *rkev;
        rd_kafka_resp_err_t err;
        test_timing_t t_elapsed;
        char errbuf[512];
        void *my_opaque = NULL;
        void *opaque;
        int exp_timeout = MY_SOCKET_TIMEOUT_MS;
        int idx;

        SUB_TEST_QUICK("%s DeleteRecords with %s, timeout %dms",
                       rd_kafka_name(rk), what, exp_timeout);

        /* Use the caller's queue if there is one, else a temporary one. */
        replyq = useq;
        if (!replyq)
                replyq = rd_kafka_queue_new(rk);

        idx = 0;
        while (idx < MY_DEL_RECORDS_CNT) {
                topic_names[idx] =
                        rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
                idx++;
        }

        if (with_options) {
                /* The explicit request timeout replaces the default
                 * expectation of MY_SOCKET_TIMEOUT_MS. */
                exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;

                admin_options = rd_kafka_AdminOptions_new(
                        rk, RD_KAFKA_ADMIN_OP_DELETERECORDS);

                err = rd_kafka_AdminOptions_set_request_timeout(
                        admin_options, exp_timeout,
                        errbuf, sizeof(errbuf));
                TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));

                if (useq) {
                        my_opaque = (void *)4567;
                        rd_kafka_AdminOptions_set_opaque(admin_options,
                                                         my_opaque);
                }
        }

        /* One partition per topic: partition idx of topic idx. */
        del_offsets = rd_kafka_topic_partition_list_new(MY_DEL_RECORDS_CNT);
        for (idx = 0 ; idx < MY_DEL_RECORDS_CNT ; idx++) {
                rd_kafka_topic_partition_list_add(
                        del_offsets, topic_names[idx], idx)->offset =
                        RD_KAFKA_OFFSET_END;
        }

        /* The list is destroyed right after the request object has been
         * constructed from it. */
        request = rd_kafka_DeleteRecords_new(del_offsets);
        rd_kafka_topic_partition_list_destroy(del_offsets);

        /* The call itself must return (almost) immediately. */
        TIMING_START(&t_elapsed, "DeleteRecords");
        TEST_SAY("Call DeleteRecords, timeout is %dms\n", exp_timeout);
        rd_kafka_DeleteRecords(rk, &request, 1, admin_options, replyq);
        TIMING_ASSERT_LATER(&t_elapsed, 0, 10);

        rd_kafka_DeleteRecords_destroy(request);

        if (!destroy) {
                /* Wait for the result on the reply queue */
                TIMING_START(&t_elapsed, "DeleteRecords.queue_poll");
                rkev = rd_kafka_queue_poll(replyq, exp_timeout + 1000);
                TIMING_ASSERT(&t_elapsed, exp_timeout-100, exp_timeout+100);
                TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
                TEST_SAY("DeleteRecords: got %s in %.3fs\n",
                         rd_kafka_event_name(rkev),
                         TIMING_DURATION(&t_elapsed) / 1000.0f);

                /* The event must be of the DeleteRecords result type */
                res = rd_kafka_event_DeleteRecords_result(rkev);
                TEST_ASSERT(res, "expected DeleteRecords_result, not %s",
                            rd_kafka_event_name(rkev));

                /* my_opaque is NULL unless set through the options. */
                opaque = rd_kafka_event_opaque(rkev);
                TEST_ASSERT(opaque == my_opaque,
                            "expected opaque to be %p, not %p",
                            my_opaque, opaque);

                /* Expecting error (pre-fanout leader_req will fail) */
                err = rd_kafka_event_error(rkev);
                TEST_ASSERT(err, "expected DeleteRecords to fail");

                rd_kafka_event_destroy(rkev);
        }

        /* Clean-up, also reached directly when destroy is set. */
        if (admin_options)
                rd_kafka_AdminOptions_destroy(admin_options);

        if (!useq)
                rd_kafka_queue_destroy(replyq);

        for (idx = 0 ; idx < MY_DEL_RECORDS_CNT ; idx++)
                rd_free(topic_names[idx]);

#undef MY_DEL_RECORDS_CNT

        SUB_TEST_PASS();
}
609 
610 
/**
 * @brief DeleteConsumerGroupOffsets local dry-run test.
 *
 * Issues a DeleteConsumerGroupOffsets request for group "mygroup" with
 * three topic partitions and expects the result event to arrive within
 * +-100ms of the effective timeout, carrying an error.
 *
 * @param what Description of the queue setup, only used for logging.
 * @param rk Client instance to issue the request on.
 * @param useq Result queue to use, or NULL to create (and destroy) a
 *             temporary queue.
 * @param with_options If non-zero, AdminOptions with a request timeout of
 *        twice MY_SOCKET_TIMEOUT_MS are passed to the request. An opaque
 *        is additionally set when \p useq is non-NULL.
 */
static void do_test_DeleteConsumerGroupOffsets (const char *what,
                                                rd_kafka_t *rk,
                                                rd_kafka_queue_t *useq,
                                                int with_options) {
        rd_kafka_queue_t *q;
#define MY_DEL_CGRPOFFS_CNT 1
        rd_kafka_AdminOptions_t *options = NULL;
        const rd_kafka_DeleteConsumerGroupOffsets_result_t *res;
        rd_kafka_DeleteConsumerGroupOffsets_t *cgoffsets[MY_DEL_CGRPOFFS_CNT];
        int exp_timeout = MY_SOCKET_TIMEOUT_MS;
        int i;
        char errstr[512];
        rd_kafka_resp_err_t err;
        test_timing_t timing;
        rd_kafka_event_t *rkev;
        void *my_opaque = NULL, *opaque;

        SUB_TEST_QUICK("%s DeleteConsumerGroupOffsets with %s, timeout %dms",
                       rd_kafka_name(rk), what, exp_timeout);

        q = useq ? useq : rd_kafka_queue_new(rk);

        for (i = 0 ; i < MY_DEL_CGRPOFFS_CNT ; i++) {
                rd_kafka_topic_partition_list_t *partitions =
                        rd_kafka_topic_partition_list_new(3);
                rd_kafka_topic_partition_list_add(partitions, "topic1", 9);
                rd_kafka_topic_partition_list_add(partitions, "topic3", 15);
                rd_kafka_topic_partition_list_add(partitions, "topic1", 1);
                /* The list is destroyed right after the request object
                 * has been constructed from it. */
                cgoffsets[i] = rd_kafka_DeleteConsumerGroupOffsets_new(
                        "mygroup", partitions);
                rd_kafka_topic_partition_list_destroy(partitions);
        }

        if (with_options) {
                options = rd_kafka_AdminOptions_new(
                        rk, RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS);

                /* The explicit request timeout replaces the default
                 * expectation of MY_SOCKET_TIMEOUT_MS. */
                exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;

                err = rd_kafka_AdminOptions_set_request_timeout(
                        options, exp_timeout, errstr, sizeof(errstr));
                TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));

                if (useq) {
                        my_opaque = (void *)99981;
                        rd_kafka_AdminOptions_set_opaque(options, my_opaque);
                }
        }

        /* The call itself must return (almost) immediately. */
        TIMING_START(&timing, "DeleteConsumerGroupOffsets");
        TEST_SAY("Call DeleteConsumerGroupOffsets, timeout is %dms\n",
                 exp_timeout);
        rd_kafka_DeleteConsumerGroupOffsets(rk, cgoffsets,
                                            MY_DEL_CGRPOFFS_CNT,
                                            options, q);
        TIMING_ASSERT_LATER(&timing, 0, 10);

        /* Poll result queue */
        TIMING_START(&timing, "DeleteConsumerGroupOffsets.queue_poll");
        rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
        TIMING_ASSERT(&timing, exp_timeout-100,  exp_timeout+100);
        TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
        TEST_SAY("DeleteConsumerGroupOffsets: got %s in %.3fs\n",
                 rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);

        /* Convert event to proper result */
        res = rd_kafka_event_DeleteConsumerGroupOffsets_result(rkev);
        TEST_ASSERT(res, "expected DeleteConsumerGroupOffsets_result, not %s",
                    rd_kafka_event_name(rkev));

        /* my_opaque is NULL unless it was set through the options. */
        opaque = rd_kafka_event_opaque(rkev);
        TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
                    my_opaque, opaque);

        /* Expecting error */
        err = rd_kafka_event_error(rkev);
        TEST_ASSERT(err, "expected DeleteConsumerGroupOffsets to fail");

        rd_kafka_event_destroy(rkev);

        if (options)
                rd_kafka_AdminOptions_destroy(options);

        /* Only destroy the queue if it was created by this function. */
        if (!useq)
                rd_kafka_queue_destroy(q);

        rd_kafka_DeleteConsumerGroupOffsets_destroy_array(
                cgoffsets, MY_DEL_CGRPOFFS_CNT);

        /* Must name the macro defined above so it does not leak past
         * this function. */
#undef MY_DEL_CGRPOFFS_CNT

        SUB_TEST_PASS();
}
704 
705 
706 
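/**
 * @brief Test a mix of APIs using the same replyq.
 *
 *  - Create topics A,B
 *  - Delete topic B
 *  - Create topic C
 *  - Delete groups A,B,C
 *  - Delete records from A,B,C
 *  - Create extra partitions for topic D
 *  - Delete consumer group offsets, once for group "mygroup" with the
 *    offsets of A,B,C and once with a NULL group and NULL offsets
 *  - Create topics A,B using the broker-side default partition count
 */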
do_test_mix(rd_kafka_t * rk,rd_kafka_queue_t * rkqu)717 static void do_test_mix (rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
718         char *topics[] = { "topicA", "topicB", "topicC" };
719         int cnt = 0;
720         struct waiting {
721                 rd_kafka_event_type_t evtype;
722                 int seen;
723         };
724         struct waiting id1 = {RD_KAFKA_EVENT_CREATETOPICS_RESULT};
725         struct waiting id2 = {RD_KAFKA_EVENT_DELETETOPICS_RESULT};
726         struct waiting id3 = {RD_KAFKA_EVENT_CREATETOPICS_RESULT};
727         struct waiting id4 = {RD_KAFKA_EVENT_DELETEGROUPS_RESULT};
728         struct waiting id5 = {RD_KAFKA_EVENT_DELETERECORDS_RESULT};
729         struct waiting id6 = {RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT};
730         struct waiting id7 = {RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT};
731         struct waiting id8 = {RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT};
732         struct waiting id9 = {RD_KAFKA_EVENT_CREATETOPICS_RESULT};
733         rd_kafka_topic_partition_list_t *offsets;
734 
735 
736         SUB_TEST_QUICK();
737 
738         offsets = rd_kafka_topic_partition_list_new(3);
739         rd_kafka_topic_partition_list_add(offsets, topics[0], 0)->offset =
740                 RD_KAFKA_OFFSET_END;
741         rd_kafka_topic_partition_list_add(offsets, topics[1], 0)->offset =
742                 RD_KAFKA_OFFSET_END;
743         rd_kafka_topic_partition_list_add(offsets, topics[2], 0)->offset =
744                 RD_KAFKA_OFFSET_END;
745 
746         test_CreateTopics_simple(rk, rkqu, topics, 2, 1, &id1);
747         test_DeleteTopics_simple(rk, rkqu, &topics[1], 1, &id2);
748         test_CreateTopics_simple(rk, rkqu, &topics[2], 1, 1, &id3);
749         test_DeleteGroups_simple(rk, rkqu, topics, 3, &id4);
750         test_DeleteRecords_simple(rk, rkqu, offsets, &id5);
751         test_CreatePartitions_simple(rk, rkqu, "topicD", 15, &id6);
752         test_DeleteConsumerGroupOffsets_simple(rk, rkqu, "mygroup", offsets,
753                                                &id7);
754         test_DeleteConsumerGroupOffsets_simple(rk, rkqu, NULL, NULL, &id8);
755         /* Use broker-side defaults for partition count */
756         test_CreateTopics_simple(rk, rkqu, topics, 2, -1, &id9);
757 
758         rd_kafka_topic_partition_list_destroy(offsets);
759 
760         while (cnt < 9) {
761                 rd_kafka_event_t *rkev;
762                 struct waiting *w;
763 
764                 rkev = rd_kafka_queue_poll(rkqu, -1);
765                 TEST_ASSERT(rkev);
766 
767                 TEST_SAY("Got event %s: %s\n",
768                          rd_kafka_event_name(rkev),
769                          rd_kafka_event_error_string(rkev));
770 
771                 w = rd_kafka_event_opaque(rkev);
772                 TEST_ASSERT(w);
773 
774                 TEST_ASSERT(w->evtype == rd_kafka_event_type(rkev),
775                             "Expected evtype %d, not %d (%s)",
776                             w->evtype, rd_kafka_event_type(rkev),
777                             rd_kafka_event_name(rkev));
778 
779                 TEST_ASSERT(w->seen == 0, "Duplicate results");
780 
781                 w->seen++;
782                 cnt++;
783 
784                 rd_kafka_event_destroy(rkev);
785         }
786 
787         SUB_TEST_PASS();
788 }
789 
790 
791 /**
792  * @brief Test AlterConfigs and DescribeConfigs
793  */
do_test_configs(rd_kafka_t * rk,rd_kafka_queue_t * rkqu)794 static void do_test_configs (rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
795 #define MY_CONFRES_CNT RD_KAFKA_RESOURCE__CNT + 2
796         rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
797         rd_kafka_AdminOptions_t *options;
798         rd_kafka_event_t *rkev;
799         rd_kafka_resp_err_t err;
800         const rd_kafka_AlterConfigs_result_t *res;
801         const rd_kafka_ConfigResource_t **rconfigs;
802         size_t rconfig_cnt;
803         char errstr[128];
804         int i;
805 
806         SUB_TEST_QUICK();
807 
808         /* Check invalids */
809         configs[0] = rd_kafka_ConfigResource_new(
810                 (rd_kafka_ResourceType_t)-1, "something");
811         TEST_ASSERT(!configs[0]);
812 
813         configs[0] = rd_kafka_ConfigResource_new(
814                 (rd_kafka_ResourceType_t)0, NULL);
815         TEST_ASSERT(!configs[0]);
816 
817 
818         for (i = 0 ; i < MY_CONFRES_CNT ; i++) {
819                 int set_config = !(i % 2);
820 
821                 /* librdkafka shall not limit the use of illogical
822                  * or unknown settings, they are enforced by the broker. */
823                 configs[i] = rd_kafka_ConfigResource_new(
824                         (rd_kafka_ResourceType_t)i, "3");
825                 TEST_ASSERT(configs[i] != NULL);
826 
827                 if (set_config) {
828                         rd_kafka_ConfigResource_set_config(configs[i],
829                                                            "some.conf",
830                                                            "which remains "
831                                                            "unchecked");
832                         rd_kafka_ConfigResource_set_config(configs[i],
833                                                            "some.conf.null",
834                                                            NULL);
835                 }
836         }
837 
838 
839         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
840         err = rd_kafka_AdminOptions_set_request_timeout(options, 1000, errstr,
841                                                         sizeof(errstr));
842         TEST_ASSERT(!err, "%s", errstr);
843 
844         /* AlterConfigs */
845         rd_kafka_AlterConfigs(rk, configs, MY_CONFRES_CNT,
846                               options, rkqu);
847 
848         rkev = test_wait_admin_result(rkqu, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT,
849                                       2000);
850 
851         TEST_ASSERT(rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__TIMED_OUT,
852                     "Expected timeout, not %s",
853                     rd_kafka_event_error_string(rkev));
854 
855         res = rd_kafka_event_AlterConfigs_result(rkev);
856         TEST_ASSERT(res);
857 
858         rconfigs = rd_kafka_AlterConfigs_result_resources(res, &rconfig_cnt);
859         TEST_ASSERT(!rconfigs && !rconfig_cnt,
860                     "Expected no result resources, got %"PRIusz,
861                     rconfig_cnt);
862 
863         rd_kafka_event_destroy(rkev);
864 
865         /* DescribeConfigs: reuse same configs and options */
866         rd_kafka_DescribeConfigs(rk, configs, MY_CONFRES_CNT,
867                                  options, rkqu);
868 
869         rd_kafka_AdminOptions_destroy(options);
870         rd_kafka_ConfigResource_destroy_array(configs, MY_CONFRES_CNT);
871 
872         rkev = test_wait_admin_result(rkqu,
873                                       RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT,
874                                       2000);
875 
876         TEST_ASSERT(rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__TIMED_OUT,
877                     "Expected timeout, not %s",
878                     rd_kafka_event_error_string(rkev));
879 
880         res = rd_kafka_event_DescribeConfigs_result(rkev);
881         TEST_ASSERT(res);
882 
883         rconfigs = rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt);
884         TEST_ASSERT(!rconfigs && !rconfig_cnt,
885                     "Expected no result resources, got %"PRIusz,
886                     rconfig_cnt);
887 
888         rd_kafka_event_destroy(rkev);
889 
890         SUB_TEST_PASS();
891 }
892 
893 
894 /**
895  * @brief Verify that an unclean rd_kafka_destroy() does not hang or crash.
896  */
do_test_unclean_destroy(rd_kafka_type_t cltype,int with_mainq)897 static void do_test_unclean_destroy (rd_kafka_type_t cltype, int with_mainq) {
898         rd_kafka_t *rk;
899         char errstr[512];
900         rd_kafka_conf_t *conf;
901         rd_kafka_queue_t *q;
902         rd_kafka_event_t *rkev;
903         rd_kafka_DeleteTopic_t *topic;
904         test_timing_t t_destroy;
905 
906         SUB_TEST_QUICK("Test unclean destroy using %s",
907                        with_mainq ? "mainq" : "tempq");
908 
909         test_conf_init(&conf, NULL, 0);
910         /* Remove brokers, if any, since this is a local test and we
911          * rely on the controller not being found. */
912         test_conf_set(conf, "bootstrap.servers", "");
913         test_conf_set(conf, "socket.timeout.ms", "60000");
914 
915         rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr));
916         TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr);
917 
918         if (with_mainq)
919                 q = rd_kafka_queue_get_main(rk);
920         else
921                 q = rd_kafka_queue_new(rk);
922 
923         topic = rd_kafka_DeleteTopic_new("test");
924         rd_kafka_DeleteTopics(rk, &topic, 1, NULL, q);
925         rd_kafka_DeleteTopic_destroy(topic);
926 
927         /* We're not expecting a result yet since DeleteTopics will attempt
928          * to look up the controller for socket.timeout.ms (1 minute). */
929         rkev = rd_kafka_queue_poll(q, 100);
930         TEST_ASSERT(!rkev, "Did not expect result: %s",
931                     rd_kafka_event_name(rkev));
932 
933         rd_kafka_queue_destroy(q);
934 
935         TEST_SAY("Giving rd_kafka_destroy() 5s to finish, "
936                  "despite Admin API request being processed\n");
937         test_timeout_set(5);
938         TIMING_START(&t_destroy, "rd_kafka_destroy()");
939         rd_kafka_destroy(rk);
940         TIMING_STOP(&t_destroy);
941 
942         SUB_TEST_PASS();
943 
944         /* Restore timeout */
945         test_timeout_set(60);
946 }
947 
948 
949 /**
950  * @brief Test AdminOptions
951  */
do_test_options(rd_kafka_t * rk)952 static void do_test_options (rd_kafka_t *rk) {
953 #define _all_apis { RD_KAFKA_ADMIN_OP_CREATETOPICS, \
954                     RD_KAFKA_ADMIN_OP_DELETETOPICS, \
955                     RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, \
956                     RD_KAFKA_ADMIN_OP_ALTERCONFIGS, \
957                     RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, \
958                     RD_KAFKA_ADMIN_OP_DELETEGROUPS, \
959                     RD_KAFKA_ADMIN_OP_DELETERECORDS, \
960                     RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS, \
961                     RD_KAFKA_ADMIN_OP_ANY /* Must be last */}
962         struct {
963                 const char *setter;
964                 const rd_kafka_admin_op_t valid_apis[9];
965         } matrix[] = {
966                 { "request_timeout", _all_apis },
967                 { "operation_timeout", { RD_KAFKA_ADMIN_OP_CREATETOPICS,
968                                          RD_KAFKA_ADMIN_OP_DELETETOPICS,
969                                          RD_KAFKA_ADMIN_OP_CREATEPARTITIONS,
970                                          RD_KAFKA_ADMIN_OP_DELETERECORDS } },
971                 { "validate_only", { RD_KAFKA_ADMIN_OP_CREATETOPICS,
972                                      RD_KAFKA_ADMIN_OP_CREATEPARTITIONS,
973                                      RD_KAFKA_ADMIN_OP_ALTERCONFIGS } },
974                 { "broker", _all_apis },
975                 { "opaque", _all_apis },
976                 { NULL },
977         };
978         int i;
979         rd_kafka_AdminOptions_t *options;
980 
981         SUB_TEST_QUICK();
982 
983         for (i = 0 ; matrix[i].setter ; i++) {
984                 static const rd_kafka_admin_op_t all_apis[] = _all_apis;
985                 const rd_kafka_admin_op_t *for_api;
986 
987                 for (for_api = all_apis ; ; for_api++) {
988                         rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
989                         rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
990                         char errstr[512];
991                         int fi;
992 
993                         options = rd_kafka_AdminOptions_new(rk, *for_api);
994                         TEST_ASSERT(options,
995                                     "AdminOptions_new(%d) failed", *for_api);
996 
997                         if (!strcmp(matrix[i].setter, "request_timeout"))
998                                 err = rd_kafka_AdminOptions_set_request_timeout(
999                                         options, 1234, errstr, sizeof(errstr));
1000                         else if (!strcmp(matrix[i].setter, "operation_timeout"))
1001                                 err = rd_kafka_AdminOptions_set_operation_timeout(
1002                                         options, 12345, errstr, sizeof(errstr));
1003                         else if (!strcmp(matrix[i].setter, "validate_only"))
1004                                 err = rd_kafka_AdminOptions_set_validate_only(
1005                                         options, 1, errstr, sizeof(errstr));
1006                         else if (!strcmp(matrix[i].setter, "broker"))
1007                                 err = rd_kafka_AdminOptions_set_broker(
1008                                         options, 5, errstr, sizeof(errstr));
1009                         else if (!strcmp(matrix[i].setter, "opaque")) {
1010                                 rd_kafka_AdminOptions_set_opaque(
1011                                         options, (void *)options);
1012                                 err = RD_KAFKA_RESP_ERR_NO_ERROR;
1013                         } else
1014                                 TEST_FAIL("Invalid setter: %s",
1015                                           matrix[i].setter);
1016 
1017 
1018                         TEST_SAYL(3, "AdminOptions_set_%s on "
1019                                   "RD_KAFKA_ADMIN_OP_%d options "
1020                                   "returned %s: %s\n",
1021                                   matrix[i].setter,
1022                                   *for_api,
1023                                   rd_kafka_err2name(err),
1024                                   err ? errstr : "success");
1025 
1026                         /* Scan matrix valid_apis to see if this
1027                          * setter should be accepted or not. */
1028                         if (exp_err) {
1029                                 /* An expected error is already set */
1030                         } else if (*for_api != RD_KAFKA_ADMIN_OP_ANY) {
1031                                 exp_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
1032 
1033                                 for (fi = 0 ; matrix[i].valid_apis[fi] ; fi++) {
1034                                         if (matrix[i].valid_apis[fi] ==
1035                                             *for_api)
1036                                                 exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
1037                                 }
1038                         } else {
1039                                 exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
1040                         }
1041 
1042                         if (err != exp_err)
1043                                 TEST_FAIL_LATER("Expected AdminOptions_set_%s "
1044                                                 "for RD_KAFKA_ADMIN_OP_%d "
1045                                                 "options to return %s, "
1046                                                 "not %s",
1047                                                 matrix[i].setter,
1048                                                 *for_api,
1049                                                 rd_kafka_err2name(exp_err),
1050                                                 rd_kafka_err2name(err));
1051 
1052                         rd_kafka_AdminOptions_destroy(options);
1053 
1054                         if (*for_api == RD_KAFKA_ADMIN_OP_ANY)
1055                                 break; /* This was the last one */
1056                 }
1057         }
1058 
1059         /* Try an invalid for_api */
1060         options = rd_kafka_AdminOptions_new(rk, (rd_kafka_admin_op_t)1234);
1061         TEST_ASSERT(!options, "Expected AdminOptions_new() to fail "
1062                     "with an invalid for_api, didn't.");
1063 
1064         TEST_LATER_CHECK();
1065 
1066         SUB_TEST_PASS();
1067 }
1068 
1069 
create_admin_client(rd_kafka_type_t cltype)1070 static rd_kafka_t *create_admin_client (rd_kafka_type_t cltype) {
1071         rd_kafka_t *rk;
1072         char errstr[512];
1073         rd_kafka_conf_t *conf;
1074 
1075         test_conf_init(&conf, NULL, 0);
1076         /* Remove brokers, if any, since this is a local test and we
1077          * rely on the controller not being found. */
1078         test_conf_set(conf, "bootstrap.servers", "");
1079         test_conf_set(conf, "socket.timeout.ms", MY_SOCKET_TIMEOUT_MS_STR);
1080         /* For use with the background queue */
1081         rd_kafka_conf_set_background_event_cb(conf, background_event_cb);
1082 
1083         rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr));
1084         TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr);
1085 
1086         return rk;
1087 }
1088 
1089 
/**
 * @brief Run all local Admin API tests for a client of type \p cltype.
 *
 * Sets up the last_event lock and condition variable, runs the unclean
 * destroy tests (which create their own client instances), runs the bulk
 * of the tests on one shared client instance, and finally runs the tests
 * that each need a fresh client instance of their own.
 */
static void do_test_apis (rd_kafka_type_t cltype) {
        rd_kafka_queue_t *main_q;
        rd_kafka_queue_t *bg_q;
        rd_kafka_t *rk;

        mtx_init(&last_event_lock, mtx_plain);
        cnd_init(&last_event_cnd);

        /* Unclean destroy, with each kind of reply queue. */
        do_test_unclean_destroy(cltype, 0/*tempq*/);
        do_test_unclean_destroy(cltype, 1/*mainq*/);

        /*
         * Tests sharing a single client instance.
         */
        rk = create_admin_client(cltype);

        main_q = rd_kafka_queue_get_main(rk);
        bg_q = rd_kafka_queue_get_background(rk);

        do_test_options(rk);

        /* CreateTopics */
        do_test_CreateTopics("temp queue, no options", rk, NULL, 0, 0);
        do_test_CreateTopics("temp queue, no options, background_event_cb",
                             rk, bg_q, 1, 0);
        do_test_CreateTopics("temp queue, options", rk, NULL, 0, 1);
        do_test_CreateTopics("main queue, options", rk, main_q, 0, 1);

        /* DeleteTopics */
        do_test_DeleteTopics("temp queue, no options", rk, NULL, 0);
        do_test_DeleteTopics("temp queue, options", rk, NULL, 1);
        do_test_DeleteTopics("main queue, options", rk, main_q, 1);

        /* DeleteGroups */
        do_test_DeleteGroups("temp queue, no options", rk, NULL, 0, rd_false);
        do_test_DeleteGroups("temp queue, options", rk, NULL, 1, rd_false);
        do_test_DeleteGroups("main queue, options", rk, main_q, 1, rd_false);

        /* DeleteRecords */
        do_test_DeleteRecords("temp queue, no options", rk, NULL, 0, rd_false);
        do_test_DeleteRecords("temp queue, options", rk, NULL, 1, rd_false);
        do_test_DeleteRecords("main queue, options", rk, main_q, 1, rd_false);

        /* DeleteConsumerGroupOffsets */
        do_test_DeleteConsumerGroupOffsets("temp queue, no options",
                                           rk, NULL, 0);
        do_test_DeleteConsumerGroupOffsets("temp queue, options", rk, NULL, 1);
        do_test_DeleteConsumerGroupOffsets("main queue, options",
                                           rk, main_q, 1);

        /* Mixed requests, then AlterConfigs/DescribeConfigs,
         * both on the main queue. */
        do_test_mix(rk, main_q);

        do_test_configs(rk, main_q);

        rd_kafka_queue_destroy(bg_q);
        rd_kafka_queue_destroy(main_q);

        rd_kafka_destroy(rk);

        /*
         * Tests which require a unique unused client instance.
         */
        rk = create_admin_client(cltype);
        main_q = rd_kafka_queue_get_main(rk);
        do_test_DeleteRecords("main queue, options, destroy", rk, main_q, 1,
                              rd_true/*destroy instance before finishing*/);
        rd_kafka_queue_destroy(main_q);
        rd_kafka_destroy(rk);

        rk = create_admin_client(cltype);
        main_q = rd_kafka_queue_get_main(rk);
        do_test_DeleteGroups("main queue, options, destroy", rk, main_q, 1,
                             rd_true/*destroy instance before finishing*/);
        rd_kafka_queue_destroy(main_q);
        rd_kafka_destroy(rk);


        /* All done: release the last_event synchronization objects. */
        mtx_destroy(&last_event_lock);
        cnd_destroy(&last_event_cnd);
}
1161 
1162 
main_0080_admin_ut(int argc,char ** argv)1163 int main_0080_admin_ut (int argc, char **argv) {
1164         do_test_apis(RD_KAFKA_PRODUCER);
1165         do_test_apis(RD_KAFKA_CONSUMER);
1166         return 0;
1167 }
1168