1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 #include "rdkafka.h"
31 
32 /**
33  * @brief Admin API integration tests.
34  */
35 
36 
/* Broker ids the tests below copy into their explicit replica assignments.
 * NOTE(review): populated outside the part of the file visible here -
 * presumably by the test entry point; confirm. */
static int32_t *avail_brokers;
/* Number of elements in avail_brokers; the tests below also use it as the
 * replica count (num_replicas) for the topics they create. */
static size_t avail_broker_cnt;
39 
40 
41 
42 
/**
 * @brief Test creation of topics
 *
 * Requests creation of MY_NEW_TOPICS_CNT topics that differ in partition
 * count, topic configuration and replica assignment, verifies the per-topic
 * results and then waits for metadata to reflect the expected outcome.
 *
 * @param what Description of the queue type, only used in the sub-test name.
 * @param rk Client instance to issue the request on.
 * @param useq Result queue to use, or NULL to use a temporary queue that is
 *             created and destroyed by this function.
 * @param op_timeout Operation timeout to set on the AdminOptions,
 *                   or -1 to leave it unset.
 * @param validate_only If true, set the validate_only option and expect
 *                      none of the topics to show up in metadata.
 */
static void do_test_CreateTopics (const char *what,
                                  rd_kafka_t *rk, rd_kafka_queue_t *useq,
                                  int op_timeout, rd_bool_t validate_only) {
        rd_kafka_queue_t *q;
#define MY_NEW_TOPICS_CNT 7
        char *topics[MY_NEW_TOPICS_CNT];
        rd_kafka_NewTopic_t *new_topics[MY_NEW_TOPICS_CNT];
        rd_kafka_AdminOptions_t *options = NULL;
        rd_kafka_resp_err_t exp_topicerr[MY_NEW_TOPICS_CNT] = {0};
        rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        /* Expected topics in metadata */
        rd_kafka_metadata_topic_t exp_mdtopics[MY_NEW_TOPICS_CNT] = {{0}};
        int exp_mdtopic_cnt = 0;
        /* Not expected topics in metadata */
        rd_kafka_metadata_topic_t exp_not_mdtopics[MY_NEW_TOPICS_CNT] = {{0}};
        int exp_not_mdtopic_cnt = 0;
        int i;
        char errstr[512];
        const char *errstr2;
        rd_kafka_resp_err_t err;
        test_timing_t timing;
        rd_kafka_event_t *rkev;
        const rd_kafka_CreateTopics_result_t *res;
        const rd_kafka_topic_result_t **restopics;
        size_t restopic_cnt;
        int metadata_tmout;
        int num_replicas = (int)avail_broker_cnt;
        int32_t *replicas;

        SUB_TEST_QUICK("%s CreateTopics with %s, "
                       "op_timeout %d, validate_only %d",
                       rd_kafka_name(rk), what, op_timeout, validate_only);

        q = useq ? useq : rd_kafka_queue_new(rk);

        /* Set up replicas */
        replicas = rd_alloca(sizeof(*replicas) * num_replicas);
        for (i = 0 ; i < num_replicas ; i++)
                replicas[i] = avail_brokers[i];

        /**
         * Construct NewTopic array with different properties for
         * different partitions.
         */
        for (i = 0 ; i < MY_NEW_TOPICS_CNT ; i++) {
                char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
                /* The last topic leaves the partition count to the broker
                 * (-1), but only on brokers >= 2.4.0.0. */
                int use_defaults = i == 6 &&
                        test_broker_version >= TEST_BRKVER(2,4,0,0);
                int num_parts = !use_defaults ? (i * 7 + 1) : -1;
                int set_config = (i & 1);
                int add_invalid_config = (i == 1);
                /* Every third topic gets an explicit replica assignment,
                 * in which case the replication factor is passed as -1. */
                int set_replicas = !use_defaults && !(i % 3);
                rd_kafka_resp_err_t this_exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;

                topics[i] = topic;
                new_topics[i] = rd_kafka_NewTopic_new(topic,
                                                      num_parts,
                                                      set_replicas ? -1 :
                                                      num_replicas,
                                                      errstr, sizeof(errstr));
                /* NewTopic_new() returns NULL on invalid arguments:
                 * fail with its error string rather than crashing later. */
                TEST_ASSERT(new_topics[i], "NewTopic_new(%s): %s",
                            topic, errstr);

                if (set_config) {
                        /*
                         * Add various configuration properties
                         */
                        err = rd_kafka_NewTopic_set_config(
                                new_topics[i], "compression.type", "lz4");
                        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));

                        err = rd_kafka_NewTopic_set_config(
                                new_topics[i], "delete.retention.ms", "900");
                        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
                }

                if (add_invalid_config) {
                        /* Add invalid config property */
                        err = rd_kafka_NewTopic_set_config(
                                new_topics[i],
                                "dummy.doesntexist",
                                "broker is verifying this");
                        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
                        this_exp_err = RD_KAFKA_RESP_ERR_INVALID_CONFIG;
                }

                TEST_SAY("Expecting result for topic #%d: %s "
                         "(set_config=%d, add_invalid_config=%d, "
                         "set_replicas=%d, use_defaults=%d)\n",
                         i, rd_kafka_err2name(this_exp_err),
                         set_config, add_invalid_config, set_replicas,
                         use_defaults);

                if (set_replicas) {
                        int32_t p;

                        /*
                         * Set valid replica assignments
                         */
                        for (p = 0 ; p < num_parts ; p++) {
                                err = rd_kafka_NewTopic_set_replica_assignment(
                                        new_topics[i], p,
                                        replicas, num_replicas,
                                        errstr, sizeof(errstr));
                                TEST_ASSERT(!err, "%s", errstr);
                        }
                }

                if (this_exp_err || validate_only) {
                        /* Topic must not be created: either the request
                         * is expected to fail or it is only validated. */
                        exp_topicerr[i] = this_exp_err;
                        exp_not_mdtopics[exp_not_mdtopic_cnt++].topic = topic;

                } else {
                        exp_mdtopics[exp_mdtopic_cnt].topic = topic;
                        exp_mdtopics[exp_mdtopic_cnt].partition_cnt =
                                num_parts;
                        exp_mdtopic_cnt++;
                }
        }

        if (op_timeout != -1 || validate_only) {
                options = rd_kafka_AdminOptions_new(
                        rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);

                if (op_timeout != -1) {
                        err = rd_kafka_AdminOptions_set_operation_timeout(
                                options, op_timeout, errstr, sizeof(errstr));
                        TEST_ASSERT(!err, "%s: %s",
                                    rd_kafka_err2str(err), errstr);
                }

                if (validate_only) {
                        err = rd_kafka_AdminOptions_set_validate_only(
                                options, validate_only, errstr, sizeof(errstr));
                        TEST_ASSERT(!err, "%s: %s",
                                    rd_kafka_err2str(err), errstr);
                }
        }

        TIMING_START(&timing, "CreateTopics");
        TEST_SAY("Call CreateTopics\n");
        rd_kafka_CreateTopics(rk, new_topics, MY_NEW_TOPICS_CNT,
                                    options, q);
        TIMING_ASSERT_LATER(&timing, 0, 50);

        /* Poll result queue for CreateTopics result.
         * Print but otherwise ignore other event types
         * (typically generic Error events). */
        TIMING_START(&timing, "CreateTopics.queue_poll");
        while (1) {
                rkev = rd_kafka_queue_poll(q, tmout_multip(20*1000));
                TEST_SAY("CreateTopics: got %s in %.3fms\n",
                         rd_kafka_event_name(rkev),
                         TIMING_DURATION(&timing) / 1000.0f);
                if (rd_kafka_event_error(rkev))
                        TEST_SAY("%s: %s\n",
                                 rd_kafka_event_name(rkev),
                                 rd_kafka_event_error_string(rkev));

                if (rd_kafka_event_type(rkev) ==
                    RD_KAFKA_EVENT_CREATETOPICS_RESULT)
                        break;

                /* Not the event we are waiting for: release it so that
                 * ignored events are not leaked. */
                rd_kafka_event_destroy(rkev);
        }

        /* Convert event to proper result */
        res = rd_kafka_event_CreateTopics_result(rkev);
        TEST_ASSERT(res, "expected CreateTopics_result, not %s",
                    rd_kafka_event_name(rkev));

        /* Expecting error */
        err = rd_kafka_event_error(rkev);
        errstr2 = rd_kafka_event_error_string(rkev);
        TEST_ASSERT(err == exp_err,
                    "expected CreateTopics to return %s, not %s (%s)",
                    rd_kafka_err2str(exp_err),
                    rd_kafka_err2str(err),
                    err ? errstr2 : "n/a");

        TEST_SAY("CreateTopics: returned %s (%s)\n",
                 rd_kafka_err2str(err), err ? errstr2 : "n/a");

        /* Extract topics */
        restopics = rd_kafka_CreateTopics_result_topics(res, &restopic_cnt);

        /* The scan below indexes topics[] and exp_topicerr[] by result
         * position: make sure the result can't run past those arrays. */
        TEST_ASSERT(restopic_cnt <= MY_NEW_TOPICS_CNT,
                    "expected at most %d result topics, got %d",
                    MY_NEW_TOPICS_CNT, (int)restopic_cnt);

        /* Scan topics for proper fields and expected failures. */
        for (i = 0 ; i < (int)restopic_cnt ; i++) {
                const rd_kafka_topic_result_t *terr = restopics[i];

                /* Verify that topic order matches our request. */
                if (strcmp(rd_kafka_topic_result_name(terr), topics[i]))
                        TEST_FAIL_LATER("Topic result order mismatch at #%d: "
                                        "expected %s, got %s",
                                        i, topics[i],
                                        rd_kafka_topic_result_name(terr));

                TEST_SAY("CreateTopics result: #%d: %s: %s: %s\n",
                         i,
                         rd_kafka_topic_result_name(terr),
                         rd_kafka_err2name(rd_kafka_topic_result_error(terr)),
                         rd_kafka_topic_result_error_string(terr));
                if (rd_kafka_topic_result_error(terr) != exp_topicerr[i])
                        TEST_FAIL_LATER(
                                "Expected %s, not %d: %s",
                                rd_kafka_err2name(exp_topicerr[i]),
                                rd_kafka_topic_result_error(terr),
                                rd_kafka_err2name(rd_kafka_topic_result_error(
                                                          terr)));
        }

        /**
         * Verify that the expected topics are created and the non-expected
         * are not. Allow it some time to propagate.
         */
        if (validate_only) {
                /* No topics should have been created, give it some time
                 * before checking. */
                rd_sleep(2);
                metadata_tmout = 5 * 1000;
        } else {
                if (op_timeout > 0)
                        metadata_tmout = op_timeout + 1000;
                else
                        metadata_tmout = 10 * 1000;
        }

        test_wait_metadata_update(rk,
                                  exp_mdtopics,
                                  exp_mdtopic_cnt,
                                  exp_not_mdtopics,
                                  exp_not_mdtopic_cnt,
                                  metadata_tmout);

        rd_kafka_event_destroy(rkev);

        for (i = 0 ; i < MY_NEW_TOPICS_CNT ; i++) {
                rd_kafka_NewTopic_destroy(new_topics[i]);
                rd_free(topics[i]);
        }

        if (options)
                rd_kafka_AdminOptions_destroy(options);

        /* Only destroy the queue if it was created by this function. */
        if (!useq)
                rd_kafka_queue_destroy(q);

        TEST_LATER_CHECK();
