1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 #include "rdkafka.h"
31 
32 /**
33  * @brief Admin API local dry-run unit-tests.
34  */
35 
/* Socket timeout, in milliseconds, configured on the test client through
 * "socket.timeout.ms" in do_test_apis(). It is also the baseline for the
 * request timeout the CreateTopics/DeleteTopics tests expect.
 * The _STR variant is the same value in string form, as needed by
 * test_conf_set(); the two definitions must be kept in sync. */
#define MY_SOCKET_TIMEOUT_MS      100
#define MY_SOCKET_TIMEOUT_MS_STR "100"
38 
39 
40 
/* Single-slot hand-off between background_event_cb(), which stores an event,
 * and wait_background_event_cb(), which takes it.
 * last_event is only read or written with last_event_lock held, and
 * last_event_cnd is broadcast after a new event has been stored.
 * The lock and condition variable are initialized and destroyed by
 * do_test_apis(). */
static mtx_t last_event_lock;
static cnd_t last_event_cnd;
static rd_kafka_event_t *last_event = NULL;
44 
45 /**
46  * @brief The background event callback is called automatically
47  *        by librdkafka from a background thread.
48  */
background_event_cb(rd_kafka_t * rk,rd_kafka_event_t * rkev,void * opaque)49 static void background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
50                                  void *opaque) {
51         mtx_lock(&last_event_lock);
52         TEST_ASSERT(!last_event, "Multiple events seen in background_event_cb "
53                     "(existing %s, new %s)",
54                     rd_kafka_event_name(last_event), rd_kafka_event_name(rkev));
55         last_event = rkev;
56         mtx_unlock(&last_event_lock);
57         cnd_broadcast(&last_event_cnd);
58         rd_sleep(1);
59 }
60 
wait_background_event_cb(void)61 static rd_kafka_event_t *wait_background_event_cb (void) {
62         rd_kafka_event_t *rkev;
63         mtx_lock(&last_event_lock);
64         while (!(rkev = last_event))
65                 cnd_wait(&last_event_cnd, &last_event_lock);
66         last_event = NULL;
67         mtx_unlock(&last_event_lock);
68 
69         return rkev;
70 }
71 
72 
73 /**
74  * @brief CreateTopics tests
75  *
76  *
77  *
78  */
do_test_CreateTopics(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int with_background_event_cb,int with_options)79 static void do_test_CreateTopics (const char *what,
80                                   rd_kafka_t *rk, rd_kafka_queue_t *useq,
81                                   int with_background_event_cb,
82                                   int with_options) {
83         rd_kafka_queue_t *q = useq ? useq : rd_kafka_queue_new(rk);
84 #define MY_NEW_TOPICS_CNT 6
85         rd_kafka_NewTopic_t *new_topics[MY_NEW_TOPICS_CNT];
86         rd_kafka_AdminOptions_t *options = NULL;
87         int exp_timeout = MY_SOCKET_TIMEOUT_MS;
88         int i;
89         char errstr[512];
90         const char *errstr2;
91         rd_kafka_resp_err_t err;
92         test_timing_t timing;
93         rd_kafka_event_t *rkev;
94         const rd_kafka_CreateTopics_result_t *res;
95         const rd_kafka_topic_result_t **restopics;
96         size_t restopic_cnt;
97         void *my_opaque = NULL, *opaque;
98 
99         TEST_SAY(_C_MAG "[ %s CreateTopics with %s, timeout %dms ]\n",
100                  rd_kafka_name(rk), what, exp_timeout);
101 
102         /**
103          * Construct NewTopic array with different properties for
104          * different partitions.
105          */
106         for (i = 0 ; i < MY_NEW_TOPICS_CNT ; i++) {
107                 const char *topic = test_mk_topic_name(__FUNCTION__, 1);
108                 int num_parts = i * 51 + 1;
109                 int num_replicas = jitter(1, MY_NEW_TOPICS_CNT-1);
110                 int set_config = (i & 2);
111                 int set_replicas = !(i % 1);
112 
113                 new_topics[i] = rd_kafka_NewTopic_new(topic,
114                                                       num_parts,
115                                                       set_replicas ? -1 :
116                                                       num_replicas,
117                                                       NULL, 0);
118 
119                 if (set_config) {
120                         /*
121                          * Add various (unverified) configuration properties
122                          */
123                         err = rd_kafka_NewTopic_set_config(new_topics[i],
124                                                            "dummy.doesntexist",
125                                                            "butThere'sNothing "
126                                                            "to verify that");
127                         TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
128 
129                         err = rd_kafka_NewTopic_set_config(new_topics[i],
130                                                            "try.a.null.value",
131                                                            NULL);
132                         TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
133 
134                         err = rd_kafka_NewTopic_set_config(new_topics[i],
135                                                            "or.empty", "");
136                         TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
137                 }
138 
139 
140                 if (set_replicas) {
141                         int32_t p;
142                         int32_t replicas[MY_NEW_TOPICS_CNT];
143                         int j;
144 
145                         for (j = 0 ; j < num_replicas ; j++)
146                                 replicas[j] = j;
147 
148                         /*
149                          * Set valid replica assignments
150                          */
151                         for (p = 0 ; p < num_parts ; p++) {
152                                 /* Try adding an existing out of order,
153                                  * should fail */
154                                 if (p == 1) {
155                                         err = rd_kafka_NewTopic_set_replica_assignment(
156                                                 new_topics[i], p+1,
157                                                 replicas, num_replicas,
158                                                 errstr, sizeof(errstr));
159                                         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
160                                                     "%s", rd_kafka_err2str(err));
161                                 }
162 
163                                 err = rd_kafka_NewTopic_set_replica_assignment(
164                                         new_topics[i], p,
165                                         replicas, num_replicas,
166                                         errstr, sizeof(errstr));
167                                 TEST_ASSERT(!err, "%s", errstr);
168                         }
169 
170                         /* Try to add an existing partition, should fail */
171                         err = rd_kafka_NewTopic_set_replica_assignment(
172                                 new_topics[i], 0,
173                                 replicas, num_replicas, NULL, 0);
174                         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
175                                     "%s", rd_kafka_err2str(err));
176 
177                 } else {
178                         int32_t dummy_replicas[1] = {1};
179 
180                         /* Test invalid partition */
181                         err = rd_kafka_NewTopic_set_replica_assignment(
182                                 new_topics[i], num_parts+1, dummy_replicas, 1,
183                                 errstr, sizeof(errstr));
184                         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
185                                     "%s: %s", rd_kafka_err2str(err),
186                                     err == RD_KAFKA_RESP_ERR_NO_ERROR ?
187                                     "" : errstr);
188 
189                         /* Setting replicas with with default replicas != -1
190                          * is an error. */
191                         err = rd_kafka_NewTopic_set_replica_assignment(
192                                 new_topics[i], 0, dummy_replicas, 1,
193                                 errstr, sizeof(errstr));
194                         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
195                                     "%s: %s", rd_kafka_err2str(err),
196                                     err == RD_KAFKA_RESP_ERR_NO_ERROR ?
197                                     "" : errstr);
198                 }
199         }
200 
201         if (with_options) {
202                 options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
203 
204                 exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
205                 err = rd_kafka_AdminOptions_set_request_timeout(
206                         options, exp_timeout, errstr, sizeof(errstr));
207                 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
208 
209                 my_opaque = (void *)123;
210                 rd_kafka_AdminOptions_set_opaque(options, my_opaque);
211         }
212 
213         TIMING_START(&timing, "CreateTopics");
214         TEST_SAY("Call CreateTopics, timeout is %dms\n", exp_timeout);
215         rd_kafka_CreateTopics(rk, new_topics, MY_NEW_TOPICS_CNT,
216                               options, q);
217         TIMING_ASSERT_LATER(&timing, 0, 50);
218 
219         if (with_background_event_cb) {
220                 /* Result event will be triggered by callback from
221                  * librdkafka background queue thread. */
222                 TIMING_START(&timing, "CreateTopics.wait_background_event_cb");
223                 rkev = wait_background_event_cb();
224         } else {
225                 /* Poll result queue */
226                 TIMING_START(&timing, "CreateTopics.queue_poll");
227                 rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
228         }
229 
230         TIMING_ASSERT_LATER(&timing, exp_timeout-100, exp_timeout+100);
231         TEST_ASSERT(rkev != NULL, "expected result in %dms",
232                     exp_timeout);
233         TEST_SAY("CreateTopics: got %s in %.3fs\n",
234                  rd_kafka_event_name(rkev),
235                  TIMING_DURATION(&timing) / 1000.0f);
236 
237         /* Convert event to proper result */
238         res = rd_kafka_event_CreateTopics_result(rkev);
239         TEST_ASSERT(res, "expected CreateTopics_result, not %s",
240                     rd_kafka_event_name(rkev));
241 
242         opaque = rd_kafka_event_opaque(rkev);
243         TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
244                     my_opaque, opaque);
245 
246         /* Expecting error */
247         err = rd_kafka_event_error(rkev);
248         errstr2 = rd_kafka_event_error_string(rkev);
249         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
250                     "expected CreateTopics to return error %s, not %s (%s)",
251                     rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
252                     rd_kafka_err2str(err),
253                     err ? errstr2 : "n/a");
254 
255         /* Attempt to extract topics anyway, should return NULL. */
256         restopics = rd_kafka_CreateTopics_result_topics(res, &restopic_cnt);
257         TEST_ASSERT(!restopics && restopic_cnt == 0,
258                     "expected no result_topics, got %p cnt %"PRIusz,
259                     restopics, restopic_cnt);
260 
261         rd_kafka_event_destroy(rkev);
262 
263         rd_kafka_NewTopic_destroy_array(new_topics, MY_NEW_TOPICS_CNT);
264 
265         if (options)
266                 rd_kafka_AdminOptions_destroy(options);
267 
268         if (!useq)
269                 rd_kafka_queue_destroy(q);
270 }
271 
272 
273 
274 
275 
276 
277 /**
278  * @brief DeleteTopics tests
279  *
280  *
281  *
282  */
do_test_DeleteTopics(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int with_options)283 static void do_test_DeleteTopics (const char *what,
284                                   rd_kafka_t *rk, rd_kafka_queue_t *useq,
285                                   int with_options) {
286         rd_kafka_queue_t *q = useq ? useq : rd_kafka_queue_new(rk);
287 #define MY_DEL_TOPICS_CNT 4
288         rd_kafka_DeleteTopic_t *del_topics[MY_DEL_TOPICS_CNT];
289         rd_kafka_AdminOptions_t *options = NULL;
290         int exp_timeout = MY_SOCKET_TIMEOUT_MS;
291         int i;
292         char errstr[512];
293         const char *errstr2;
294         rd_kafka_resp_err_t err;
295         test_timing_t timing;
296         rd_kafka_event_t *rkev;
297         const rd_kafka_DeleteTopics_result_t *res;
298         const rd_kafka_topic_result_t **restopics;
299         size_t restopic_cnt;
300         void *my_opaque = NULL, *opaque;
301 
302         TEST_SAY(_C_MAG "[ %s DeleteTopics with %s, timeout %dms ]\n",
303                  rd_kafka_name(rk), what, exp_timeout);
304 
305         for (i = 0 ; i < MY_DEL_TOPICS_CNT ; i++)
306                 del_topics[i] = rd_kafka_DeleteTopic_new(test_mk_topic_name(__FUNCTION__, 1));
307 
308         if (with_options) {
309                 options = rd_kafka_AdminOptions_new(
310                         rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
311 
312                 exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
313                 err = rd_kafka_AdminOptions_set_request_timeout(
314                         options, exp_timeout, errstr, sizeof(errstr));
315                 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
316 
317                 if (useq) {
318                         my_opaque = (void *)456;
319                         rd_kafka_AdminOptions_set_opaque(options, my_opaque);
320                 }
321         }
322 
323         TIMING_START(&timing, "DeleteTopics");
324         TEST_SAY("Call DeleteTopics, timeout is %dms\n", exp_timeout);
325         rd_kafka_DeleteTopics(rk, del_topics, MY_DEL_TOPICS_CNT,
326                               options, q);
327         TIMING_ASSERT_LATER(&timing, 0, 50);
328 
329         /* Poll result queue */
330         TIMING_START(&timing, "DeleteTopics.queue_poll");
331         rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
332         TIMING_ASSERT_LATER(&timing, exp_timeout-100, exp_timeout+100);
333         TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
334         TEST_SAY("DeleteTopics: got %s in %.3fs\n",
335                  rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
336 
337         /* Convert event to proper result */
338         res = rd_kafka_event_DeleteTopics_result(rkev);
339         TEST_ASSERT(res, "expected DeleteTopics_result, not %s",
340                     rd_kafka_event_name(rkev));
341 
342         opaque = rd_kafka_event_opaque(rkev);
343         TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
344                     my_opaque, opaque);
345 
346         /* Expecting error */
347         err = rd_kafka_event_error(rkev);
348         errstr2 = rd_kafka_event_error_string(rkev);
349         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
350                     "expected DeleteTopics to return error %s, not %s (%s)",
351                     rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
352                     rd_kafka_err2str(err),
353                     err ? errstr2 : "n/a");
354 
355         /* Attempt to extract topics anyway, should return NULL. */
356         restopics = rd_kafka_DeleteTopics_result_topics(res, &restopic_cnt);
357         TEST_ASSERT(!restopics && restopic_cnt == 0,
358                     "expected no result_topics, got %p cnt %"PRIusz,
359                     restopics, restopic_cnt);
360 
361         rd_kafka_event_destroy(rkev);
362 
363         rd_kafka_DeleteTopic_destroy_array(del_topics, MY_DEL_TOPICS_CNT);
364 
365         if (options)
366                 rd_kafka_AdminOptions_destroy(options);
367 
368         if (!useq)
369                 rd_kafka_queue_destroy(q);
370 }
371 
372 
373 /**
374  * @brief Test a mix of APIs using the same replyq.
375  *
376  *  - Create topics A,B
377  *  - Delete topic B
378  *  - Create topic C
379  *  - Create extra partitions for topic D
380  */
do_test_mix(rd_kafka_t * rk,rd_kafka_queue_t * rkqu)381 static void do_test_mix (rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
382         char *topics[] = { "topicA", "topicB", "topicC" };
383         int cnt = 0;
384         struct waiting {
385                 rd_kafka_event_type_t evtype;
386                 int seen;
387         };
388         struct waiting id1 = {RD_KAFKA_EVENT_CREATETOPICS_RESULT};
389         struct waiting id2 = {RD_KAFKA_EVENT_DELETETOPICS_RESULT};
390         struct waiting id3 = {RD_KAFKA_EVENT_CREATETOPICS_RESULT};
391         struct waiting id4 = {RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT};
392 
393         TEST_SAY(_C_MAG "[ Mixed mode test on %s]\n", rd_kafka_name(rk));
394 
395         test_CreateTopics_simple(rk, rkqu, topics, 2, 1, &id1);
396         test_DeleteTopics_simple(rk, rkqu, &topics[1], 1, &id2);
397         test_CreateTopics_simple(rk, rkqu, &topics[2], 1, 1, &id3);
398         test_CreatePartitions_simple(rk, rkqu, "topicD", 15, &id4);
399 
400         while (cnt < 4) {
401                 rd_kafka_event_t *rkev;
402                 struct waiting *w;
403 
404                 rkev = rd_kafka_queue_poll(rkqu, -1);
405                 TEST_ASSERT(rkev);
406 
407                 TEST_SAY("Got event %s: %s\n",
408                          rd_kafka_event_name(rkev),
409                          rd_kafka_event_error_string(rkev));
410 
411                 w = rd_kafka_event_opaque(rkev);
412                 TEST_ASSERT(w);
413 
414                 TEST_ASSERT(w->evtype == rd_kafka_event_type(rkev),
415                             "Expected evtype %d, not %d (%s)",
416                             w->evtype, rd_kafka_event_type(rkev),
417                             rd_kafka_event_name(rkev));
418 
419                 TEST_ASSERT(w->seen == 0, "Duplicate results");
420 
421                 w->seen++;
422                 cnt++;
423 
424                 rd_kafka_event_destroy(rkev);
425         }
426 }
427 
428 
429 /**
430  * @brief Test AlterConfigs and DescribeConfigs
431  */
do_test_configs(rd_kafka_t * rk,rd_kafka_queue_t * rkqu)432 static void do_test_configs (rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
433 #define MY_CONFRES_CNT RD_KAFKA_RESOURCE__CNT + 2
434         rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
435         rd_kafka_AdminOptions_t *options;
436         rd_kafka_event_t *rkev;
437         rd_kafka_resp_err_t err;
438         const rd_kafka_AlterConfigs_result_t *res;
439         const rd_kafka_ConfigResource_t **rconfigs;
440         size_t rconfig_cnt;
441         char errstr[128];
442         int i;
443 
444         /* Check invalids */
445         configs[0] = rd_kafka_ConfigResource_new(
446                 (rd_kafka_ResourceType_t)-1, "something");
447         TEST_ASSERT(!configs[0]);
448 
449         configs[0] = rd_kafka_ConfigResource_new(
450                 (rd_kafka_ResourceType_t)0, NULL);
451         TEST_ASSERT(!configs[0]);
452 
453 
454         for (i = 0 ; i < MY_CONFRES_CNT ; i++) {
455                 int set_config = !(i % 2);
456 
457                 /* librdkafka shall not limit the use of illogical
458                  * or unknown settings, they are enforced by the broker. */
459                 configs[i] = rd_kafka_ConfigResource_new(
460                         (rd_kafka_ResourceType_t)i, "3");
461                 TEST_ASSERT(configs[i] != NULL);
462 
463                 if (set_config) {
464                         rd_kafka_ConfigResource_set_config(configs[i],
465                                                            "some.conf",
466                                                            "which remains "
467                                                            "unchecked");
468                         rd_kafka_ConfigResource_set_config(configs[i],
469                                                            "some.conf.null",
470                                                            NULL);
471                 }
472         }
473 
474 
475         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
476         err = rd_kafka_AdminOptions_set_request_timeout(options, 1000, errstr,
477                                                         sizeof(errstr));
478         TEST_ASSERT(!err, "%s", errstr);
479 
480         /* AlterConfigs */
481         rd_kafka_AlterConfigs(rk, configs, MY_CONFRES_CNT,
482                               options, rkqu);
483 
484         rkev = test_wait_admin_result(rkqu, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT,
485                                       2000);
486 
487         TEST_ASSERT(rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__TIMED_OUT,
488                     "Expected timeout, not %s",
489                     rd_kafka_event_error_string(rkev));
490 
491         res = rd_kafka_event_AlterConfigs_result(rkev);
492         TEST_ASSERT(res);
493 
494         rconfigs = rd_kafka_AlterConfigs_result_resources(res, &rconfig_cnt);
495         TEST_ASSERT(!rconfigs && !rconfig_cnt,
496                     "Expected no result resources, got %"PRIusz,
497                     rconfig_cnt);
498 
499         rd_kafka_event_destroy(rkev);
500 
501         /* DescribeConfigs: reuse same configs and options */
502         rd_kafka_DescribeConfigs(rk, configs, MY_CONFRES_CNT,
503                                  options, rkqu);
504 
505         rd_kafka_AdminOptions_destroy(options);
506         rd_kafka_ConfigResource_destroy_array(configs, MY_CONFRES_CNT);
507 
508         rkev = test_wait_admin_result(rkqu,
509                                       RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT,
510                                       2000);
511 
512         TEST_ASSERT(rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__TIMED_OUT,
513                     "Expected timeout, not %s",
514                     rd_kafka_event_error_string(rkev));
515 
516         res = rd_kafka_event_DescribeConfigs_result(rkev);
517         TEST_ASSERT(res);
518 
519         rconfigs = rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt);
520         TEST_ASSERT(!rconfigs && !rconfig_cnt,
521                     "Expected no result resources, got %"PRIusz,
522                     rconfig_cnt);
523 
524         rd_kafka_event_destroy(rkev);
525 }
526 
527 
528 /**
529  * @brief Verify that an unclean rd_kafka_destroy() does not hang.
530  */
do_test_unclean_destroy(rd_kafka_type_t cltype,int with_mainq)531 static void do_test_unclean_destroy (rd_kafka_type_t cltype, int with_mainq) {
532         rd_kafka_t *rk;
533         char errstr[512];
534         rd_kafka_conf_t *conf;
535         rd_kafka_queue_t *q;
536         rd_kafka_event_t *rkev;
537         rd_kafka_DeleteTopic_t *topic;
538         test_timing_t t_destroy;
539 
540         test_conf_init(&conf, NULL, 0);
541         /* Remove brokers, if any, since this is a local test and we
542          * rely on the controller not being found. */
543         test_conf_set(conf, "bootstrap.servers", "");
544         test_conf_set(conf, "socket.timeout.ms", "60000");
545 
546         rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr));
547         TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr);
548 
549         TEST_SAY(_C_MAG "[ Test unclean destroy for %s using %s]\n", rd_kafka_name(rk),
550                  with_mainq ? "mainq" : "tempq");
551 
552         if (with_mainq)
553                 q = rd_kafka_queue_get_main(rk);
554         else
555                 q = rd_kafka_queue_new(rk);
556 
557         topic = rd_kafka_DeleteTopic_new("test");
558         rd_kafka_DeleteTopics(rk, &topic, 1, NULL, q);
559         rd_kafka_DeleteTopic_destroy(topic);
560 
561         /* We're not expecting a result yet since DeleteTopics will attempt
562          * to look up the controller for socket.timeout.ms (1 minute). */
563         rkev = rd_kafka_queue_poll(q, 100);
564         TEST_ASSERT(!rkev, "Did not expect result: %s", rd_kafka_event_name(rkev));
565 
566         rd_kafka_queue_destroy(q);
567 
568         TEST_SAY("Giving rd_kafka_destroy() 5s to finish, "
569                  "despite Admin API request being processed\n");
570         test_timeout_set(5);
571         TIMING_START(&t_destroy, "rd_kafka_destroy()");
572         rd_kafka_destroy(rk);
573         TIMING_STOP(&t_destroy);
574 
575         /* Restore timeout */
576         test_timeout_set(60);
577 }
578 
579 
580 /**
581  * @brief Test AdminOptions
582  */
do_test_options(rd_kafka_t * rk)583 static void do_test_options (rd_kafka_t *rk) {
584 #define _all_apis { RD_KAFKA_ADMIN_OP_CREATETOPICS, \
585                     RD_KAFKA_ADMIN_OP_DELETETOPICS, \
586                     RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, \
587                     RD_KAFKA_ADMIN_OP_ALTERCONFIGS, \
588                     RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, \
589                     RD_KAFKA_ADMIN_OP_ANY /* Must be last */}
590         struct {
591                 const char *setter;
592                 const rd_kafka_admin_op_t valid_apis[8];
593         } matrix[] = {
594                 { "request_timeout", _all_apis },
595                 { "operation_timeout", { RD_KAFKA_ADMIN_OP_CREATETOPICS,
596                                          RD_KAFKA_ADMIN_OP_DELETETOPICS,
597                                          RD_KAFKA_ADMIN_OP_CREATEPARTITIONS } },
598                 { "validate_only", { RD_KAFKA_ADMIN_OP_CREATETOPICS,
599                                      RD_KAFKA_ADMIN_OP_CREATEPARTITIONS,
600                                      RD_KAFKA_ADMIN_OP_ALTERCONFIGS } },
601                 { "broker", _all_apis },
602                 { "opaque", _all_apis },
603                 { NULL },
604         };
605         int i;
606         rd_kafka_AdminOptions_t *options;
607 
608 
609         for (i = 0 ; matrix[i].setter ; i++) {
610                 static const rd_kafka_admin_op_t all_apis[] = _all_apis;
611                 const rd_kafka_admin_op_t *for_api;
612 
613                 for (for_api = all_apis ; ; for_api++) {
614                         rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
615                         rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
616                         char errstr[512];
617                         int fi;
618 
619                         options = rd_kafka_AdminOptions_new(rk, *for_api);
620                         TEST_ASSERT(options,
621                                     "AdminOptions_new(%d) failed", *for_api);
622 
623                         if (!strcmp(matrix[i].setter, "request_timeout"))
624                                 err = rd_kafka_AdminOptions_set_request_timeout(
625                                         options, 1234, errstr, sizeof(errstr));
626                         else if (!strcmp(matrix[i].setter, "operation_timeout"))
627                                 err = rd_kafka_AdminOptions_set_operation_timeout(
628                                         options, 12345, errstr, sizeof(errstr));
629                         else if (!strcmp(matrix[i].setter, "validate_only"))
630                                 err = rd_kafka_AdminOptions_set_validate_only(
631                                         options, 1, errstr, sizeof(errstr));
632                         else if (!strcmp(matrix[i].setter, "broker"))
633                                 err = rd_kafka_AdminOptions_set_broker(
634                                         options, 5, errstr, sizeof(errstr));
635                         else if (!strcmp(matrix[i].setter, "opaque")) {
636                                 rd_kafka_AdminOptions_set_opaque(
637                                         options, (void *)options);
638                                 err = RD_KAFKA_RESP_ERR_NO_ERROR;
639                         } else
640                                 TEST_FAIL("Invalid setter: %s",
641                                           matrix[i].setter);
642 
643 
644                         TEST_SAYL(3, "AdminOptions_set_%s on "
645                                   "RD_KAFKA_ADMIN_OP_%d options "
646                                   "returned %s: %s\n",
647                                   matrix[i].setter,
648                                   *for_api,
649                                   rd_kafka_err2name(err),
650                                   err ? errstr : "success");
651 
652                         /* Scan matrix valid_apis to see if this
653                          * setter should be accepted or not. */
654                         if (exp_err) {
655                                 /* An expected error is already set */
656                         } else if (*for_api != RD_KAFKA_ADMIN_OP_ANY) {
657                                 exp_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
658 
659                                 for (fi = 0 ; matrix[i].valid_apis[fi] ; fi++) {
660                                         if (matrix[i].valid_apis[fi] ==
661                                             *for_api)
662                                                 exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
663                                 }
664                         } else {
665                                 exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
666                         }
667 
668                         if (err != exp_err)
669                                 TEST_FAIL_LATER("Expected AdminOptions_set_%s "
670                                                 "for RD_KAFKA_ADMIN_OP_%d "
671                                                 "options to return %s, "
672                                                 "not %s",
673                                                 matrix[i].setter,
674                                                 *for_api,
675                                                 rd_kafka_err2name(exp_err),
676                                                 rd_kafka_err2name(err));
677 
678                         rd_kafka_AdminOptions_destroy(options);
679 
680                         if (*for_api == RD_KAFKA_ADMIN_OP_ANY)
681                                 break; /* This was the last one */
682                 }
683         }
684 
685         /* Try an invalid for_api */
686         options = rd_kafka_AdminOptions_new(rk, (rd_kafka_admin_op_t)1234);
687         TEST_ASSERT(!options, "Expectred AdminOptions_new() to fail "
688                     "with an invalid for_api, didn't.");
689 
690         TEST_LATER_CHECK();
691 }
692 
693 
/**
 * @brief Run all Admin API local tests for one client type.
 *
 * Runs the unclean-destroy tests on their own client instances, then
 * creates a client without brokers, with socket.timeout.ms set to
 * MY_SOCKET_TIMEOUT_MS and with background_event_cb() registered, and
 * runs the options, CreateTopics, DeleteTopics, mixed and config tests
 * on it.
 *
 * @param cltype  Client type to create.
 */