#undef MY_NEW_TOPICS_CNT

        SUB_TEST_PASS();
}
287 
288 
289 
290 
291 /**
292  * @brief Test deletion of topics
293  *
294  *
295  */
do_test_DeleteTopics(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int op_timeout)296 static void do_test_DeleteTopics (const char *what,
297                                   rd_kafka_t *rk, rd_kafka_queue_t *useq,
298                                   int op_timeout) {
299         rd_kafka_queue_t *q;
300         const int skip_topic_cnt = 2;
301 #define MY_DEL_TOPICS_CNT 9
302         char *topics[MY_DEL_TOPICS_CNT];
303         rd_kafka_DeleteTopic_t *del_topics[MY_DEL_TOPICS_CNT];
304         rd_kafka_AdminOptions_t *options = NULL;
305         rd_kafka_resp_err_t exp_topicerr[MY_DEL_TOPICS_CNT] = {0};
306         rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
307         /* Expected topics in metadata */
308         rd_kafka_metadata_topic_t exp_mdtopics[MY_DEL_TOPICS_CNT] = {{0}};
309         int exp_mdtopic_cnt = 0;
310         /* Not expected topics in metadata */
311         rd_kafka_metadata_topic_t exp_not_mdtopics[MY_DEL_TOPICS_CNT] = {{0}};
312         int exp_not_mdtopic_cnt = 0;
313         int i;
314         char errstr[512];
315         const char *errstr2;
316         rd_kafka_resp_err_t err;
317         test_timing_t timing;
318         rd_kafka_event_t *rkev;
319         const rd_kafka_DeleteTopics_result_t *res;
320         const rd_kafka_topic_result_t **restopics;
321         size_t restopic_cnt;
322         int metadata_tmout;
323 
324         SUB_TEST_QUICK("%s DeleteTopics with %s, op_timeout %d",
325                        rd_kafka_name(rk), what, op_timeout);
326 
327         q = useq ? useq : rd_kafka_queue_new(rk);
328 
329         /**
330          * Construct DeleteTopic array
331          */
332         for (i = 0 ; i < MY_DEL_TOPICS_CNT ; i++) {
333                 char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
334                 int notexist_topic = i >= MY_DEL_TOPICS_CNT - skip_topic_cnt;
335 
336                 topics[i] = topic;
337 
338                 del_topics[i] = rd_kafka_DeleteTopic_new(topic);
339 
340                 if (notexist_topic)
341                         exp_topicerr[i] =
342                                 RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
343                 else {
344                         exp_topicerr[i] =
345                                 RD_KAFKA_RESP_ERR_NO_ERROR;
346 
347                         exp_mdtopics[exp_mdtopic_cnt++].topic = topic;
348                 }
349 
350                 exp_not_mdtopics[exp_not_mdtopic_cnt++].topic = topic;
351         }
352 
353         if (op_timeout != -1) {
354                 options = rd_kafka_AdminOptions_new(
355                         rk, RD_KAFKA_ADMIN_OP_ANY);
356 
357                 err = rd_kafka_AdminOptions_set_operation_timeout(
358                         options, op_timeout, errstr, sizeof(errstr));
359                 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
360         }
361 
362 
363         /* Create the topics first, minus the skip count. */
364         test_CreateTopics_simple(rk, NULL, topics,
365                                  MY_DEL_TOPICS_CNT-skip_topic_cnt,
366                                  2/*num_partitions*/,
367                                  NULL);
368 
369         /* Verify that topics are reported by metadata */
370         test_wait_metadata_update(rk,
371                                   exp_mdtopics, exp_mdtopic_cnt,
372                                   NULL, 0,
373                                   15*1000);
374 
375         TIMING_START(&timing, "DeleteTopics");
376         TEST_SAY("Call DeleteTopics\n");
377         rd_kafka_DeleteTopics(rk, del_topics, MY_DEL_TOPICS_CNT,
378                                     options, q);
379         TIMING_ASSERT_LATER(&timing, 0, 50);
380 
381         /* Poll result queue for DeleteTopics result.
382          * Print but otherwise ignore other event types
383          * (typically generic Error events). */
384         TIMING_START(&timing, "DeleteTopics.queue_poll");
385         while (1) {
386                 rkev = rd_kafka_queue_poll(q, tmout_multip(20*1000));
387                 TEST_SAY("DeleteTopics: got %s in %.3fms\n",
388                          rd_kafka_event_name(rkev),
389                          TIMING_DURATION(&timing) / 1000.0f);
390                 if (rd_kafka_event_error(rkev))
391                         TEST_SAY("%s: %s\n",
392                                  rd_kafka_event_name(rkev),
393                                  rd_kafka_event_error_string(rkev));
394 
395                 if (rd_kafka_event_type(rkev) ==
396                     RD_KAFKA_EVENT_DELETETOPICS_RESULT)
397                         break;
398 
399                 rd_kafka_event_destroy(rkev);
400         }
401 
402         /* Convert event to proper result */
403         res = rd_kafka_event_DeleteTopics_result(rkev);
404         TEST_ASSERT(res, "expected DeleteTopics_result, not %s",
405                     rd_kafka_event_name(rkev));
406 
407         /* Expecting error */
408         err = rd_kafka_event_error(rkev);
409         errstr2 = rd_kafka_event_error_string(rkev);
410         TEST_ASSERT(err == exp_err,
411                     "expected DeleteTopics to return %s, not %s (%s)",
412                     rd_kafka_err2str(exp_err),
413                     rd_kafka_err2str(err),
414                     err ? errstr2 : "n/a");
415 
416         TEST_SAY("DeleteTopics: returned %s (%s)\n",
417                  rd_kafka_err2str(err), err ? errstr2 : "n/a");
418 
419         /* Extract topics */
420         restopics = rd_kafka_DeleteTopics_result_topics(res, &restopic_cnt);
421 
422 
423         /* Scan topics for proper fields and expected failures. */
424         for (i = 0 ; i < (int)restopic_cnt ; i++) {
425                 const rd_kafka_topic_result_t *terr = restopics[i];
426 
427                 /* Verify that topic order matches our request. */
428                 if (strcmp(rd_kafka_topic_result_name(terr), topics[i]))
429                         TEST_FAIL_LATER("Topic result order mismatch at #%d: "
430                                         "expected %s, got %s",
431                                         i, topics[i],
432                                         rd_kafka_topic_result_name(terr));
433 
434                 TEST_SAY("DeleteTopics result: #%d: %s: %s: %s\n",
435                          i,
436                          rd_kafka_topic_result_name(terr),
437                          rd_kafka_err2name(rd_kafka_topic_result_error(terr)),
438                          rd_kafka_topic_result_error_string(terr));
439                 if (rd_kafka_topic_result_error(terr) != exp_topicerr[i])
440                         TEST_FAIL_LATER(
441                                 "Expected %s, not %d: %s",
442                                 rd_kafka_err2name(exp_topicerr[i]),
443                                 rd_kafka_topic_result_error(terr),
444                                 rd_kafka_err2name(rd_kafka_topic_result_error(
445                                                           terr)));
446         }
447 
448         /**
449          * Verify that the expected topics are deleted and the non-expected
450          * are not. Allow it some time to propagate.
451          */
452         if (op_timeout > 0)
453                 metadata_tmout = op_timeout + 1000;
454         else
455                 metadata_tmout = 10 * 1000;
456 
457         test_wait_metadata_update(rk,
458                                   NULL, 0,
459                                   exp_not_mdtopics,
460                                   exp_not_mdtopic_cnt,
461                                   metadata_tmout);
462 
463         rd_kafka_event_destroy(rkev);
464 
465         for (i = 0 ; i < MY_DEL_TOPICS_CNT ; i++) {
466                 rd_kafka_DeleteTopic_destroy(del_topics[i]);
467                 rd_free(topics[i]);
468         }
469 
470         if (options)
471                 rd_kafka_AdminOptions_destroy(options);
472 
473         if (!useq)
474                 rd_kafka_queue_destroy(q);
475 
476         TEST_LATER_CHECK();
477 #undef MY_DEL_TOPICS_CNT
478 
479         SUB_TEST_PASS();
480 }
481 
482 
483 
484 /**
485  * @brief Test creation of partitions
486  *
487  *
488  */
do_test_CreatePartitions(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int op_timeout)489 static void do_test_CreatePartitions (const char *what,
490                                       rd_kafka_t *rk, rd_kafka_queue_t *useq,
491                                       int op_timeout) {
492         rd_kafka_queue_t *q;
493 #define MY_CRP_TOPICS_CNT 9
494         char *topics[MY_CRP_TOPICS_CNT];
495         rd_kafka_NewTopic_t *new_topics[MY_CRP_TOPICS_CNT];
496         rd_kafka_NewPartitions_t *crp_topics[MY_CRP_TOPICS_CNT];
497         rd_kafka_AdminOptions_t *options = NULL;
498         /* Expected topics in metadata */
499         rd_kafka_metadata_topic_t exp_mdtopics[MY_CRP_TOPICS_CNT] = {{0}};
500         rd_kafka_metadata_partition_t exp_mdparts[2] = {{0}};
501         int exp_mdtopic_cnt = 0;
502         int i;
503         char errstr[512];
504         rd_kafka_resp_err_t err;
505         test_timing_t timing;
506         int metadata_tmout;
507         int num_replicas = (int)avail_broker_cnt;
508 
509         SUB_TEST_QUICK("%s CreatePartitions with %s, op_timeout %d",
510                        rd_kafka_name(rk), what, op_timeout);
511 
512         q = useq ? useq : rd_kafka_queue_new(rk);
513 
514         /* Set up two expected partitions with different replication sets
515          * so they can be matched by the metadata checker later.
516          * Even partitions use exp_mdparts[0] while odd partitions
517          * use exp_mdparts[1]. */
518 
519         /* Set valid replica assignments (even, and odd (reverse) ) */
520         exp_mdparts[0].replicas = rd_alloca(sizeof(*exp_mdparts[0].replicas) *
521                                             num_replicas);
522         exp_mdparts[1].replicas = rd_alloca(sizeof(*exp_mdparts[1].replicas) *
523                                             num_replicas);
524         exp_mdparts[0].replica_cnt = num_replicas;
525         exp_mdparts[1].replica_cnt = num_replicas;
526         for (i = 0 ; i < num_replicas ; i++) {
527                 exp_mdparts[0].replicas[i] = avail_brokers[i];
528                 exp_mdparts[1].replicas[i] = avail_brokers[num_replicas-i-1];
529         }
530 
531         /**
532          * Construct CreatePartitions array
533          */
534         for (i = 0 ; i < MY_CRP_TOPICS_CNT ; i++) {
535                 char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
536                 int initial_part_cnt = 1 + (i * 2);
537                 int new_part_cnt = 1 + (i / 2);
538                 int final_part_cnt = initial_part_cnt + new_part_cnt;
539                 int set_replicas = !(i % 2);
540                 int pi;
541 
542                 topics[i] = topic;
543 
544                 /* Topic to create with initial partition count */
545                 new_topics[i] = rd_kafka_NewTopic_new(topic, initial_part_cnt,
546                                                       set_replicas ?
547                                                       -1 : num_replicas,
548                                                       NULL, 0);
549 
550                 /* .. and later add more partitions to */
551                 crp_topics[i] = rd_kafka_NewPartitions_new(topic,
552                                                            final_part_cnt,
553                                                            errstr,
554                                                            sizeof(errstr));
555 
556                 if (set_replicas) {
557                         exp_mdtopics[exp_mdtopic_cnt].partitions =
558                                 rd_alloca(final_part_cnt *
559                                           sizeof(*exp_mdtopics[exp_mdtopic_cnt].
560                                                  partitions));
561 
562                         for (pi = 0 ; pi < final_part_cnt ; pi++) {
563                                 const rd_kafka_metadata_partition_t *exp_mdp =
564                                         &exp_mdparts[pi & 1];
565 
566                                 exp_mdtopics[exp_mdtopic_cnt].
567                                         partitions[pi] = *exp_mdp; /* copy */
568 
569                                 exp_mdtopics[exp_mdtopic_cnt].
570                                         partitions[pi].id = pi;
571 
572                                 if (pi < initial_part_cnt) {
573                                         /* Set replica assignment
574                                          * for initial partitions */
575                                         err = rd_kafka_NewTopic_set_replica_assignment(
576                                                 new_topics[i], pi,
577                                                 exp_mdp->replicas,
578                                                 (size_t)exp_mdp->replica_cnt,
579                                                 errstr, sizeof(errstr));
580                                         TEST_ASSERT(!err, "NewTopic_set_replica_assignment: %s",
581                                                 errstr);
582                                 } else {
583                                         /* Set replica assignment for new
584                                          * partitions */
585                                         err = rd_kafka_NewPartitions_set_replica_assignment(
586                                                 crp_topics[i],
587                                                 pi - initial_part_cnt,
588                                                 exp_mdp->replicas,
589                                                 (size_t)exp_mdp->replica_cnt,
590                                                 errstr, sizeof(errstr));
591                                         TEST_ASSERT(!err, "NewPartitions_set_replica_assignment: %s",
592                                                 errstr);
593                                 }
594 
595                         }
596                 }
597 
598                 TEST_SAY(_C_YEL "Topic %s with %d initial partitions will grow "
599                          "by %d to %d total partitions with%s replicas set\n",
600                          topics[i],
601                          initial_part_cnt, new_part_cnt, final_part_cnt,
602                          set_replicas ? "" : "out");
603 
604                 exp_mdtopics[exp_mdtopic_cnt].topic = topic;
605                 exp_mdtopics[exp_mdtopic_cnt].partition_cnt = final_part_cnt;
606 
607                 exp_mdtopic_cnt++;
608         }
609 
610         if (op_timeout != -1) {
611                 options = rd_kafka_AdminOptions_new(
612                         rk, RD_KAFKA_ADMIN_OP_ANY);
613 
614                 err = rd_kafka_AdminOptions_set_operation_timeout(
615                         options, op_timeout, errstr, sizeof(errstr));
616                 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
617         }
618 
619         /*
620          * Create topics with initial partition count
621          */
622         TIMING_START(&timing, "CreateTopics");
623         TEST_SAY("Creating topics with initial partition counts\n");
624         rd_kafka_CreateTopics(rk, new_topics, MY_CRP_TOPICS_CNT,
625                                     options, q);
626         TIMING_ASSERT_LATER(&timing, 0, 50);
627 
628         err = test_wait_topic_admin_result(q,
629                                            RD_KAFKA_EVENT_CREATETOPICS_RESULT,
630                                            NULL, 15000);
631         TEST_ASSERT(!err, "CreateTopics failed: %s", rd_kafka_err2str(err));
632 
633         rd_kafka_NewTopic_destroy_array(new_topics, MY_CRP_TOPICS_CNT);
634 
635 
636         /*
637          * Create new partitions
638          */
639         TIMING_START(&timing, "CreatePartitions");
640         TEST_SAY("Creating partitions\n");
641         rd_kafka_CreatePartitions(rk, crp_topics, MY_CRP_TOPICS_CNT,
642                                     options, q);
643         TIMING_ASSERT_LATER(&timing, 0, 50);
644 
645         err = test_wait_topic_admin_result(q,
646                                            RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT,
647                                            NULL, 15000);
648         TEST_ASSERT(!err, "CreatePartitions failed: %s", rd_kafka_err2str(err));
649 
650         rd_kafka_NewPartitions_destroy_array(crp_topics, MY_CRP_TOPICS_CNT);
651 
652 
653         /**
654          * Verify that the expected topics are deleted and the non-expected
655          * are not. Allow it some time to propagate.
656          */
657         if (op_timeout > 0)
658                 metadata_tmout = op_timeout + 1000;
659         else
660                 metadata_tmout = 10 * 1000;
661 
662         test_wait_metadata_update(rk,
663                                   exp_mdtopics,
664                                   exp_mdtopic_cnt,
665                                   NULL, 0,
666                                   metadata_tmout);
667 
668         for (i = 0 ; i < MY_CRP_TOPICS_CNT ; i++)
669                 rd_free(topics[i]);
670 
671         if (options)
672                 rd_kafka_AdminOptions_destroy(options);
673 
674         if (!useq)
675                 rd_kafka_queue_destroy(q);
676 
677         TEST_LATER_CHECK();
678 #undef MY_CRP_TOPICS_CNT
679 
680         SUB_TEST_PASS();
681 }
682 
683 
684 
685 /**
686  * @brief Print the ConfigEntrys in the provided array.
687  */
688 static void
test_print_ConfigEntry_array(const rd_kafka_ConfigEntry_t ** entries,size_t entry_cnt,unsigned int depth)689 test_print_ConfigEntry_array (const rd_kafka_ConfigEntry_t **entries,
690                               size_t entry_cnt, unsigned int depth) {
691         const char *indent = &"    "[4 - (depth > 4 ? 4 : depth)];
692         size_t ei;
693 
694         for (ei = 0 ; ei < entry_cnt ; ei++) {
695                 const rd_kafka_ConfigEntry_t *e = entries[ei];
696                 const rd_kafka_ConfigEntry_t **syns;
697                 size_t syn_cnt;
698 
699                 syns = rd_kafka_ConfigEntry_synonyms(e, &syn_cnt);
700 
701 #define YN(v) ((v) ? "y" : "n")
702                 TEST_SAYL(3,
703                           "%s#%"PRIusz"/%"PRIusz
704                           ": Source %s (%d): \"%s\"=\"%s\" "
705                           "[is read-only=%s, default=%s, sensitive=%s, "
706                           "synonym=%s] with %"PRIusz" synonym(s)\n",
707                           indent,
708                           ei, entry_cnt,
709                           rd_kafka_ConfigSource_name(
710                                   rd_kafka_ConfigEntry_source(e)),
711                           rd_kafka_ConfigEntry_source(e),
712                           rd_kafka_ConfigEntry_name(e),
713                           rd_kafka_ConfigEntry_value(e) ?
714                           rd_kafka_ConfigEntry_value(e) : "(NULL)",
715                           YN(rd_kafka_ConfigEntry_is_read_only(e)),
716                           YN(rd_kafka_ConfigEntry_is_default(e)),
717                           YN(rd_kafka_ConfigEntry_is_sensitive(e)),
718                           YN(rd_kafka_ConfigEntry_is_synonym(e)),
719                           syn_cnt);
720 #undef YN
721 
722                 if (syn_cnt > 0)
723                         test_print_ConfigEntry_array(syns, syn_cnt, depth+1);
724         }
725 }
726 
727 
728 /**
729  * @brief Test AlterConfigs
730  */
do_test_AlterConfigs(rd_kafka_t * rk,rd_kafka_queue_t * rkqu)731 static void do_test_AlterConfigs (rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
732 #define MY_CONFRES_CNT 3
733         char *topics[MY_CONFRES_CNT];
734         rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
735         rd_kafka_AdminOptions_t *options;
736         rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT];
737         rd_kafka_event_t *rkev;
738         rd_kafka_resp_err_t err;
739         const rd_kafka_AlterConfigs_result_t *res;
740         const rd_kafka_ConfigResource_t **rconfigs;
741         size_t rconfig_cnt;
742         char errstr[128];
743         const char *errstr2;
744         int ci = 0;
745         int i;
746         int fails = 0;
747 
748         SUB_TEST_QUICK();
749 
750         /*
751          * Only create one topic, the others will be non-existent.
752          */
753         for (i = 0 ; i < MY_CONFRES_CNT ; i++)
754                 rd_strdupa(&topics[i], test_mk_topic_name(__FUNCTION__, 1));
755 
756         test_CreateTopics_simple(rk, NULL, topics, 1, 1, NULL);
757 
758         test_wait_topic_exists(rk, topics[0], 10000);
759 
760         /*
761          * ConfigResource #0: valid topic config
762          */
763         configs[ci] = rd_kafka_ConfigResource_new(
764                 RD_KAFKA_RESOURCE_TOPIC, topics[ci]);
765 
766         err = rd_kafka_ConfigResource_set_config(configs[ci],
767                                                  "compression.type", "gzip");
768         TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
769 
770         err = rd_kafka_ConfigResource_set_config(configs[ci],
771                                                  "flush.ms", "12345678");
772         TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
773 
774         exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
775         ci++;
776 
777 
778         if (test_broker_version >= TEST_BRKVER(1, 1, 0, 0)) {
779                 /*
780                  * ConfigResource #1: valid broker config
781                  */
782                 configs[ci] = rd_kafka_ConfigResource_new(
783                         RD_KAFKA_RESOURCE_BROKER,
784                         tsprintf("%"PRId32, avail_brokers[0]));
785 
786                 err = rd_kafka_ConfigResource_set_config(
787                         configs[ci],
788                         "sasl.kerberos.min.time.before.relogin", "58000");
789                 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
790 
791                 exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
792                 ci++;
793         } else {
794                 TEST_WARN("Skipping RESOURCE_BROKER test on unsupported "
795                           "broker version\n");
796         }
797 
798         /*
799          * ConfigResource #2: valid topic config, non-existent topic
800          */
801         configs[ci] = rd_kafka_ConfigResource_new(
802                 RD_KAFKA_RESOURCE_TOPIC, topics[ci]);
803 
804         err = rd_kafka_ConfigResource_set_config(configs[ci],
805                                                  "compression.type", "lz4");
806         TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
807 
808         err = rd_kafka_ConfigResource_set_config(configs[ci],
809                                                  "offset.metadata.max.bytes",
810                                                  "12345");
811         TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
812 
813         if (test_broker_version >= TEST_BRKVER(2, 7, 0, 0))
814                 exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
815         else
816                 exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN;
817         ci++;
818 
819 
820         /*
821          * Timeout options
822          */
823         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ALTERCONFIGS);
824         err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr,
825                                                         sizeof(errstr));
826         TEST_ASSERT(!err, "%s", errstr);
827 
828 
829         /*
830          * Fire off request
831          */
832         rd_kafka_AlterConfigs(rk, configs, ci, options, rkqu);
833 
834         rd_kafka_AdminOptions_destroy(options);
835 
836         /*
837          * Wait for result
838          */
839         rkev = test_wait_admin_result(rkqu, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT,
840                                       10000+1000);
841 
842         /*
843          * Extract result
844          */
845         res = rd_kafka_event_AlterConfigs_result(rkev);
846         TEST_ASSERT(res, "Expected AlterConfigs result, not %s",
847                     rd_kafka_event_name(rkev));
848 
849         err = rd_kafka_event_error(rkev);
850         errstr2 = rd_kafka_event_error_string(rkev);
851         TEST_ASSERT(!err,
852                     "Expected success, not %s: %s",
853                     rd_kafka_err2name(err), errstr2);
854 
855         rconfigs = rd_kafka_AlterConfigs_result_resources(res, &rconfig_cnt);
856         TEST_ASSERT((int)rconfig_cnt == ci,
857                     "Expected %d result resources, got %"PRIusz"\n",
858                     ci, rconfig_cnt);
859 
860         /*
861          * Verify status per resource
862          */
863         for (i = 0 ; i < (int)rconfig_cnt ; i++) {
864                 const rd_kafka_ConfigEntry_t **entries;
865                 size_t entry_cnt;
866 
867                 err = rd_kafka_ConfigResource_error(rconfigs[i]);
868                 errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]);
869 
870                 entries = rd_kafka_ConfigResource_configs(rconfigs[i],
871                                                           &entry_cnt);
872 
873                 TEST_SAY("ConfigResource #%d: type %s (%d), \"%s\": "
874                          "%"PRIusz" ConfigEntries, error %s (%s)\n",
875                          i,
876                          rd_kafka_ResourceType_name(
877                                  rd_kafka_ConfigResource_type(rconfigs[i])),
878                          rd_kafka_ConfigResource_type(rconfigs[i]),
879                          rd_kafka_ConfigResource_name(rconfigs[i]),
880                          entry_cnt,
881                          rd_kafka_err2name(err), errstr2 ? errstr2 : "");
882 
883                 test_print_ConfigEntry_array(entries, entry_cnt, 1);
884 
885                 if (rd_kafka_ConfigResource_type(rconfigs[i]) !=
886                     rd_kafka_ConfigResource_type(configs[i]) ||
887                     strcmp(rd_kafka_ConfigResource_name(rconfigs[i]),
888                            rd_kafka_ConfigResource_name(configs[i]))) {
889                         TEST_FAIL_LATER(
890                                 "ConfigResource #%d: "
891                                 "expected type %s name %s, "
892                                 "got type %s name %s",
893                                 i,
894                                 rd_kafka_ResourceType_name(rd_kafka_ConfigResource_type(configs[i])),
895                                 rd_kafka_ConfigResource_name(configs[i]),
896                                 rd_kafka_ResourceType_name(rd_kafka_ConfigResource_type(rconfigs[i])),
897                                 rd_kafka_ConfigResource_name(rconfigs[i]));
898                         fails++;
899                         continue;
900                 }
901 
902 
903                 if (err != exp_err[i]) {
904                         TEST_FAIL_LATER("ConfigResource #%d: "
905                                         "expected %s (%d), got %s (%s)",
906                                         i,
907                                         rd_kafka_err2name(exp_err[i]),
908                                         exp_err[i],
909                                         rd_kafka_err2name(err),
910                                         errstr2 ? errstr2 : "");
911                         fails++;
912                 }
913         }
914 
915         TEST_ASSERT(!fails, "See %d previous failure(s)", fails);
916 
917         rd_kafka_event_destroy(rkev);
918 
919         rd_kafka_ConfigResource_destroy_array(configs, ci);
920 
921         TEST_LATER_CHECK();
922 #undef MY_CONFRES_CNT
923 
924         SUB_TEST_PASS();
925 }
926 
927 
928 
929 /**
930  * @brief Test DescribeConfigs
931  */
do_test_DescribeConfigs(rd_kafka_t * rk,rd_kafka_queue_t * rkqu)932 static void do_test_DescribeConfigs (rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
933 #define MY_CONFRES_CNT 3
934         char *topics[MY_CONFRES_CNT];
935         rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
936         rd_kafka_AdminOptions_t *options;
937         rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT];
938         rd_kafka_event_t *rkev;
939         rd_kafka_resp_err_t err;
940         const rd_kafka_DescribeConfigs_result_t *res;
941         const rd_kafka_ConfigResource_t **rconfigs;
942         size_t rconfig_cnt;
943         char errstr[128];
944         const char *errstr2;
945         int ci = 0;
946         int i;
947         int fails = 0;
948         int max_retry_describe = 3;
949 
950         SUB_TEST_QUICK();
951 
952         /*
953          * Only create one topic, the others will be non-existent.
954          */
955         rd_strdupa(&topics[0], test_mk_topic_name("DescribeConfigs_exist", 1));
956         for (i = 1 ; i < MY_CONFRES_CNT ; i++)
957                 rd_strdupa(&topics[i],
958                            test_mk_topic_name("DescribeConfigs_notexist", 1));
959 
960         test_CreateTopics_simple(rk, NULL, topics, 1, 1, NULL);
961 
962         /*
963          * ConfigResource #0: topic config, no config entries.
964          */
965         configs[ci] = rd_kafka_ConfigResource_new(
966                 RD_KAFKA_RESOURCE_TOPIC, topics[ci]);
967         exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
968         ci++;
969 
970         /*
971          * ConfigResource #1:broker config, no config entries
972          */
973         configs[ci] = rd_kafka_ConfigResource_new(
974                 RD_KAFKA_RESOURCE_BROKER,
975                 tsprintf("%"PRId32, avail_brokers[0]));
976 
977         exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
978         ci++;
979 
980         /*
981          * ConfigResource #2: topic config, non-existent topic, no config entr.
982          */
983         configs[ci] = rd_kafka_ConfigResource_new(
984                 RD_KAFKA_RESOURCE_TOPIC, topics[ci]);
985         /* FIXME: This is a bug in the broker (<v2.0.0), it returns a full response
986          *        for unknown topics.
987          *        https://issues.apache.org/jira/browse/KAFKA-6778
988          */
989         if (test_broker_version < TEST_BRKVER(2,0,0,0))
990                 exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
991         else
992                 exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
993         ci++;
994 
995 
996  retry_describe:
997         /*
998          * Timeout options
999          */
1000         options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
1001         err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr,
1002                                                         sizeof(errstr));
1003         TEST_ASSERT(!err, "%s", errstr);
1004 
1005 
1006         /*
1007          * Fire off request
1008          */
1009         rd_kafka_DescribeConfigs(rk, configs, ci, options, rkqu);
1010 
1011         rd_kafka_AdminOptions_destroy(options);
1012 
1013         /*
1014          * Wait for result
1015          */
1016         rkev = test_wait_admin_result(rkqu,
1017                                       RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT,
1018                                       10000+1000);
1019 
1020         /*
1021          * Extract result
1022          */
1023         res = rd_kafka_event_DescribeConfigs_result(rkev);
1024         TEST_ASSERT(res, "Expected DescribeConfigs result, not %s",
1025                     rd_kafka_event_name(rkev));
1026 
1027         err = rd_kafka_event_error(rkev);
1028         errstr2 = rd_kafka_event_error_string(rkev);
1029         TEST_ASSERT(!err,
1030                     "Expected success, not %s: %s",
1031                     rd_kafka_err2name(err), errstr2);
1032 
1033         rconfigs = rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt);
1034         TEST_ASSERT((int)rconfig_cnt == ci,
1035                     "Expected %d result resources, got %"PRIusz"\n",
1036                     ci, rconfig_cnt);
1037 
1038         /*
1039          * Verify status per resource
1040          */
1041         for (i = 0 ; i < (int)rconfig_cnt ; i++) {
1042                 const rd_kafka_ConfigEntry_t **entries;
1043                 size_t entry_cnt;
1044 
1045                 err = rd_kafka_ConfigResource_error(rconfigs[i]);
1046                 errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]);
1047 
1048                 entries = rd_kafka_ConfigResource_configs(rconfigs[i],
1049                                                           &entry_cnt);
1050 
1051                 TEST_SAY("ConfigResource #%d: type %s (%d), \"%s\": "
1052                          "%"PRIusz" ConfigEntries, error %s (%s)\n",
1053                          i,
1054                          rd_kafka_ResourceType_name(
1055                                  rd_kafka_ConfigResource_type(rconfigs[i])),
1056                          rd_kafka_ConfigResource_type(rconfigs[i]),
1057                          rd_kafka_ConfigResource_name(rconfigs[i]),
1058                          entry_cnt,
1059                          rd_kafka_err2name(err), errstr2 ? errstr2 : "");
1060 
1061                 test_print_ConfigEntry_array(entries, entry_cnt, 1);
1062 
1063                 if (rd_kafka_ConfigResource_type(rconfigs[i]) !=
1064                     rd_kafka_ConfigResource_type(configs[i]) ||
1065                     strcmp(rd_kafka_ConfigResource_name(rconfigs[i]),
1066                            rd_kafka_ConfigResource_name(configs[i]))) {
1067                         TEST_FAIL_LATER(
1068                                 "ConfigResource #%d: "
1069                                 "expected type %s name %s, "
1070                                 "got type %s name %s",
1071                                 i,
1072                                 rd_kafka_ResourceType_name(rd_kafka_ConfigResource_type(configs[i])),
1073                                 rd_kafka_ConfigResource_name(configs[i]),
1074                                 rd_kafka_ResourceType_name(rd_kafka_ConfigResource_type(rconfigs[i])),
1075                                 rd_kafka_ConfigResource_name(rconfigs[i]));
1076                         fails++;
1077                         continue;
1078                 }
1079 
1080 
1081                 if (err != exp_err[i]) {
1082                         if (err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART &&
1083                             max_retry_describe-- > 0) {
1084                                 TEST_WARN("ConfigResource #%d: "
1085                                           "expected %s (%d), got %s (%s): "
1086                                           "this is typically a temporary "
1087                                           "error while the new resource "
1088                                           "is propagating: retrying",
1089                                           i,
1090                                           rd_kafka_err2name(exp_err[i]),
1091                                           exp_err[i],
1092                                           rd_kafka_err2name(err),
1093                                           errstr2 ? errstr2 : "");
1094                                 rd_kafka_event_destroy(rkev);
1095                                 rd_sleep(1);
1096                                 goto retry_describe;
1097                         }
1098 
1099                         TEST_FAIL_LATER("ConfigResource #%d: "
1100                                         "expected %s (%d), got %s (%s)",
1101                                         i,
1102                                         rd_kafka_err2name(exp_err[i]),
1103                                         exp_err[i],
1104                                         rd_kafka_err2name(err),
1105                                         errstr2 ? errstr2 : "");
1106                         fails++;
1107                 }
1108         }
1109 
1110         TEST_ASSERT(!fails, "See %d previous failure(s)", fails);
1111 
1112         rd_kafka_event_destroy(rkev);
1113 
1114         rd_kafka_ConfigResource_destroy_array(configs, ci);
1115 
1116         TEST_LATER_CHECK();
1117 #undef MY_CONFRES_CNT
1118 
1119         SUB_TEST_PASS();
1120 }
1121 
1122 
1123 
1124 /**
1125  * @brief Verify that an unclean rd_kafka_destroy() does not hang.
1126  */
do_test_unclean_destroy(rd_kafka_type_t cltype,int with_mainq)1127 static void do_test_unclean_destroy (rd_kafka_type_t cltype, int with_mainq) {
1128         rd_kafka_t *rk;
1129         char errstr[512];
1130         rd_kafka_conf_t *conf;
1131         rd_kafka_queue_t *q;
1132         rd_kafka_NewTopic_t *topic;
1133         test_timing_t t_destroy;
1134 
1135         SUB_TEST_QUICK("Test unclean destroy using %s",
1136                        with_mainq ? "mainq" : "tempq");
1137 
1138         test_conf_init(&conf, NULL, 0);
1139 
1140         rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr));
1141         TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr);
1142 
1143         if (with_mainq)
1144                 q = rd_kafka_queue_get_main(rk);
1145         else
1146                 q = rd_kafka_queue_new(rk);
1147 
1148         topic = rd_kafka_NewTopic_new(test_mk_topic_name(__FUNCTION__, 1),
1149                                       3, 1, NULL, 0);
1150         rd_kafka_CreateTopics(rk, &topic, 1, NULL, q);
1151         rd_kafka_NewTopic_destroy(topic);
1152 
1153         rd_kafka_queue_destroy(q);
1154 
1155         TEST_SAY("Giving rd_kafka_destroy() 5s to finish, "
1156                  "despite Admin API request being processed\n");
1157         test_timeout_set(5);
1158         TIMING_START(&t_destroy, "rd_kafka_destroy()");
1159         rd_kafka_destroy(rk);
1160         TIMING_STOP(&t_destroy);
1161 
1162         SUB_TEST_PASS();
1163 
1164         /* Restore timeout */
1165         test_timeout_set(60);
1166 }
1167 
1168 
1169 
1170 
1171 /**
1172   * @brief Test deletion of records
1173   *
1174   *
1175   */
do_test_DeleteRecords(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int op_timeout)1176 static void do_test_DeleteRecords (const char *what,
1177                                    rd_kafka_t *rk, rd_kafka_queue_t *useq,
1178                                    int op_timeout) {
1179         rd_kafka_queue_t *q;
1180         rd_kafka_AdminOptions_t *options = NULL;
1181         rd_kafka_topic_partition_list_t *offsets = NULL;
1182         rd_kafka_event_t *rkev = NULL;
1183         rd_kafka_resp_err_t err;
1184         char errstr[512];
1185         const char *errstr2;
1186 #define MY_DEL_RECORDS_CNT 3
1187         rd_kafka_topic_partition_list_t *results = NULL;
1188         int i;
1189         const int partitions_cnt = 3;
1190         const int msgs_cnt = 100;
1191         char *topics[MY_DEL_RECORDS_CNT];
1192         rd_kafka_metadata_topic_t exp_mdtopics[MY_DEL_RECORDS_CNT] = {{0}};
1193         int exp_mdtopic_cnt = 0;
1194         test_timing_t timing;
1195         rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
1196         rd_kafka_DeleteRecords_t *del_records;
1197         const rd_kafka_DeleteRecords_result_t *res;
1198 
1199         SUB_TEST_QUICK("%s DeleteRecords with %s, op_timeout %d",
1200                        rd_kafka_name(rk), what, op_timeout);
1201 
1202         q = useq ? useq : rd_kafka_queue_new(rk);
1203 
1204         if (op_timeout != -1) {
1205                 options = rd_kafka_AdminOptions_new(
1206                         rk, RD_KAFKA_ADMIN_OP_ANY);
1207 
1208                 err = rd_kafka_AdminOptions_set_operation_timeout(
1209                         options, op_timeout, errstr, sizeof(errstr));
1210                 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
1211         }
1212 
1213 
1214         for (i = 0 ; i < MY_DEL_RECORDS_CNT ; i++) {
1215                 char pfx[32];
1216                 char *topic;
1217 
1218                 rd_snprintf(pfx, sizeof(pfx), "DeleteRecords-topic%d", i);
1219                 topic = rd_strdup(test_mk_topic_name(pfx, 1));
1220 
1221                 topics[i] = topic;
1222                 exp_mdtopics[exp_mdtopic_cnt++].topic = topic;
1223         }
1224 
1225         /* Create the topics first. */
1226         test_CreateTopics_simple(rk, NULL, topics,
1227                                  MY_DEL_RECORDS_CNT,
1228                                  partitions_cnt /*num_partitions*/,
1229                                  NULL);
1230 
1231         /* Verify that topics are reported by metadata */
1232         test_wait_metadata_update(rk,
1233                                   exp_mdtopics, exp_mdtopic_cnt,
1234                                   NULL, 0,
1235                                   15*1000);
1236 
1237         /* Produce 100 msgs / partition */
1238         for (i = 0 ; i < MY_DEL_RECORDS_CNT; i++ ) {
1239                 int32_t partition;
1240                 for (partition = 0 ; partition < partitions_cnt; partition++ ) {
1241                         test_produce_msgs_easy(topics[i], 0, partition,
1242                                                msgs_cnt);
1243                 }
1244         }
1245 
1246         offsets = rd_kafka_topic_partition_list_new(10);
1247 
1248         /* Wipe all data from topic 0 */
1249         for (i = 0 ; i < partitions_cnt; i++)
1250                 rd_kafka_topic_partition_list_add(offsets, topics[0], i)->
1251                         offset = RD_KAFKA_OFFSET_END;
1252 
1253         /* Wipe all data from partition 0 in topic 1 */
1254         rd_kafka_topic_partition_list_add(offsets, topics[1], 0)->
1255                 offset = RD_KAFKA_OFFSET_END;
1256 
1257         /* Wipe some data from partition 2 in topic 1 */
1258         rd_kafka_topic_partition_list_add(offsets, topics[1], 2)->
1259                 offset = msgs_cnt / 2;
1260 
1261         /* Not changing the offset (out of range) for topic 2 partition 0 */
1262         rd_kafka_topic_partition_list_add(offsets, topics[2], 0);
1263 
1264         /* Offset out of range for topic 2 partition 1 */
1265         rd_kafka_topic_partition_list_add(offsets, topics[2], 1)->
1266                 offset = msgs_cnt + 1;
1267 
1268         del_records = rd_kafka_DeleteRecords_new(offsets);
1269 
1270         TIMING_START(&timing, "DeleteRecords");
1271         TEST_SAY("Call DeleteRecords\n");
1272         rd_kafka_DeleteRecords(rk, &del_records, 1, options, q);
1273         TIMING_ASSERT_LATER(&timing, 0, 50);
1274 
1275         rd_kafka_DeleteRecords_destroy(del_records);
1276 
1277         TIMING_START(&timing, "DeleteRecords.queue_poll");
1278 
1279         /* Poll result queue for DeleteRecords result.
1280          * Print but otherwise ignore other event types
1281          * (typically generic Error events). */
1282         while (1) {
1283                 rkev = rd_kafka_queue_poll(q, tmout_multip(20*1000));
1284                 TEST_SAY("DeleteRecords: got %s in %.3fms\n",
1285                          rd_kafka_event_name(rkev),
1286                          TIMING_DURATION(&timing) / 1000.0f);
1287                 if (rkev == NULL)
1288                         continue;
1289                 if (rd_kafka_event_error(rkev))
1290                         TEST_SAY("%s: %s\n",
1291                                  rd_kafka_event_name(rkev),
1292                                  rd_kafka_event_error_string(rkev));
1293 
1294                 if (rd_kafka_event_type(rkev) ==
1295                     RD_KAFKA_EVENT_DELETERECORDS_RESULT) {
1296                         break;
1297                 }
1298 
1299                 rd_kafka_event_destroy(rkev);
1300         }
1301         /* Convert event to proper result */
1302         res = rd_kafka_event_DeleteRecords_result(rkev);
1303         TEST_ASSERT(res, "expected DeleteRecords_result, not %s",
1304                     rd_kafka_event_name(rkev));
1305 
1306         /* Expecting error */
1307         err = rd_kafka_event_error(rkev);
1308         errstr2 = rd_kafka_event_error_string(rkev);
1309         TEST_ASSERT(err == exp_err,
1310                     "expected DeleteRecords to return %s, not %s (%s)",
1311                     rd_kafka_err2str(exp_err),
1312                     rd_kafka_err2str(err),
1313                     err ? errstr2 : "n/a");
1314 
1315         TEST_SAY("DeleteRecords: returned %s (%s)\n",
1316                  rd_kafka_err2str(err), err ? errstr2 : "n/a");
1317 
1318         results = rd_kafka_topic_partition_list_copy(
1319                 rd_kafka_DeleteRecords_result_offsets(res));
1320 
1321         /* Sort both input and output list */
1322         rd_kafka_topic_partition_list_sort(offsets, NULL, NULL);
1323         rd_kafka_topic_partition_list_sort(results, NULL, NULL);
1324 
1325         TEST_SAY("Input partitions:\n");
1326         test_print_partition_list(offsets);
1327         TEST_SAY("Result partitions:\n");
1328         test_print_partition_list(results);
1329 
1330         TEST_ASSERT(offsets->cnt == results->cnt,
1331                     "expected DeleteRecords_result_offsets to return %d items, "
1332                     "not %d",
1333                     offsets->cnt,
1334                     results->cnt);
1335 
1336         for (i = 0 ; i < results->cnt ; i++) {
1337                 const rd_kafka_topic_partition_t *input =&offsets->elems[i];
1338                 const rd_kafka_topic_partition_t *output = &results->elems[i];
1339                 int64_t expected_offset = input->offset;
1340                 rd_kafka_resp_err_t expected_err = 0;
1341 
1342                 if (expected_offset == RD_KAFKA_OFFSET_END)
1343                         expected_offset = msgs_cnt;
1344 
1345                 /* Expect Offset out of range error */
1346                 if (input->offset < RD_KAFKA_OFFSET_END ||
1347                     input->offset > msgs_cnt)
1348                         expected_err = 1;
1349 
1350                 TEST_SAY("DeleteRecords Returned %s for %s [%"PRId32"] "
1351                          "low-watermark = %d\n",
1352                          rd_kafka_err2name(output->err),
1353                          output->topic,
1354                          output->partition,
1355                          (int)output->offset);
1356 
1357                 if (strcmp(output->topic, input->topic))
1358                         TEST_FAIL_LATER("Result order mismatch at #%d: "
1359                                         "expected topic %s, got %s",
1360                                         i,
1361                                         input->topic,
1362                                         output->topic);
1363 
1364                 if (output->partition != input->partition)
1365                         TEST_FAIL_LATER("Result order mismatch at #%d: "
1366                                         "expected partition %d, got %d",
1367                                         i,
1368                                         input->partition,
1369                                         output->partition);
1370 
1371                 if (output->err != expected_err)
1372                         TEST_FAIL_LATER("%s [%"PRId32"]: "
1373                                         "expected error code %d (%s), "
1374                                         "got %d (%s)",
1375                                         output->topic,
1376                                         output->partition,
1377                                         expected_err,
1378                                         rd_kafka_err2str(expected_err),
1379                                         output->err,
1380                                         rd_kafka_err2str(output->err));
1381 
1382                 if (output->err == 0 && output->offset != expected_offset)
1383                         TEST_FAIL_LATER("%s [%"PRId32"]: "
1384                                         "expected offset %"PRId64", "
1385                                         "got %"PRId64,
1386                                         output->topic,
1387                                         output->partition,
1388                                         expected_offset,
1389                                         output->offset);
1390         }
1391 
1392         /* Check watermarks for partitions */
1393         for (i = 0 ; i < MY_DEL_RECORDS_CNT; i++ ) {
1394                 int32_t partition;
1395                 for (partition = 0 ; partition < partitions_cnt; partition++ ) {
1396                         const rd_kafka_topic_partition_t *del =
1397                                 rd_kafka_topic_partition_list_find(
1398                                         results, topics[i], partition);
1399                         int64_t expected_low = 0;
1400                         int64_t expected_high = msgs_cnt;
1401                         int64_t low, high;
1402 
1403                         if (del && del->err == 0) {
1404                                 expected_low = del->offset;
1405                         }
1406 
1407                         err = rd_kafka_query_watermark_offsets(
1408                                 rk, topics[i], partition,
1409                                 &low, &high, tmout_multip(10000));
1410                         if (err)
1411                                 TEST_FAIL("query_watermark_offsets failed: "
1412                                           "%s\n",
1413                                           rd_kafka_err2str(err));
1414 
1415                         if (low != expected_low)
1416                                 TEST_FAIL_LATER("For %s [%"PRId32"] expected "
1417                                                 "a low watermark of %"PRId64
1418                                                 ", got %"PRId64,
1419                                                 topics[i],
1420                                                 partition,
1421                                                 expected_low,
1422                                                 low);
1423 
1424                         if (high != expected_high)
1425                                 TEST_FAIL_LATER("For %s [%"PRId32"] expected "
1426                                                 "a high watermark of %"PRId64
1427                                                 ", got %"PRId64,
1428                                                 topics[i],
1429                                                 partition,
1430                                                 expected_high,
1431                                                 high);
1432                 }
1433         }
1434 
1435         rd_kafka_event_destroy(rkev);
1436 
1437         for (i = 0 ; i < MY_DEL_RECORDS_CNT ; i++)
1438                 rd_free(topics[i]);
1439 
1440         if (results)
1441                 rd_kafka_topic_partition_list_destroy(results);
1442 
1443         if (offsets)
1444                 rd_kafka_topic_partition_list_destroy(offsets);
1445 
1446         if (options)
1447                 rd_kafka_AdminOptions_destroy(options);
1448 
1449         if (!useq)
1450                 rd_kafka_queue_destroy(q);
1451 
1452         TEST_LATER_CHECK();
1453 #undef MY_DEL_RECORDS_CNT
1454 
1455         SUB_TEST_PASS();
1456 }
1457 
1458 /**
1459   * @brief Test deletion of groups
1460   *
1461   *
1462   */
1463 
1464 typedef struct expected_group_result {
1465         char *group;
1466         rd_kafka_resp_err_t err;
1467 } expected_group_result_t;
1468 
/**
 * @brief Exercise DeleteGroups with a mix of groups that have consumed
 *        and one group that has not.
 *
 * A single topic is created and \c msgs_cnt messages are produced to it.
 * MY_DEL_GROUPS_CNT group names are generated: the first
 * MY_DEL_GROUPS_CNT - 1 each consume the messages through
 * test_consume_msgs_easy() and are expected to be deleted without error,
 * while the last one never consumes and is expected to yield
 * RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND.
 *
 * The result event itself must carry no error, and the per-group results
 * must come back in the same order as the request.
 *
 * @param what        Free-text label included in the sub-test name.
 * @param rk          Client instance used to issue the admin request.
 * @param useq        Queue to receive the result on, or NULL to create
 *                    (and destroy) a temporary queue.
 * @param op_timeout  Operation timeout set on the AdminOptions, or -1 to
 *                    issue the request without any AdminOptions.
 */
static void do_test_DeleteGroups (const char *what,
                                  rd_kafka_t *rk, rd_kafka_queue_t *useq,
                                  int op_timeout) {
        rd_kafka_queue_t *q;
        rd_kafka_AdminOptions_t *options = NULL;
        rd_kafka_event_t *rkev = NULL;
        rd_kafka_resp_err_t err;
        char errstr[512];
        const char *errstr2;
#define MY_DEL_GROUPS_CNT 4
        const int known_groups = MY_DEL_GROUPS_CNT - 1;
        int i;
        const int partitions_cnt = 1;
        const int msgs_cnt = 100;
        char *topic;
        rd_kafka_metadata_topic_t exp_mdtopic = {0};
        int64_t testid = test_id_generate();
        test_timing_t timing;
        /* Error expected on the result event as a whole. */
        rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        const rd_kafka_group_result_t **results = NULL;
        size_t cnt = 0;
        expected_group_result_t expected[MY_DEL_GROUPS_CNT] = {{0}};
        rd_kafka_DeleteGroup_t *del_groups[MY_DEL_GROUPS_CNT];
        const rd_kafka_DeleteGroups_result_t *res;

        SUB_TEST_QUICK("%s DeleteGroups with %s, op_timeout %d",
                       rd_kafka_name(rk), what, op_timeout);

        q = useq ? useq : rd_kafka_queue_new(rk);

        if (op_timeout != -1) {
                options = rd_kafka_AdminOptions_new(
                        rk, RD_KAFKA_ADMIN_OP_ANY);

                err = rd_kafka_AdminOptions_set_operation_timeout(
                        options, op_timeout, errstr, sizeof(errstr));
                TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
        }


        topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
        exp_mdtopic.topic = topic;

        /* Create the topic first. */
        test_CreateTopics_simple(rk, NULL, &topic, 1,
                                 partitions_cnt,
                                 NULL);

        /* Verify that the topic is reported by metadata */
        test_wait_metadata_update(rk,
                                  &exp_mdtopic, 1,
                                  NULL, 0,
                                  15*1000);

        /* Produce msgs_cnt messages */
        test_produce_msgs_easy(topic, testid, 0, msgs_cnt);

        /* Build the request: the first known_groups groups consume the
         * messages, the remaining group is only named, never used. */
        for (i = 0; i < MY_DEL_GROUPS_CNT; i++) {
                char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));

                if (i < known_groups) {
                        test_consume_msgs_easy(group, topic, testid, -1,
                                               msgs_cnt, NULL);
                        expected[i].err = RD_KAFKA_RESP_ERR_NO_ERROR;
                } else {
                        expected[i].err =
                                RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND;
                }

                /* expected[] owns the name; it is freed during cleanup. */
                expected[i].group = group;
                del_groups[i] = rd_kafka_DeleteGroup_new(group);
        }

        TIMING_START(&timing, "DeleteGroups");
        TEST_SAY("Call DeleteGroups\n");
        rd_kafka_DeleteGroups(rk, del_groups, MY_DEL_GROUPS_CNT, options, q);
        TIMING_ASSERT_LATER(&timing, 0, 50);

        TIMING_START(&timing, "DeleteGroups.queue_poll");

        /* Poll result queue for DeleteGroups result.
         * Print but otherwise ignore other event types
         * (typically generic Error events). */
        while (1) {
                rkev = rd_kafka_queue_poll(q, tmout_multip(20*1000));
                TEST_SAY("DeleteGroups: got %s in %.3fms\n",
                         rd_kafka_event_name(rkev),
                         TIMING_DURATION(&timing) / 1000.0f);
                if (rkev == NULL)
                        continue;
                if (rd_kafka_event_error(rkev))
                        TEST_SAY("%s: %s\n",
                                 rd_kafka_event_name(rkev),
                                 rd_kafka_event_error_string(rkev));

                if (rd_kafka_event_type(rkev) ==
                    RD_KAFKA_EVENT_DELETEGROUPS_RESULT) {
                        break;
                }

                rd_kafka_event_destroy(rkev);
        }

        /* Convert event to proper result */
        res = rd_kafka_event_DeleteGroups_result(rkev);
        TEST_ASSERT(res, "expected DeleteGroups_result, not %s",
                    rd_kafka_event_name(rkev));

        /* The event-level error must match exp_err (no error) */
        err = rd_kafka_event_error(rkev);
        errstr2 = rd_kafka_event_error_string(rkev);
        TEST_ASSERT(err == exp_err,
                    "expected DeleteGroups to return %s, not %s (%s)",
                    rd_kafka_err2str(exp_err),
                    rd_kafka_err2str(err),
                    err ? errstr2 : "n/a");

        TEST_SAY("DeleteGroups: returned %s (%s)\n",
                 rd_kafka_err2str(err), err ? errstr2 : "n/a");

        results = rd_kafka_DeleteGroups_result_groups(res, &cnt);

        TEST_ASSERT(MY_DEL_GROUPS_CNT == cnt,
                    "expected DeleteGroups_result_groups to return %d items, not %"PRIusz,
                    MY_DEL_GROUPS_CNT,
                    cnt);

        /* Per-group results: same order as the request, expected error. */
        for (i = 0 ; i < MY_DEL_GROUPS_CNT ; i++) {
                const expected_group_result_t *exp = &expected[i];
                const rd_kafka_group_result_t *act = results[i];
                rd_kafka_resp_err_t act_err =
                        rd_kafka_error_code(rd_kafka_group_result_error(act));

                TEST_ASSERT(strcmp(exp->group,
                                   rd_kafka_group_result_name(act)) == 0,
                            "Result order mismatch at #%d: expected group name to be %s, not %s",
                            i, exp->group, rd_kafka_group_result_name(act));
                TEST_ASSERT(exp->err == act_err,
                            "expected err=%d for group %s, not %d (%s)",
                            exp->err,
                            exp->group,
                            act_err,
                            rd_kafka_err2str(act_err));
        }

        /* results[] was obtained from the event's result object, so it is
         * only used above, before the event is destroyed. */
        rd_kafka_event_destroy(rkev);

        for (i = 0 ; i < MY_DEL_GROUPS_CNT ; i++) {
                rd_kafka_DeleteGroup_destroy(del_groups[i]);
                rd_free(expected[i].group);
        }

        rd_free(topic);

        if (options)
                rd_kafka_AdminOptions_destroy(options);

        if (!useq)
                rd_kafka_queue_destroy(q);

        TEST_LATER_CHECK();