static void do_test_apis (rd_kafka_type_t cltype) {
        rd_kafka_t *rk;
        char errstr[512];
        rd_kafka_queue_t *mainq, *backgroundq;
        rd_kafka_conf_t *conf;

        /* Synchronization for the background event hand-off */
        mtx_init(&last_event_lock, mtx_plain);
        cnd_init(&last_event_cnd);

        do_test_unclean_destroy(cltype, 0/*tempq*/);
        do_test_unclean_destroy(cltype, 1/*mainq*/);

        test_conf_init(&conf, NULL, 0);
        /* Remove brokers, if any, since this is a local test and we
         * rely on the controller not being found. */
        test_conf_set(conf, "bootstrap.servers", "");
        test_conf_set(conf, "socket.timeout.ms", MY_SOCKET_TIMEOUT_MS_STR);
        /* For use with the background queue */
        rd_kafka_conf_set_background_event_cb(conf, background_event_cb);

        rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr));
        TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr);

        mainq = rd_kafka_queue_get_main(rk);
        backgroundq = rd_kafka_queue_get_background(rk);

        do_test_options(rk);

        do_test_CreateTopics("temp queue, no options", rk, NULL, 0, 0);
        /* This variation uses the background queue, not a temporary one. */
        do_test_CreateTopics("background queue, no options, "
                             "background_event_cb",
                             rk, backgroundq, 1, 0);
        do_test_CreateTopics("temp queue, options", rk, NULL, 0, 1);
        do_test_CreateTopics("main queue, options", rk, mainq, 0, 1);

        do_test_DeleteTopics("temp queue, no options", rk, NULL, 0);
        do_test_DeleteTopics("temp queue, options", rk, NULL, 1);
        do_test_DeleteTopics("main queue, options", rk, mainq, 1);

        do_test_mix(rk, mainq);

        do_test_configs(rk, mainq);

        /* Release the queue references before destroying the client */
        rd_kafka_queue_destroy(backgroundq);
        rd_kafka_queue_destroy(mainq);

        rd_kafka_destroy(rk);

        mtx_destroy(&last_event_lock);
        cnd_destroy(&last_event_cnd);

}
745 
746 
main_0080_admin_ut(int argc,char ** argv)747 int main_0080_admin_ut (int argc, char **argv) {
748         do_test_apis(RD_KAFKA_PRODUCER);
749         do_test_apis(RD_KAFKA_CONSUMER);
750         return 0;
751 }
752