#undef MY_DEL_GROUPS_CNT

        SUB_TEST_PASS();
}
1628 
1629 
1630 /**
1631   * @brief Test deletion of committed offsets.
1632   *
1633   *
1634   */
do_test_DeleteConsumerGroupOffsets(const char * what,rd_kafka_t * rk,rd_kafka_queue_t * useq,int op_timeout,rd_bool_t sub_consumer)1635 static void do_test_DeleteConsumerGroupOffsets (const char *what,
1636                                                 rd_kafka_t *rk,
1637                                                 rd_kafka_queue_t *useq,
1638                                                 int op_timeout,
1639                                                 rd_bool_t sub_consumer) {
1640         rd_kafka_queue_t *q;
1641         rd_kafka_AdminOptions_t *options = NULL;
1642         rd_kafka_topic_partition_list_t *orig_offsets, *offsets,
1643                 *to_delete, *committed, *deleted, *subscription = NULL;
1644         rd_kafka_event_t *rkev = NULL;
1645         rd_kafka_resp_err_t err;
1646         char errstr[512];
1647         const char *errstr2;
1648 #define MY_TOPIC_CNT 3
1649         int i;
1650         const int partitions_cnt = 3;
1651         char *topics[MY_TOPIC_CNT];
1652         rd_kafka_metadata_topic_t exp_mdtopics[MY_TOPIC_CNT] = {{0}};
1653         int exp_mdtopic_cnt = 0;
1654         test_timing_t timing;
1655         rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
1656         rd_kafka_DeleteConsumerGroupOffsets_t *cgoffsets;
1657         const rd_kafka_DeleteConsumerGroupOffsets_result_t *res;
1658         const rd_kafka_group_result_t **gres;
1659         size_t gres_cnt;
1660         rd_kafka_t *consumer;
1661         char *groupid;
1662 
1663         SUB_TEST_QUICK("%s DeleteConsumerGroupOffsets with %s, op_timeout %d%s",
1664                        rd_kafka_name(rk), what, op_timeout,
1665                        sub_consumer ? ", with subscribing consumer" : "");
1666 
1667         if (sub_consumer)
1668                 exp_err = RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC;
1669 
1670         q = useq ? useq : rd_kafka_queue_new(rk);
1671 
1672         if (op_timeout != -1) {
1673                 options = rd_kafka_AdminOptions_new(
1674                         rk, RD_KAFKA_ADMIN_OP_ANY);
1675 
1676                 err = rd_kafka_AdminOptions_set_operation_timeout(
1677                         options, op_timeout, errstr, sizeof(errstr));
1678                 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
1679         }
1680 
1681 
1682         subscription = rd_kafka_topic_partition_list_new(MY_TOPIC_CNT);
1683 
1684         for (i = 0 ; i < MY_TOPIC_CNT ; i++) {
1685                 char pfx[64];
1686                 char *topic;
1687 
1688                 rd_snprintf(pfx, sizeof(pfx),
1689                             "DCGO-topic%d", i);
1690                 topic = rd_strdup(test_mk_topic_name(pfx, 1));
1691 
1692                 topics[i] = topic;
1693                 exp_mdtopics[exp_mdtopic_cnt++].topic = topic;
1694 
1695                 rd_kafka_topic_partition_list_add(subscription, topic,
1696                                                   RD_KAFKA_PARTITION_UA);
1697         }
1698 
1699         groupid = topics[0];
1700 
1701         /* Create the topics first. */
1702         test_CreateTopics_simple(rk, NULL, topics, MY_TOPIC_CNT,
1703                                  partitions_cnt, NULL);
1704 
1705         /* Verify that topics are reported by metadata */
1706         test_wait_metadata_update(rk,
1707                                   exp_mdtopics, exp_mdtopic_cnt,
1708                                   NULL, 0,
1709                                   15*1000);
1710 
1711         rd_sleep(1); /* Additional wait time for cluster propagation */
1712 
1713         consumer = test_create_consumer(groupid, NULL, NULL, NULL);
1714 
1715         if (sub_consumer) {
1716                 TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription));
1717                 test_consumer_wait_assignment(consumer, rd_true);
1718         }
1719 
1720         /* Commit some offsets */
1721         orig_offsets = rd_kafka_topic_partition_list_new(MY_TOPIC_CNT * 2);
1722         for (i = 0 ; i < MY_TOPIC_CNT * 2 ; i++)
1723                 rd_kafka_topic_partition_list_add(
1724                         orig_offsets, topics[i/2],
1725                         i % MY_TOPIC_CNT)->offset = (i+1)*10;
1726 
1727         TEST_CALL_ERR__(rd_kafka_commit(consumer, orig_offsets, 0/*sync*/));
1728 
1729         /* Verify committed offsets match */
1730         committed = rd_kafka_topic_partition_list_copy(orig_offsets);
1731         TEST_CALL_ERR__(rd_kafka_committed(consumer, committed,
1732                                            tmout_multip(5*1000)));
1733 
1734         if (test_partition_list_cmp(committed, orig_offsets)) {
1735                 TEST_SAY("commit() list:\n");
1736                 test_print_partition_list(orig_offsets);
1737                 TEST_SAY("committed() list:\n");
1738                 test_print_partition_list(committed);
1739                 TEST_FAIL("committed offsets don't match");
1740         }
1741 
1742         rd_kafka_topic_partition_list_destroy(committed);
1743 
1744         /* Now delete second half of the commits */
1745         offsets = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2);
1746         to_delete = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2);
1747         for (i = 0 ; i < orig_offsets->cnt ; i++) {
1748                 if (i < orig_offsets->cnt / 2)
1749                         rd_kafka_topic_partition_list_add(
1750                                 offsets,
1751                                 orig_offsets->elems[i].topic,
1752                                 orig_offsets->elems[i].partition);
1753                 else {
1754                         rd_kafka_topic_partition_list_add(
1755                                 to_delete,
1756                                 orig_offsets->elems[i].topic,
1757                                 orig_offsets->elems[i].partition);
1758                         rd_kafka_topic_partition_list_add(
1759                                 offsets,
1760                                 orig_offsets->elems[i].topic,
1761                                 orig_offsets->elems[i].partition)->offset =
1762                                 RD_KAFKA_OFFSET_INVALID;
1763                 }
1764 
1765         }
1766 
1767         cgoffsets = rd_kafka_DeleteConsumerGroupOffsets_new(groupid, to_delete);
1768 
1769         TIMING_START(&timing, "DeleteConsumerGroupOffsets");
1770         TEST_SAY("Call DeleteConsumerGroupOffsets\n");
1771         rd_kafka_DeleteConsumerGroupOffsets(rk, &cgoffsets, 1, options, q);
1772         TIMING_ASSERT_LATER(&timing, 0, 50);
1773 
1774         rd_kafka_DeleteConsumerGroupOffsets_destroy(cgoffsets);
1775 
1776         TIMING_START(&timing, "DeleteConsumerGroupOffsets.queue_poll");
1777         /* Poll result queue for DeleteConsumerGroupOffsets result.
1778          * Print but otherwise ignore other event types
1779          * (typically generic Error events). */
1780         while (1) {
1781                 rkev = rd_kafka_queue_poll(q, tmout_multip(10*1000));
1782                 TEST_SAY("DeleteConsumerGroupOffsets: got %s in %.3fms\n",
1783                          rd_kafka_event_name(rkev),
1784                          TIMING_DURATION(&timing) / 1000.0f);
1785                 if (rkev == NULL)
1786                         continue;
1787                 if (rd_kafka_event_error(rkev))
1788                         TEST_SAY("%s: %s\n",
1789                                  rd_kafka_event_name(rkev),
1790                                  rd_kafka_event_error_string(rkev));
1791 
1792                 if (rd_kafka_event_type(rkev) ==
1793                     RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT)
1794                         break;
1795 
1796                 rd_kafka_event_destroy(rkev);
1797         }
1798 
1799         /* Convert event to proper result */
1800         res = rd_kafka_event_DeleteConsumerGroupOffsets_result(rkev);
1801         TEST_ASSERT(res, "expected DeleteConsumerGroupOffsets_result, not %s",
1802                     rd_kafka_event_name(rkev));
1803 
1804         /* Expecting error */
1805         err = rd_kafka_event_error(rkev);
1806         errstr2 = rd_kafka_event_error_string(rkev);
1807         TEST_ASSERT(!err,
1808                     "expected DeleteConsumerGroupOffsets to succeed, "
1809                     "got %s (%s)",
1810                     rd_kafka_err2name(err),
1811                     err ? errstr2 : "n/a");
1812 
1813         TEST_SAY("DeleteConsumerGroupOffsets: returned %s (%s)\n",
1814                  rd_kafka_err2str(err), err ? errstr2 : "n/a");
1815 
1816         gres = rd_kafka_DeleteConsumerGroupOffsets_result_groups(res,
1817                                                                  &gres_cnt);
1818         TEST_ASSERT(gres && gres_cnt == 1,
1819                     "expected gres_cnt == 1, not %"PRIusz, gres_cnt);
1820 
1821         deleted = rd_kafka_topic_partition_list_copy(
1822                 rd_kafka_group_result_partitions(gres[0]));
1823 
1824         if (test_partition_list_cmp(deleted, to_delete)) {
1825                 TEST_SAY("Result list:\n");
1826                 test_print_partition_list(deleted);
1827                 TEST_SAY("Partitions passed to DeleteConsumerGroupOffsets:\n");
1828                 test_print_partition_list(to_delete);
1829                 TEST_FAIL("deleted/requested offsets don't match");
1830         }
1831 
1832         /* Verify expected errors */
1833         for (i = 0 ; i < deleted->cnt ; i++) {
1834                 TEST_ASSERT_LATER(deleted->elems[i].err == exp_err,
1835                                   "Result %s [%"PRId32"] has error %s, "
1836                                   "expected %s",
1837                                   deleted->elems[i].topic,
1838                                   deleted->elems[i].partition,
1839                                   rd_kafka_err2name(deleted->elems[i].err),
1840                                   rd_kafka_err2name(exp_err));
1841         }
1842 
1843         TEST_LATER_CHECK();
1844 
1845         rd_kafka_topic_partition_list_destroy(deleted);
1846         rd_kafka_topic_partition_list_destroy(to_delete);
1847 
1848         rd_kafka_event_destroy(rkev);
1849 
1850 
1851         /* Verify committed offsets match */
1852         committed = rd_kafka_topic_partition_list_copy(orig_offsets);
1853         TEST_CALL_ERR__(rd_kafka_committed(consumer, committed,
1854                                            tmout_multip(5*1000)));
1855 
1856         TEST_SAY("Original committed offsets:\n");
1857         test_print_partition_list(orig_offsets);
1858 
1859         TEST_SAY("Committed offsets after delete:\n");
1860         test_print_partition_list(committed);
1861 
1862         if (test_partition_list_cmp(committed, offsets)) {
1863                 TEST_SAY("expected list:\n");
1864                 test_print_partition_list(offsets);
1865                 TEST_SAY("committed() list:\n");
1866                 test_print_partition_list(committed);
1867                 TEST_FAIL("committed offsets don't match");
1868         }
1869 
1870         rd_kafka_topic_partition_list_destroy(committed);
1871         rd_kafka_topic_partition_list_destroy(offsets);
1872         rd_kafka_topic_partition_list_destroy(orig_offsets);
1873         rd_kafka_topic_partition_list_destroy(subscription);
1874 
1875         for (i = 0 ; i < MY_TOPIC_CNT ; i++)
1876                 rd_free(topics[i]);
1877 
1878         rd_kafka_destroy(consumer);
1879 
1880         if (options)
1881                 rd_kafka_AdminOptions_destroy(options);
1882 
1883         if (!useq)
1884                 rd_kafka_queue_destroy(q);
1885 
1886         TEST_LATER_CHECK();
1887 #undef MY_DEL_RECORDS_CNT
1888 
1889         SUB_TEST_PASS();
1890 }
1891 
1892 
/**
 * @brief Run all Admin API sub-tests using a client handle of the given type.
 *
 * Populates the file-scope \c avail_brokers / \c avail_broker_cnt with the
 * cluster's broker ids for the duration of the call and releases them
 * before returning.
 *
 * CreatePartitions is only exercised when the broker version is at least
 * 1.0.0.0, and DeleteConsumerGroupOffsets only when it is at least 2.4.0.0.
 *
 * @param cltype  Type of client handle to create for the tests.
 */
static void do_test_apis (rd_kafka_type_t cltype) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_queue_t *mainq;

        /* Get the available brokers, but use a separate rd_kafka_t instance
         * so we don't jinx the tests by having up-to-date metadata. */
        avail_brokers = test_get_broker_ids(NULL, &avail_broker_cnt);
        TEST_SAY("%"PRIusz" brokers in cluster "
                 "which will be used for replica sets\n",
                 avail_broker_cnt);

        do_test_unclean_destroy(cltype, 0/*tempq*/);
        do_test_unclean_destroy(cltype, 1/*mainq*/);

        test_conf_init(&conf, NULL, 180);
        test_conf_set(conf, "socket.timeout.ms", "10000");
        rk = test_create_handle(cltype, conf);

        mainq = rd_kafka_queue_get_main(rk);

        /* Create topics */
        do_test_CreateTopics("temp queue, op timeout 0",
                             rk, NULL, 0, 0);
        do_test_CreateTopics("temp queue, op timeout 15000",
                             rk, NULL, 15000, 0);
        do_test_CreateTopics("temp queue, op timeout 300, "
                             "validate only",
                             rk, NULL, 300, rd_true);
        do_test_CreateTopics("temp queue, op timeout 9000, validate_only",
                             rk, NULL, 9000, rd_true);
        do_test_CreateTopics("main queue, options", rk, mainq, -1, 0);

        /* Delete topics */
        do_test_DeleteTopics("temp queue, op timeout 0", rk, NULL, 0);
        do_test_DeleteTopics("main queue, op timeout 1500", rk, mainq, 1500);

        if (test_broker_version >= TEST_BRKVER(1,0,0,0)) {
                /* Create Partitions */
                do_test_CreatePartitions("temp queue, op timeout 6500",
                                         rk, NULL, 6500);
                do_test_CreatePartitions("main queue, op timeout 0",
                                         rk, mainq, 0);
        }

        /* AlterConfigs */
        do_test_AlterConfigs(rk, mainq);

        /* DescribeConfigs */
        do_test_DescribeConfigs(rk, mainq);

        /* Delete records */
        do_test_DeleteRecords("temp queue, op timeout 0", rk, NULL, 0);
        do_test_DeleteRecords("main queue, op timeout 1500", rk, mainq, 1500);

        /* Delete groups */
        do_test_DeleteGroups("temp queue, op timeout 0", rk, NULL, 0);
        do_test_DeleteGroups("main queue, op timeout 1500", rk, mainq, 1500);
        /* NOTE(review): identical to the previous call; looks like a
         * copy/paste duplicate - confirm whether a different variant
         * was intended here. */
        do_test_DeleteGroups("main queue, op timeout 1500", rk, mainq, 1500);

        if (test_broker_version >= TEST_BRKVER(2,4,0,0)) {
                /* Delete committed offsets */
                do_test_DeleteConsumerGroupOffsets(
                        "temp queue, op timeout 0", rk, NULL, 0, rd_false);
                do_test_DeleteConsumerGroupOffsets(
                        "main queue, op timeout 1500", rk, mainq, 1500,
                        rd_false);
                do_test_DeleteConsumerGroupOffsets(
                        "main queue, op timeout 1500", rk, mainq, 1500,
                        rd_true/*with subscribing consumer*/);
        }

        rd_kafka_queue_destroy(mainq);

        rd_kafka_destroy(rk);

        /* Release the broker id list and reset the file-scope state so no
         * dangling pointer or stale count is left behind between runs. */
        free(avail_brokers);
        avail_brokers = NULL;
        avail_broker_cnt = 0;
}
1971 
1972 
main_0081_admin(int argc,char ** argv)1973 int main_0081_admin (int argc, char **argv) {
1974 
1975         do_test_apis(RD_KAFKA_PRODUCER);
1976 
1977         if (test_quick) {
1978                 TEST_SAY("Skipping further 0081 tests due to quick mode\n");
1979                 return 0;
1980         }
1981 
1982         do_test_apis(RD_KAFKA_CONSUMER);
1983 
1984         return 0;
1985 }
1986