1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30 #include "rdkafka.h"
31
32 /**
33 * @brief Admin API integration tests.
34 */
35
36
37 static int32_t *avail_brokers;
38 static size_t avail_broker_cnt;
39
40
41
42
/**
 * @brief Test creation of topics with a mix of valid and invalid settings.
 *
 * Constructs MY_NEW_TOPICS_CNT NewTopic objects with varying partition
 * counts, configuration properties and replica assignments.
 * Topic #1 carries an intentionally invalid config property and is
 * expected to fail with INVALID_CONFIG; topic #6 uses broker defaults
 * on brokers >= 2.4.0.  The per-topic results and the resulting cluster
 * metadata are verified.
 *
 * @param what          Descriptive string for this test variant.
 * @param rk            Client instance to use.
 * @param useq          Result queue to use, or NULL to use a temporary queue.
 * @param op_timeout    Admin operation timeout (ms), or -1 to use default
 *                      AdminOptions.
 * @param validate_only If true the request is only validated by the broker
 *                      and no topics are actually created.
 */
static void do_test_CreateTopics (const char *what,
                                  rd_kafka_t *rk, rd_kafka_queue_t *useq,
                                  int op_timeout, rd_bool_t validate_only) {
        rd_kafka_queue_t *q;
#define MY_NEW_TOPICS_CNT 7
        char *topics[MY_NEW_TOPICS_CNT];
        rd_kafka_NewTopic_t *new_topics[MY_NEW_TOPICS_CNT];
        rd_kafka_AdminOptions_t *options = NULL;
        rd_kafka_resp_err_t exp_topicerr[MY_NEW_TOPICS_CNT] = {0};
        rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        /* Expected topics in metadata */
        rd_kafka_metadata_topic_t exp_mdtopics[MY_NEW_TOPICS_CNT] = {{0}};
        int exp_mdtopic_cnt = 0;
        /* Not expected topics in metadata */
        rd_kafka_metadata_topic_t exp_not_mdtopics[MY_NEW_TOPICS_CNT] = {{0}};
        int exp_not_mdtopic_cnt = 0;
        int i;
        char errstr[512];
        const char *errstr2;
        rd_kafka_resp_err_t err;
        test_timing_t timing;
        rd_kafka_event_t *rkev;
        const rd_kafka_CreateTopics_result_t *res;
        const rd_kafka_topic_result_t **restopics;
        size_t restopic_cnt;
        int metadata_tmout;
        int num_replicas = (int)avail_broker_cnt;
        int32_t *replicas;

        SUB_TEST_QUICK("%s CreateTopics with %s, "
                       "op_timeout %d, validate_only %d",
                       rd_kafka_name(rk), what, op_timeout, validate_only);

        q = useq ? useq : rd_kafka_queue_new(rk);

        /* Set up replicas: one replica per available broker. */
        replicas = rd_alloca(sizeof(*replicas) * num_replicas);
        for (i = 0 ; i < num_replicas ; i++)
                replicas[i] = avail_brokers[i];

        /**
         * Construct NewTopic array with different properties for
         * different partitions.
         */
        for (i = 0 ; i < MY_NEW_TOPICS_CNT ; i++) {
                char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
                int use_defaults = i == 6 &&
                        test_broker_version >= TEST_BRKVER(2,4,0,0);
                int num_parts = !use_defaults ? (i * 7 + 1) : -1;
                int set_config = (i & 1);
                int add_invalid_config = (i == 1);
                int set_replicas = !use_defaults && !(i % 3);
                rd_kafka_resp_err_t this_exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;

                topics[i] = topic;
                /* Pass -1 as replication factor when a manual replica
                 * assignment will be set below (the two are mutually
                 * exclusive in the protocol). */
                new_topics[i] = rd_kafka_NewTopic_new(topic,
                                                      num_parts,
                                                      set_replicas ? -1 :
                                                      num_replicas,
                                                      NULL, 0);

                if (set_config) {
                        /*
                         * Add various configuration properties
                         */
                        err = rd_kafka_NewTopic_set_config(
                                new_topics[i], "compression.type", "lz4");
                        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));

                        err = rd_kafka_NewTopic_set_config(
                                new_topics[i], "delete.retention.ms", "900");
                        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
                }

                if (add_invalid_config) {
                        /* Add invalid config property */
                        err = rd_kafka_NewTopic_set_config(
                                new_topics[i],
                                "dummy.doesntexist",
                                "broker is verifying this");
                        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
                        this_exp_err = RD_KAFKA_RESP_ERR_INVALID_CONFIG;
                }

                TEST_SAY("Expecting result for topic #%d: %s "
                         "(set_config=%d, add_invalid_config=%d, "
                         "set_replicas=%d, use_defaults=%d)\n",
                         i, rd_kafka_err2name(this_exp_err),
                         set_config, add_invalid_config, set_replicas,
                         use_defaults);

                if (set_replicas) {
                        int32_t p;

                        /*
                         * Set valid replica assignments
                         */
                        for (p = 0 ; p < num_parts ; p++) {
                                err = rd_kafka_NewTopic_set_replica_assignment(
                                        new_topics[i], p,
                                        replicas, num_replicas,
                                        errstr, sizeof(errstr));
                                TEST_ASSERT(!err, "%s", errstr);
                        }
                }

                if (this_exp_err || validate_only) {
                        /* Topics that fail, or the whole validate_only run,
                         * must not show up in metadata afterwards. */
                        exp_topicerr[i] = this_exp_err;
                        exp_not_mdtopics[exp_not_mdtopic_cnt++].topic = topic;

                } else {
                        exp_mdtopics[exp_mdtopic_cnt].topic = topic;
                        exp_mdtopics[exp_mdtopic_cnt].partition_cnt =
                                num_parts;
                        exp_mdtopic_cnt++;
                }
        }

        if (op_timeout != -1 || validate_only) {
                options = rd_kafka_AdminOptions_new(
                        rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);

                if (op_timeout != -1) {
                        err = rd_kafka_AdminOptions_set_operation_timeout(
                                options, op_timeout, errstr, sizeof(errstr));
                        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
                }

                if (validate_only) {
                        err = rd_kafka_AdminOptions_set_validate_only(
                                options, validate_only, errstr, sizeof(errstr));
                        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
                }
        }

        TIMING_START(&timing, "CreateTopics");
        TEST_SAY("Call CreateTopics\n");
        rd_kafka_CreateTopics(rk, new_topics, MY_NEW_TOPICS_CNT,
                              options, q);
        TIMING_ASSERT_LATER(&timing, 0, 50);

        /* Poll result queue for CreateTopics result.
         * Print but otherwise ignore other event types
         * (typically generic Error events). */
        TIMING_START(&timing, "CreateTopics.queue_poll");
        while (1) {
                rkev = rd_kafka_queue_poll(q, tmout_multip(20*1000));
                TEST_SAY("CreateTopics: got %s in %.3fms\n",
                         rd_kafka_event_name(rkev),
                         TIMING_DURATION(&timing) / 1000.0f);
                if (rd_kafka_event_error(rkev))
                        TEST_SAY("%s: %s\n",
                                 rd_kafka_event_name(rkev),
                                 rd_kafka_event_error_string(rkev));

                if (rd_kafka_event_type(rkev) ==
                    RD_KAFKA_EVENT_CREATETOPICS_RESULT)
                        break;

                /* Destroy ignored events: only the result event is kept.
                 * (Previously these were leaked.) */
                rd_kafka_event_destroy(rkev);
        }

        /* Convert event to proper result */
        res = rd_kafka_event_CreateTopics_result(rkev);
        TEST_ASSERT(res, "expected CreateTopics_result, not %s",
                    rd_kafka_event_name(rkev));

        /* Expecting error */
        err = rd_kafka_event_error(rkev);
        errstr2 = rd_kafka_event_error_string(rkev);
        TEST_ASSERT(err == exp_err,
                    "expected CreateTopics to return %s, not %s (%s)",
                    rd_kafka_err2str(exp_err),
                    rd_kafka_err2str(err),
                    err ? errstr2 : "n/a");

        TEST_SAY("CreateTopics: returned %s (%s)\n",
                 rd_kafka_err2str(err), err ? errstr2 : "n/a");

        /* Extract topics */
        restopics = rd_kafka_CreateTopics_result_topics(res, &restopic_cnt);


        /* Scan topics for proper fields and expected failures. */
        for (i = 0 ; i < (int)restopic_cnt ; i++) {
                const rd_kafka_topic_result_t *terr = restopics[i];

                /* Verify that topic order matches our request. */
                if (strcmp(rd_kafka_topic_result_name(terr), topics[i]))
                        TEST_FAIL_LATER("Topic result order mismatch at #%d: "
                                        "expected %s, got %s",
                                        i, topics[i],
                                        rd_kafka_topic_result_name(terr));

                TEST_SAY("CreateTopics result: #%d: %s: %s: %s\n",
                         i,
                         rd_kafka_topic_result_name(terr),
                         rd_kafka_err2name(rd_kafka_topic_result_error(terr)),
                         rd_kafka_topic_result_error_string(terr));
                if (rd_kafka_topic_result_error(terr) != exp_topicerr[i])
                        TEST_FAIL_LATER(
                                "Expected %s, not %d: %s",
                                rd_kafka_err2name(exp_topicerr[i]),
                                rd_kafka_topic_result_error(terr),
                                rd_kafka_err2name(rd_kafka_topic_result_error(
                                                          terr)));
        }

        /**
         * Verify that the expected topics are created and the non-expected
         * are not. Allow it some time to propagate.
         */
        if (validate_only) {
                /* No topics should have been created, give it some time
                 * before checking. */
                rd_sleep(2);
                metadata_tmout = 5 * 1000;
        } else {
                if (op_timeout > 0)
                        metadata_tmout = op_timeout + 1000;
                else
                        metadata_tmout = 10 * 1000;
        }

        test_wait_metadata_update(rk,
                                  exp_mdtopics,
                                  exp_mdtopic_cnt,
                                  exp_not_mdtopics,
                                  exp_not_mdtopic_cnt,
                                  metadata_tmout);

        rd_kafka_event_destroy(rkev);

        for (i = 0 ; i < MY_NEW_TOPICS_CNT ; i++) {
                rd_kafka_NewTopic_destroy(new_topics[i]);
                rd_free(topics[i]);
        }

        if (options)
                rd_kafka_AdminOptions_destroy(options);

        if (!useq)
                rd_kafka_queue_destroy(q);

        TEST_LATER_CHECK();
#undef MY_NEW_TOPICS_CNT

        SUB_TEST_PASS();
}
287
288
289
290
/**
 * @brief Test deletion of topics.
 *
 * Creates MY_DEL_TOPICS_CNT - skip_topic_cnt topics, then issues a
 * DeleteTopics request for all MY_DEL_TOPICS_CNT names: the last
 * \c skip_topic_cnt names were never created and are expected to fail
 * with UNKNOWN_TOPIC_OR_PART.  Per-topic results and the final
 * metadata (no topic should remain) are verified.
 *
 * @param what       Descriptive string for this test variant.
 * @param rk         Client instance to use.
 * @param useq       Result queue to use, or NULL to use a temporary queue.
 * @param op_timeout Admin operation timeout (ms), or -1 to use default
 *                   AdminOptions.
 */
static void do_test_DeleteTopics (const char *what,
                                  rd_kafka_t *rk, rd_kafka_queue_t *useq,
                                  int op_timeout) {
        rd_kafka_queue_t *q;
        /* Number of trailing topics that are never created and are
         * thus expected to fail deletion. */
        const int skip_topic_cnt = 2;
#define MY_DEL_TOPICS_CNT 9
        char *topics[MY_DEL_TOPICS_CNT];
        rd_kafka_DeleteTopic_t *del_topics[MY_DEL_TOPICS_CNT];
        rd_kafka_AdminOptions_t *options = NULL;
        rd_kafka_resp_err_t exp_topicerr[MY_DEL_TOPICS_CNT] = {0};
        rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        /* Expected topics in metadata */
        rd_kafka_metadata_topic_t exp_mdtopics[MY_DEL_TOPICS_CNT] = {{0}};
        int exp_mdtopic_cnt = 0;
        /* Not expected topics in metadata */
        rd_kafka_metadata_topic_t exp_not_mdtopics[MY_DEL_TOPICS_CNT] = {{0}};
        int exp_not_mdtopic_cnt = 0;
        int i;
        char errstr[512];
        const char *errstr2;
        rd_kafka_resp_err_t err;
        test_timing_t timing;
        rd_kafka_event_t *rkev;
        const rd_kafka_DeleteTopics_result_t *res;
        const rd_kafka_topic_result_t **restopics;
        size_t restopic_cnt;
        int metadata_tmout;

        SUB_TEST_QUICK("%s DeleteTopics with %s, op_timeout %d",
                       rd_kafka_name(rk), what, op_timeout);

        q = useq ? useq : rd_kafka_queue_new(rk);

        /**
         * Construct DeleteTopic array
         */
        for (i = 0 ; i < MY_DEL_TOPICS_CNT ; i++) {
                char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
                int notexist_topic = i >= MY_DEL_TOPICS_CNT - skip_topic_cnt;

                topics[i] = topic;

                del_topics[i] = rd_kafka_DeleteTopic_new(topic);

                if (notexist_topic)
                        exp_topicerr[i] =
                                RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
                else {
                        exp_topicerr[i] =
                                RD_KAFKA_RESP_ERR_NO_ERROR;

                        exp_mdtopics[exp_mdtopic_cnt++].topic = topic;
                }

                /* After deletion none of the topics (created or not)
                 * should show up in metadata. */
                exp_not_mdtopics[exp_not_mdtopic_cnt++].topic = topic;
        }

        if (op_timeout != -1) {
                options = rd_kafka_AdminOptions_new(
                        rk, RD_KAFKA_ADMIN_OP_ANY);

                err = rd_kafka_AdminOptions_set_operation_timeout(
                        options, op_timeout, errstr, sizeof(errstr));
                TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
        }


        /* Create the topics first, minus the skip count. */
        test_CreateTopics_simple(rk, NULL, topics,
                                 MY_DEL_TOPICS_CNT-skip_topic_cnt,
                                 2/*num_partitions*/,
                                 NULL);

        /* Verify that topics are reported by metadata */
        test_wait_metadata_update(rk,
                                  exp_mdtopics, exp_mdtopic_cnt,
                                  NULL, 0,
                                  15*1000);

        TIMING_START(&timing, "DeleteTopics");
        TEST_SAY("Call DeleteTopics\n");
        rd_kafka_DeleteTopics(rk, del_topics, MY_DEL_TOPICS_CNT,
                              options, q);
        TIMING_ASSERT_LATER(&timing, 0, 50);

        /* Poll result queue for DeleteTopics result.
         * Print but otherwise ignore other event types
         * (typically generic Error events). */
        TIMING_START(&timing, "DeleteTopics.queue_poll");
        while (1) {
                rkev = rd_kafka_queue_poll(q, tmout_multip(20*1000));
                TEST_SAY("DeleteTopics: got %s in %.3fms\n",
                         rd_kafka_event_name(rkev),
                         TIMING_DURATION(&timing) / 1000.0f);
                if (rd_kafka_event_error(rkev))
                        TEST_SAY("%s: %s\n",
                                 rd_kafka_event_name(rkev),
                                 rd_kafka_event_error_string(rkev));

                if (rd_kafka_event_type(rkev) ==
                    RD_KAFKA_EVENT_DELETETOPICS_RESULT)
                        break;

                /* Discard (and free) any non-result event. */
                rd_kafka_event_destroy(rkev);
        }

        /* Convert event to proper result */
        res = rd_kafka_event_DeleteTopics_result(rkev);
        TEST_ASSERT(res, "expected DeleteTopics_result, not %s",
                    rd_kafka_event_name(rkev));

        /* Expecting error */
        err = rd_kafka_event_error(rkev);
        errstr2 = rd_kafka_event_error_string(rkev);
        TEST_ASSERT(err == exp_err,
                    "expected DeleteTopics to return %s, not %s (%s)",
                    rd_kafka_err2str(exp_err),
                    rd_kafka_err2str(err),
                    err ? errstr2 : "n/a");

        TEST_SAY("DeleteTopics: returned %s (%s)\n",
                 rd_kafka_err2str(err), err ? errstr2 : "n/a");

        /* Extract topics */
        restopics = rd_kafka_DeleteTopics_result_topics(res, &restopic_cnt);


        /* Scan topics for proper fields and expected failures. */
        for (i = 0 ; i < (int)restopic_cnt ; i++) {
                const rd_kafka_topic_result_t *terr = restopics[i];

                /* Verify that topic order matches our request. */
                if (strcmp(rd_kafka_topic_result_name(terr), topics[i]))
                        TEST_FAIL_LATER("Topic result order mismatch at #%d: "
                                        "expected %s, got %s",
                                        i, topics[i],
                                        rd_kafka_topic_result_name(terr));

                TEST_SAY("DeleteTopics result: #%d: %s: %s: %s\n",
                         i,
                         rd_kafka_topic_result_name(terr),
                         rd_kafka_err2name(rd_kafka_topic_result_error(terr)),
                         rd_kafka_topic_result_error_string(terr));
                if (rd_kafka_topic_result_error(terr) != exp_topicerr[i])
                        TEST_FAIL_LATER(
                                "Expected %s, not %d: %s",
                                rd_kafka_err2name(exp_topicerr[i]),
                                rd_kafka_topic_result_error(terr),
                                rd_kafka_err2name(rd_kafka_topic_result_error(
                                                          terr)));
        }

        /**
         * Verify that the expected topics are deleted and the non-expected
         * are not. Allow it some time to propagate.
         */
        if (op_timeout > 0)
                metadata_tmout = op_timeout + 1000;
        else
                metadata_tmout = 10 * 1000;

        test_wait_metadata_update(rk,
                                  NULL, 0,
                                  exp_not_mdtopics,
                                  exp_not_mdtopic_cnt,
                                  metadata_tmout);

        rd_kafka_event_destroy(rkev);

        for (i = 0 ; i < MY_DEL_TOPICS_CNT ; i++) {
                rd_kafka_DeleteTopic_destroy(del_topics[i]);
                rd_free(topics[i]);
        }

        if (options)
                rd_kafka_AdminOptions_destroy(options);

        if (!useq)
                rd_kafka_queue_destroy(q);

        TEST_LATER_CHECK();
#undef MY_DEL_TOPICS_CNT

        SUB_TEST_PASS();
}
481
482
483
/**
 * @brief Test creation of additional partitions.
 *
 * Creates MY_CRP_TOPICS_CNT topics with an initial partition count,
 * then grows each topic to its final partition count with
 * CreatePartitions.  Every other topic gets explicit replica
 * assignments (alternating between two replica orderings per
 * partition), and the final metadata is verified against the expected
 * partition counts and replica sets.
 *
 * @param what       Descriptive string for this test variant.
 * @param rk         Client instance to use.
 * @param useq       Result queue to use, or NULL to use a temporary queue.
 * @param op_timeout Admin operation timeout (ms), or -1 to use default
 *                   AdminOptions.
 */
static void do_test_CreatePartitions (const char *what,
                                      rd_kafka_t *rk, rd_kafka_queue_t *useq,
                                      int op_timeout) {
        rd_kafka_queue_t *q;
#define MY_CRP_TOPICS_CNT 9
        char *topics[MY_CRP_TOPICS_CNT];
        rd_kafka_NewTopic_t *new_topics[MY_CRP_TOPICS_CNT];
        rd_kafka_NewPartitions_t *crp_topics[MY_CRP_TOPICS_CNT];
        rd_kafka_AdminOptions_t *options = NULL;
        /* Expected topics in metadata */
        rd_kafka_metadata_topic_t exp_mdtopics[MY_CRP_TOPICS_CNT] = {{0}};
        rd_kafka_metadata_partition_t exp_mdparts[2] = {{0}};
        int exp_mdtopic_cnt = 0;
        int i;
        char errstr[512];
        rd_kafka_resp_err_t err;
        test_timing_t timing;
        int metadata_tmout;
        int num_replicas = (int)avail_broker_cnt;

        SUB_TEST_QUICK("%s CreatePartitions with %s, op_timeout %d",
                       rd_kafka_name(rk), what, op_timeout);

        q = useq ? useq : rd_kafka_queue_new(rk);

        /* Set up two expected partitions with different replication sets
         * so they can be matched by the metadata checker later.
         * Even partitions use exp_mdparts[0] while odd partitions
         * use exp_mdparts[1]. */

        /* Set valid replica assignments (even, and odd (reverse) ) */
        exp_mdparts[0].replicas = rd_alloca(sizeof(*exp_mdparts[0].replicas) *
                                            num_replicas);
        exp_mdparts[1].replicas = rd_alloca(sizeof(*exp_mdparts[1].replicas) *
                                            num_replicas);
        exp_mdparts[0].replica_cnt = num_replicas;
        exp_mdparts[1].replica_cnt = num_replicas;
        for (i = 0 ; i < num_replicas ; i++) {
                exp_mdparts[0].replicas[i] = avail_brokers[i];
                exp_mdparts[1].replicas[i] = avail_brokers[num_replicas-i-1];
        }

        /**
         * Construct CreatePartitions array
         */
        for (i = 0 ; i < MY_CRP_TOPICS_CNT ; i++) {
                char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
                int initial_part_cnt = 1 + (i * 2);
                int new_part_cnt = 1 + (i / 2);
                int final_part_cnt = initial_part_cnt + new_part_cnt;
                int set_replicas = !(i % 2);
                int pi;

                topics[i] = topic;

                /* Topic to create with initial partition count.
                 * Replication factor -1 when replica assignments are
                 * set explicitly below (mutually exclusive). */
                new_topics[i] = rd_kafka_NewTopic_new(topic, initial_part_cnt,
                                                      set_replicas ?
                                                      -1 : num_replicas,
                                                      NULL, 0);

                /* .. and later add more partitions to */
                crp_topics[i] = rd_kafka_NewPartitions_new(topic,
                                                           final_part_cnt,
                                                           errstr,
                                                           sizeof(errstr));

                if (set_replicas) {
                        exp_mdtopics[exp_mdtopic_cnt].partitions =
                                rd_alloca(final_part_cnt *
                                          sizeof(*exp_mdtopics[exp_mdtopic_cnt].
                                                 partitions));

                        for (pi = 0 ; pi < final_part_cnt ; pi++) {
                                /* Alternate replica orderings per partition:
                                 * even partitions use exp_mdparts[0],
                                 * odd use exp_mdparts[1]. */
                                const rd_kafka_metadata_partition_t *exp_mdp =
                                        &exp_mdparts[pi & 1];

                                exp_mdtopics[exp_mdtopic_cnt].
                                        partitions[pi] = *exp_mdp; /* copy */

                                exp_mdtopics[exp_mdtopic_cnt].
                                        partitions[pi].id = pi;

                                if (pi < initial_part_cnt) {
                                        /* Set replica assignment
                                         * for initial partitions */
                                        err = rd_kafka_NewTopic_set_replica_assignment(
                                                new_topics[i], pi,
                                                exp_mdp->replicas,
                                                (size_t)exp_mdp->replica_cnt,
                                                errstr, sizeof(errstr));
                                        TEST_ASSERT(!err, "NewTopic_set_replica_assignment: %s",
                                                    errstr);
                                } else {
                                        /* Set replica assignment for new
                                         * partitions */
                                        err = rd_kafka_NewPartitions_set_replica_assignment(
                                                crp_topics[i],
                                                pi - initial_part_cnt,
                                                exp_mdp->replicas,
                                                (size_t)exp_mdp->replica_cnt,
                                                errstr, sizeof(errstr));
                                        TEST_ASSERT(!err, "NewPartitions_set_replica_assignment: %s",
                                                    errstr);
                                }

                        }
                }

                TEST_SAY(_C_YEL "Topic %s with %d initial partitions will grow "
                         "by %d to %d total partitions with%s replicas set\n",
                         topics[i],
                         initial_part_cnt, new_part_cnt, final_part_cnt,
                         set_replicas ? "" : "out");

                exp_mdtopics[exp_mdtopic_cnt].topic = topic;
                exp_mdtopics[exp_mdtopic_cnt].partition_cnt = final_part_cnt;

                exp_mdtopic_cnt++;
        }

        if (op_timeout != -1) {
                options = rd_kafka_AdminOptions_new(
                        rk, RD_KAFKA_ADMIN_OP_ANY);

                err = rd_kafka_AdminOptions_set_operation_timeout(
                        options, op_timeout, errstr, sizeof(errstr));
                TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
        }

        /*
         * Create topics with initial partition count
         */
        TIMING_START(&timing, "CreateTopics");
        TEST_SAY("Creating topics with initial partition counts\n");
        rd_kafka_CreateTopics(rk, new_topics, MY_CRP_TOPICS_CNT,
                              options, q);
        TIMING_ASSERT_LATER(&timing, 0, 50);

        err = test_wait_topic_admin_result(q,
                                           RD_KAFKA_EVENT_CREATETOPICS_RESULT,
                                           NULL, 15000);
        TEST_ASSERT(!err, "CreateTopics failed: %s", rd_kafka_err2str(err));

        rd_kafka_NewTopic_destroy_array(new_topics, MY_CRP_TOPICS_CNT);


        /*
         * Create new partitions
         */
        TIMING_START(&timing, "CreatePartitions");
        TEST_SAY("Creating partitions\n");
        rd_kafka_CreatePartitions(rk, crp_topics, MY_CRP_TOPICS_CNT,
                                  options, q);
        TIMING_ASSERT_LATER(&timing, 0, 50);

        err = test_wait_topic_admin_result(q,
                                           RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT,
                                           NULL, 15000);
        TEST_ASSERT(!err, "CreatePartitions failed: %s", rd_kafka_err2str(err));

        rd_kafka_NewPartitions_destroy_array(crp_topics, MY_CRP_TOPICS_CNT);


        /**
         * Verify that the expected topics have grown to their final
         * partition counts (and replica assignments, where set).
         * Allow it some time to propagate.
         */
        if (op_timeout > 0)
                metadata_tmout = op_timeout + 1000;
        else
                metadata_tmout = 10 * 1000;

        test_wait_metadata_update(rk,
                                  exp_mdtopics,
                                  exp_mdtopic_cnt,
                                  NULL, 0,
                                  metadata_tmout);

        for (i = 0 ; i < MY_CRP_TOPICS_CNT ; i++)
                rd_free(topics[i]);

        if (options)
                rd_kafka_AdminOptions_destroy(options);

        if (!useq)
                rd_kafka_queue_destroy(q);

        TEST_LATER_CHECK();
#undef MY_CRP_TOPICS_CNT

        SUB_TEST_PASS();
}
682
683
684
685 /**
686 * @brief Print the ConfigEntrys in the provided array.
687 */
688 static void
test_print_ConfigEntry_array(const rd_kafka_ConfigEntry_t ** entries,size_t entry_cnt,unsigned int depth)689 test_print_ConfigEntry_array (const rd_kafka_ConfigEntry_t **entries,
690 size_t entry_cnt, unsigned int depth) {
691 const char *indent = &" "[4 - (depth > 4 ? 4 : depth)];
692 size_t ei;
693
694 for (ei = 0 ; ei < entry_cnt ; ei++) {
695 const rd_kafka_ConfigEntry_t *e = entries[ei];
696 const rd_kafka_ConfigEntry_t **syns;
697 size_t syn_cnt;
698
699 syns = rd_kafka_ConfigEntry_synonyms(e, &syn_cnt);
700
701 #define YN(v) ((v) ? "y" : "n")
702 TEST_SAYL(3,
703 "%s#%"PRIusz"/%"PRIusz
704 ": Source %s (%d): \"%s\"=\"%s\" "
705 "[is read-only=%s, default=%s, sensitive=%s, "
706 "synonym=%s] with %"PRIusz" synonym(s)\n",
707 indent,
708 ei, entry_cnt,
709 rd_kafka_ConfigSource_name(
710 rd_kafka_ConfigEntry_source(e)),
711 rd_kafka_ConfigEntry_source(e),
712 rd_kafka_ConfigEntry_name(e),
713 rd_kafka_ConfigEntry_value(e) ?
714 rd_kafka_ConfigEntry_value(e) : "(NULL)",
715 YN(rd_kafka_ConfigEntry_is_read_only(e)),
716 YN(rd_kafka_ConfigEntry_is_default(e)),
717 YN(rd_kafka_ConfigEntry_is_sensitive(e)),
718 YN(rd_kafka_ConfigEntry_is_synonym(e)),
719 syn_cnt);
720 #undef YN
721
722 if (syn_cnt > 0)
723 test_print_ConfigEntry_array(syns, syn_cnt, depth+1);
724 }
725 }
726
727
/**
 * @brief Test AlterConfigs.
 *
 * Alters configuration for three resources: an existing topic (valid
 * topic configs), a broker (on brokers >= 1.1.0), and a non-existent
 * topic which is expected to fail (UNKNOWN_TOPIC_OR_PART on brokers
 * >= 2.7.0, otherwise UNKNOWN).  The per-resource results are verified
 * against these expectations.
 *
 * @param rk   Client instance to use.
 * @param rkqu Result queue to use.
 */
static void do_test_AlterConfigs (rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
#define MY_CONFRES_CNT 3
        char *topics[MY_CONFRES_CNT];
        rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
        rd_kafka_AdminOptions_t *options;
        rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT];
        rd_kafka_event_t *rkev;
        rd_kafka_resp_err_t err;
        const rd_kafka_AlterConfigs_result_t *res;
        const rd_kafka_ConfigResource_t **rconfigs;
        size_t rconfig_cnt;
        char errstr[128];
        const char *errstr2;
        /* Number of ConfigResources actually constructed (may be fewer
         * than MY_CONFRES_CNT on old brokers). */
        int ci = 0;
        int i;
        int fails = 0;

        SUB_TEST_QUICK();

        /*
         * Only create one topic, the others will be non-existent.
         */
        for (i = 0 ; i < MY_CONFRES_CNT ; i++)
                rd_strdupa(&topics[i], test_mk_topic_name(__FUNCTION__, 1));

        test_CreateTopics_simple(rk, NULL, topics, 1, 1, NULL);

        test_wait_topic_exists(rk, topics[0], 10000);

        /*
         * ConfigResource #0: valid topic config
         */
        configs[ci] = rd_kafka_ConfigResource_new(
                RD_KAFKA_RESOURCE_TOPIC, topics[ci]);

        err = rd_kafka_ConfigResource_set_config(configs[ci],
                                                 "compression.type", "gzip");
        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));

        err = rd_kafka_ConfigResource_set_config(configs[ci],
                                                 "flush.ms", "12345678");
        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));

        exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
        ci++;


        if (test_broker_version >= TEST_BRKVER(1, 1, 0, 0)) {
                /*
                 * ConfigResource #1: valid broker config
                 */
                configs[ci] = rd_kafka_ConfigResource_new(
                        RD_KAFKA_RESOURCE_BROKER,
                        tsprintf("%"PRId32, avail_brokers[0]));

                err = rd_kafka_ConfigResource_set_config(
                        configs[ci],
                        "sasl.kerberos.min.time.before.relogin", "58000");
                TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));

                exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
                ci++;
        } else {
                TEST_WARN("Skipping RESOURCE_BROKER test on unsupported "
                          "broker version\n");
        }

        /*
         * ConfigResource #2: valid topic config, non-existent topic
         */
        configs[ci] = rd_kafka_ConfigResource_new(
                RD_KAFKA_RESOURCE_TOPIC, topics[ci]);

        err = rd_kafka_ConfigResource_set_config(configs[ci],
                                                 "compression.type", "lz4");
        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));

        err = rd_kafka_ConfigResource_set_config(configs[ci],
                                                 "offset.metadata.max.bytes",
                                                 "12345");
        TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));

        /* Older brokers return a generic UNKNOWN error for unknown
         * topics in AlterConfigs. */
        if (test_broker_version >= TEST_BRKVER(2, 7, 0, 0))
                exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
        else
                exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN;
        ci++;


        /*
         * Timeout options
         */
        options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ALTERCONFIGS);
        err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr,
                                                        sizeof(errstr));
        TEST_ASSERT(!err, "%s", errstr);


        /*
         * Fire off request
         */
        rd_kafka_AlterConfigs(rk, configs, ci, options, rkqu);

        rd_kafka_AdminOptions_destroy(options);

        /*
         * Wait for result
         */
        rkev = test_wait_admin_result(rkqu, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT,
                                      10000+1000);

        /*
         * Extract result
         */
        res = rd_kafka_event_AlterConfigs_result(rkev);
        TEST_ASSERT(res, "Expected AlterConfigs result, not %s",
                    rd_kafka_event_name(rkev));

        err = rd_kafka_event_error(rkev);
        errstr2 = rd_kafka_event_error_string(rkev);
        TEST_ASSERT(!err,
                    "Expected success, not %s: %s",
                    rd_kafka_err2name(err), errstr2);

        rconfigs = rd_kafka_AlterConfigs_result_resources(res, &rconfig_cnt);
        TEST_ASSERT((int)rconfig_cnt == ci,
                    "Expected %d result resources, got %"PRIusz"\n",
                    ci, rconfig_cnt);

        /*
         * Verify status per resource
         */
        for (i = 0 ; i < (int)rconfig_cnt ; i++) {
                const rd_kafka_ConfigEntry_t **entries;
                size_t entry_cnt;

                err = rd_kafka_ConfigResource_error(rconfigs[i]);
                errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]);

                entries = rd_kafka_ConfigResource_configs(rconfigs[i],
                                                          &entry_cnt);

                TEST_SAY("ConfigResource #%d: type %s (%d), \"%s\": "
                         "%"PRIusz" ConfigEntries, error %s (%s)\n",
                         i,
                         rd_kafka_ResourceType_name(
                                 rd_kafka_ConfigResource_type(rconfigs[i])),
                         rd_kafka_ConfigResource_type(rconfigs[i]),
                         rd_kafka_ConfigResource_name(rconfigs[i]),
                         entry_cnt,
                         rd_kafka_err2name(err), errstr2 ? errstr2 : "");

                test_print_ConfigEntry_array(entries, entry_cnt, 1);

                /* Results must map 1:1 (same type and name, same order)
                 * to the requested resources. */
                if (rd_kafka_ConfigResource_type(rconfigs[i]) !=
                    rd_kafka_ConfigResource_type(configs[i]) ||
                    strcmp(rd_kafka_ConfigResource_name(rconfigs[i]),
                           rd_kafka_ConfigResource_name(configs[i]))) {
                        TEST_FAIL_LATER(
                                "ConfigResource #%d: "
                                "expected type %s name %s, "
                                "got type %s name %s",
                                i,
                                rd_kafka_ResourceType_name(rd_kafka_ConfigResource_type(configs[i])),
                                rd_kafka_ConfigResource_name(configs[i]),
                                rd_kafka_ResourceType_name(rd_kafka_ConfigResource_type(rconfigs[i])),
                                rd_kafka_ConfigResource_name(rconfigs[i]));
                        fails++;
                        continue;
                }


                if (err != exp_err[i]) {
                        TEST_FAIL_LATER("ConfigResource #%d: "
                                        "expected %s (%d), got %s (%s)",
                                        i,
                                        rd_kafka_err2name(exp_err[i]),
                                        exp_err[i],
                                        rd_kafka_err2name(err),
                                        errstr2 ? errstr2 : "");
                        fails++;
                }
        }

        TEST_ASSERT(!fails, "See %d previous failure(s)", fails);

        rd_kafka_event_destroy(rkev);

        rd_kafka_ConfigResource_destroy_array(configs, ci);

        TEST_LATER_CHECK();
#undef MY_CONFRES_CNT

        SUB_TEST_PASS();
}
926
927
928
/**
 * @brief Test DescribeConfigs.
 *
 * Describes three resources: an existing topic, a broker, and a
 * non-existent topic.  An unexpected UNKNOWN_TOPIC_OR_PART for a
 * resource is retried up to \c max_retry_describe times since the
 * newly created topic may not have propagated yet.
 *
 * @param rk   Client instance to use.
 * @param rkqu Result queue to use.
 */
static void do_test_DescribeConfigs (rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
#define MY_CONFRES_CNT 3
        char *topics[MY_CONFRES_CNT];
        rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
        rd_kafka_AdminOptions_t *options;
        rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT];
        rd_kafka_event_t *rkev;
        rd_kafka_resp_err_t err;
        const rd_kafka_DescribeConfigs_result_t *res;
        const rd_kafka_ConfigResource_t **rconfigs;
        size_t rconfig_cnt;
        char errstr[128];
        const char *errstr2;
        /* Number of ConfigResources constructed. */
        int ci = 0;
        int i;
        int fails = 0;
        /* Remaining retries for transient UNKNOWN_TOPIC_OR_PART. */
        int max_retry_describe = 3;

        SUB_TEST_QUICK();

        /*
         * Only create one topic, the others will be non-existent.
         */
        rd_strdupa(&topics[0], test_mk_topic_name("DescribeConfigs_exist", 1));
        for (i = 1 ; i < MY_CONFRES_CNT ; i++)
                rd_strdupa(&topics[i],
                           test_mk_topic_name("DescribeConfigs_notexist", 1));

        test_CreateTopics_simple(rk, NULL, topics, 1, 1, NULL);

        /*
         * ConfigResource #0: topic config, no config entries.
         */
        configs[ci] = rd_kafka_ConfigResource_new(
                RD_KAFKA_RESOURCE_TOPIC, topics[ci]);
        exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
        ci++;

        /*
         * ConfigResource #1:broker config, no config entries
         */
        configs[ci] = rd_kafka_ConfigResource_new(
                RD_KAFKA_RESOURCE_BROKER,
                tsprintf("%"PRId32, avail_brokers[0]));

        exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
        ci++;

        /*
         * ConfigResource #2: topic config, non-existent topic, no config entr.
         */
        configs[ci] = rd_kafka_ConfigResource_new(
                RD_KAFKA_RESOURCE_TOPIC, topics[ci]);
        /* FIXME: This is a bug in the broker (<v2.0.0), it returns a full response
         * for unknown topics.
         * https://issues.apache.org/jira/browse/KAFKA-6778
         */
        if (test_broker_version < TEST_BRKVER(2,0,0,0))
                exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
        else
                exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
        ci++;


 retry_describe:
        /*
         * Timeout options
         */
        options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
        err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr,
                                                        sizeof(errstr));
        TEST_ASSERT(!err, "%s", errstr);


        /*
         * Fire off request
         */
        rd_kafka_DescribeConfigs(rk, configs, ci, options, rkqu);

        rd_kafka_AdminOptions_destroy(options);

        /*
         * Wait for result
         */
        rkev = test_wait_admin_result(rkqu,
                                      RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT,
                                      10000+1000);

        /*
         * Extract result
         */
        res = rd_kafka_event_DescribeConfigs_result(rkev);
        TEST_ASSERT(res, "Expected DescribeConfigs result, not %s",
                    rd_kafka_event_name(rkev));

        err = rd_kafka_event_error(rkev);
        errstr2 = rd_kafka_event_error_string(rkev);
        TEST_ASSERT(!err,
                    "Expected success, not %s: %s",
                    rd_kafka_err2name(err), errstr2);

        rconfigs = rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt);
        TEST_ASSERT((int)rconfig_cnt == ci,
                    "Expected %d result resources, got %"PRIusz"\n",
                    ci, rconfig_cnt);

        /*
         * Verify status per resource
         */
        for (i = 0 ; i < (int)rconfig_cnt ; i++) {
                const rd_kafka_ConfigEntry_t **entries;
                size_t entry_cnt;

                err = rd_kafka_ConfigResource_error(rconfigs[i]);
                errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]);

                entries = rd_kafka_ConfigResource_configs(rconfigs[i],
                                                          &entry_cnt);

                TEST_SAY("ConfigResource #%d: type %s (%d), \"%s\": "
                         "%"PRIusz" ConfigEntries, error %s (%s)\n",
                         i,
                         rd_kafka_ResourceType_name(
                                 rd_kafka_ConfigResource_type(rconfigs[i])),
                         rd_kafka_ConfigResource_type(rconfigs[i]),
                         rd_kafka_ConfigResource_name(rconfigs[i]),
                         entry_cnt,
                         rd_kafka_err2name(err), errstr2 ? errstr2 : "");

                test_print_ConfigEntry_array(entries, entry_cnt, 1);

                /* Results must map 1:1 (same type and name, same order)
                 * to the requested resources. */
                if (rd_kafka_ConfigResource_type(rconfigs[i]) !=
                    rd_kafka_ConfigResource_type(configs[i]) ||
                    strcmp(rd_kafka_ConfigResource_name(rconfigs[i]),
                           rd_kafka_ConfigResource_name(configs[i]))) {
                        TEST_FAIL_LATER(
                                "ConfigResource #%d: "
                                "expected type %s name %s, "
                                "got type %s name %s",
                                i,
                                rd_kafka_ResourceType_name(rd_kafka_ConfigResource_type(configs[i])),
                                rd_kafka_ConfigResource_name(configs[i]),
                                rd_kafka_ResourceType_name(rd_kafka_ConfigResource_type(rconfigs[i])),
                                rd_kafka_ConfigResource_name(rconfigs[i]));
                        fails++;
                        continue;
                }


                if (err != exp_err[i]) {
                        /* The event is destroyed before retrying:
                         * the whole request is re-issued. */
                        if (err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART &&
                            max_retry_describe-- > 0) {
                                TEST_WARN("ConfigResource #%d: "
                                          "expected %s (%d), got %s (%s): "
                                          "this is typically a temporary "
                                          "error while the new resource "
                                          "is propagating: retrying",
                                          i,
                                          rd_kafka_err2name(exp_err[i]),
                                          exp_err[i],
                                          rd_kafka_err2name(err),
                                          errstr2 ? errstr2 : "");
                                rd_kafka_event_destroy(rkev);
                                rd_sleep(1);
                                goto retry_describe;
                        }

                        TEST_FAIL_LATER("ConfigResource #%d: "
                                        "expected %s (%d), got %s (%s)",
                                        i,
                                        rd_kafka_err2name(exp_err[i]),
                                        exp_err[i],
                                        rd_kafka_err2name(err),
                                        errstr2 ? errstr2 : "");
                        fails++;
                }
        }

        TEST_ASSERT(!fails, "See %d previous failure(s)", fails);

        rd_kafka_event_destroy(rkev);

        rd_kafka_ConfigResource_destroy_array(configs, ci);

        TEST_LATER_CHECK();
#undef MY_CONFRES_CNT

        SUB_TEST_PASS();
}
1121
1122
1123
1124 /**
1125 * @brief Verify that an unclean rd_kafka_destroy() does not hang.
1126 */
/**
 * @brief Verify that an unclean rd_kafka_destroy() does not hang.
 *
 * An Admin request is fired off and the client is torn down while the
 * request may still be outstanding; destroy must complete within 5s.
 */
static void do_test_unclean_destroy (rd_kafka_type_t cltype, int with_mainq) {
        rd_kafka_conf_t *conf;
        char errstr[512];
        rd_kafka_t *client;
        rd_kafka_queue_t *queue;
        rd_kafka_NewTopic_t *newt;
        test_timing_t t_destroy;

        SUB_TEST_QUICK("Test unclean destroy using %s",
                       with_mainq ? "mainq" : "tempq");

        test_conf_init(&conf, NULL, 0);

        client = rd_kafka_new(cltype, conf, errstr, sizeof(errstr));
        TEST_ASSERT(client, "kafka_new(%d): %s", cltype, errstr);

        /* Result queue: either the main queue or a temporary one,
         * depending on the test variant. */
        queue = with_mainq ? rd_kafka_queue_get_main(client) :
                rd_kafka_queue_new(client);

        /* Fire off a CreateTopics request that will likely still be
         * in-flight when the client is destroyed below. */
        newt = rd_kafka_NewTopic_new(test_mk_topic_name(__FUNCTION__, 1),
                                     3, 1, NULL, 0);
        rd_kafka_CreateTopics(client, &newt, 1, NULL, queue);
        rd_kafka_NewTopic_destroy(newt);

        rd_kafka_queue_destroy(queue);

        TEST_SAY("Giving rd_kafka_destroy() 5s to finish, "
                 "despite Admin API request being processed\n");
        test_timeout_set(5);
        TIMING_START(&t_destroy, "rd_kafka_destroy()");
        rd_kafka_destroy(client);
        TIMING_STOP(&t_destroy);

        SUB_TEST_PASS();

        /* Restore the global test timeout. */
        test_timeout_set(60);
}
1167
1168
1169
1170
1171 /**
1172 * @brief Test deletion of records
1173 *
1174 *
1175 */
/**
 * @brief Test deletion of records (DeleteRecords Admin API).
 *
 * Creates three topics with three partitions each, produces \p msgs_cnt
 * messages per partition, then issues DeleteRecords with a mix of
 * offsets (everything, one partition, half a partition, unset/invalid,
 * past the high watermark) and verifies the per-partition results and
 * the resulting low watermarks.
 *
 * @param what        Human-readable variant description for logging.
 * @param rk          Client instance to issue Admin requests on.
 * @param useq        Result queue to use, or NULL for a temporary queue.
 * @param op_timeout  Operation timeout in ms, or -1 for default options.
 */
static void do_test_DeleteRecords (const char *what,
                                   rd_kafka_t *rk, rd_kafka_queue_t *useq,
                                   int op_timeout) {
        rd_kafka_queue_t *q;
        rd_kafka_AdminOptions_t *options = NULL;
        rd_kafka_topic_partition_list_t *offsets = NULL;
        rd_kafka_event_t *rkev = NULL;
        rd_kafka_resp_err_t err;
        char errstr[512];
        const char *errstr2;
#define MY_DEL_RECORDS_CNT 3
        rd_kafka_topic_partition_list_t *results = NULL;
        int i;
        const int partitions_cnt = 3;
        const int msgs_cnt = 100;
        char *topics[MY_DEL_RECORDS_CNT];
        rd_kafka_metadata_topic_t exp_mdtopics[MY_DEL_RECORDS_CNT] = {{0}};
        int exp_mdtopic_cnt = 0;
        test_timing_t timing;
        rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        rd_kafka_DeleteRecords_t *del_records;
        const rd_kafka_DeleteRecords_result_t *res;

        SUB_TEST_QUICK("%s DeleteRecords with %s, op_timeout %d",
                       rd_kafka_name(rk), what, op_timeout);

        q = useq ? useq : rd_kafka_queue_new(rk);

        if (op_timeout != -1) {
                options = rd_kafka_AdminOptions_new(
                        rk, RD_KAFKA_ADMIN_OP_ANY);

                err = rd_kafka_AdminOptions_set_operation_timeout(
                        options, op_timeout, errstr, sizeof(errstr));
                TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
        }


        for (i = 0 ; i < MY_DEL_RECORDS_CNT ; i++) {
                char pfx[32];
                char *topic;

                rd_snprintf(pfx, sizeof(pfx), "DeleteRecords-topic%d", i);
                topic = rd_strdup(test_mk_topic_name(pfx, 1));

                topics[i] = topic;
                exp_mdtopics[exp_mdtopic_cnt++].topic = topic;
        }

        /* Create the topics first. */
        test_CreateTopics_simple(rk, NULL, topics,
                                 MY_DEL_RECORDS_CNT,
                                 partitions_cnt /*num_partitions*/,
                                 NULL);

        /* Verify that topics are reported by metadata */
        test_wait_metadata_update(rk,
                                  exp_mdtopics, exp_mdtopic_cnt,
                                  NULL, 0,
                                  15*1000);

        /* Produce 100 msgs / partition */
        for (i = 0 ; i < MY_DEL_RECORDS_CNT; i++ ) {
                int32_t partition;
                for (partition = 0 ; partition < partitions_cnt; partition++ ) {
                        test_produce_msgs_easy(topics[i], 0, partition,
                                               msgs_cnt);
                }
        }

        offsets = rd_kafka_topic_partition_list_new(10);

        /* Wipe all data from topic 0 */
        for (i = 0 ; i < partitions_cnt; i++)
                rd_kafka_topic_partition_list_add(offsets, topics[0], i)->
                        offset = RD_KAFKA_OFFSET_END;

        /* Wipe all data from partition 0 in topic 1 */
        rd_kafka_topic_partition_list_add(offsets, topics[1], 0)->
                offset = RD_KAFKA_OFFSET_END;

        /* Wipe some data from partition 2 in topic 1 */
        rd_kafka_topic_partition_list_add(offsets, topics[1], 2)->
                offset = msgs_cnt / 2;

        /* Not changing the offset (out of range) for topic 2 partition 0:
         * the entry keeps the list_add() default offset, which is
         * below OFFSET_END and thus invalid for DeleteRecords. */
        rd_kafka_topic_partition_list_add(offsets, topics[2], 0);

        /* Offset out of range for topic 2 partition 1 */
        rd_kafka_topic_partition_list_add(offsets, topics[2], 1)->
                offset = msgs_cnt + 1;

        del_records = rd_kafka_DeleteRecords_new(offsets);

        TIMING_START(&timing, "DeleteRecords");
        TEST_SAY("Call DeleteRecords\n");
        rd_kafka_DeleteRecords(rk, &del_records, 1, options, q);
        /* The Admin call must be asynchronous: < 50ms. */
        TIMING_ASSERT_LATER(&timing, 0, 50);

        rd_kafka_DeleteRecords_destroy(del_records);

        TIMING_START(&timing, "DeleteRecords.queue_poll");

        /* Poll result queue for DeleteRecords result.
         * Print but otherwise ignore other event types
         * (typically generic Error events). */
        while (1) {
                rkev = rd_kafka_queue_poll(q, tmout_multip(20*1000));
                TEST_SAY("DeleteRecords: got %s in %.3fms\n",
                         rd_kafka_event_name(rkev),
                         TIMING_DURATION(&timing) / 1000.0f);
                if (rkev == NULL)
                        continue;
                if (rd_kafka_event_error(rkev))
                        TEST_SAY("%s: %s\n",
                                 rd_kafka_event_name(rkev),
                                 rd_kafka_event_error_string(rkev));

                if (rd_kafka_event_type(rkev) ==
                    RD_KAFKA_EVENT_DELETERECORDS_RESULT) {
                        break;
                }

                rd_kafka_event_destroy(rkev);
        }
        /* Convert event to proper result */
        res = rd_kafka_event_DeleteRecords_result(rkev);
        TEST_ASSERT(res, "expected DeleteRecords_result, not %s",
                    rd_kafka_event_name(rkev));

        /* Expecting error */
        err = rd_kafka_event_error(rkev);
        errstr2 = rd_kafka_event_error_string(rkev);
        TEST_ASSERT(err == exp_err,
                    "expected DeleteRecords to return %s, not %s (%s)",
                    rd_kafka_err2str(exp_err),
                    rd_kafka_err2str(err),
                    err ? errstr2 : "n/a");

        TEST_SAY("DeleteRecords: returned %s (%s)\n",
                 rd_kafka_err2str(err), err ? errstr2 : "n/a");

        results = rd_kafka_topic_partition_list_copy(
                rd_kafka_DeleteRecords_result_offsets(res));

        /* Sort both input and output list so they can be compared
         * element by element below. */
        rd_kafka_topic_partition_list_sort(offsets, NULL, NULL);
        rd_kafka_topic_partition_list_sort(results, NULL, NULL);

        TEST_SAY("Input partitions:\n");
        test_print_partition_list(offsets);
        TEST_SAY("Result partitions:\n");
        test_print_partition_list(results);

        TEST_ASSERT(offsets->cnt == results->cnt,
                    "expected DeleteRecords_result_offsets to return %d items, "
                    "not %d",
                    offsets->cnt,
                    results->cnt);

        for (i = 0 ; i < results->cnt ; i++) {
                const rd_kafka_topic_partition_t *input =&offsets->elems[i];
                const rd_kafka_topic_partition_t *output = &results->elems[i];
                int64_t expected_offset = input->offset;
                rd_kafka_resp_err_t expected_err = RD_KAFKA_RESP_ERR_NO_ERROR;

                /* OFFSET_END deletes everything: the new low watermark
                 * equals the high watermark (msgs_cnt). */
                if (expected_offset == RD_KAFKA_OFFSET_END)
                        expected_offset = msgs_cnt;

                /* Expect Offset out of range error for offsets below
                 * OFFSET_END (e.g. the unset/default offset) or beyond
                 * the high watermark. */
                if (input->offset < RD_KAFKA_OFFSET_END ||
                    input->offset > msgs_cnt)
                        expected_err = RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE;

                TEST_SAY("DeleteRecords Returned %s for %s [%"PRId32"] "
                         "low-watermark = %d\n",
                         rd_kafka_err2name(output->err),
                         output->topic,
                         output->partition,
                         (int)output->offset);

                if (strcmp(output->topic, input->topic))
                        TEST_FAIL_LATER("Result order mismatch at #%d: "
                                        "expected topic %s, got %s",
                                        i,
                                        input->topic,
                                        output->topic);

                if (output->partition != input->partition)
                        TEST_FAIL_LATER("Result order mismatch at #%d: "
                                        "expected partition %d, got %d",
                                        i,
                                        input->partition,
                                        output->partition);

                if (output->err != expected_err)
                        TEST_FAIL_LATER("%s [%"PRId32"]: "
                                        "expected error code %d (%s), "
                                        "got %d (%s)",
                                        output->topic,
                                        output->partition,
                                        expected_err,
                                        rd_kafka_err2str(expected_err),
                                        output->err,
                                        rd_kafka_err2str(output->err));

                if (output->err == 0 && output->offset != expected_offset)
                        TEST_FAIL_LATER("%s [%"PRId32"]: "
                                        "expected offset %"PRId64", "
                                        "got %"PRId64,
                                        output->topic,
                                        output->partition,
                                        expected_offset,
                                        output->offset);
        }

        /* Check watermarks for partitions */
        for (i = 0 ; i < MY_DEL_RECORDS_CNT; i++ ) {
                int32_t partition;
                for (partition = 0 ; partition < partitions_cnt; partition++ ) {
                        const rd_kafka_topic_partition_t *del =
                                rd_kafka_topic_partition_list_find(
                                        results, topics[i], partition);
                        int64_t expected_low = 0;
                        int64_t expected_high = msgs_cnt;
                        int64_t low, high;

                        /* A successful deletion moves the low watermark
                         * to the reported offset; untouched or failed
                         * partitions keep a low watermark of 0. */
                        if (del && del->err == 0) {
                                expected_low = del->offset;
                        }

                        err = rd_kafka_query_watermark_offsets(
                                rk, topics[i], partition,
                                &low, &high, tmout_multip(10000));
                        if (err)
                                TEST_FAIL("query_watermark_offsets failed: "
                                          "%s\n",
                                          rd_kafka_err2str(err));

                        if (low != expected_low)
                                TEST_FAIL_LATER("For %s [%"PRId32"] expected "
                                                "a low watermark of %"PRId64
                                                ", got %"PRId64,
                                                topics[i],
                                                partition,
                                                expected_low,
                                                low);

                        if (high != expected_high)
                                TEST_FAIL_LATER("For %s [%"PRId32"] expected "
                                                "a high watermark of %"PRId64
                                                ", got %"PRId64,
                                                topics[i],
                                                partition,
                                                expected_high,
                                                high);
                }
        }

        rd_kafka_event_destroy(rkev);

        for (i = 0 ; i < MY_DEL_RECORDS_CNT ; i++)
                rd_free(topics[i]);

        if (results)
                rd_kafka_topic_partition_list_destroy(results);

        if (offsets)
                rd_kafka_topic_partition_list_destroy(offsets);

        if (options)
                rd_kafka_AdminOptions_destroy(options);

        if (!useq)
                rd_kafka_queue_destroy(q);

        TEST_LATER_CHECK();
#undef MY_DEL_RECORDS_CNT

        SUB_TEST_PASS();
}
1457
1458 /**
1459 * @brief Test deletion of groups
1460 *
1461 *
1462 */
1463
/** @brief Expected per-group outcome of a DeleteGroups request. */
typedef struct expected_group_result {
        char *group;             /**< Group name (rd_strdup'd; freed by the test) */
        rd_kafka_resp_err_t err; /**< Expected broker-reported error code */
} expected_group_result_t;
1468
/**
 * @brief Test deletion of consumer groups (DeleteGroups Admin API).
 *
 * Creates a topic, produces messages, consumes them with three distinct
 * groups (so those groups exist on the broker), then deletes four groups:
 * the three known ones (expecting NO_ERROR) and one unknown
 * (expecting GROUP_ID_NOT_FOUND).
 *
 * @param what        Human-readable variant description for logging.
 * @param rk          Client instance to issue Admin requests on.
 * @param useq        Result queue to use, or NULL for a temporary queue.
 * @param op_timeout  Operation timeout in ms, or -1 for default options.
 */
static void do_test_DeleteGroups (const char *what,
                                  rd_kafka_t *rk, rd_kafka_queue_t *useq,
                                  int op_timeout) {
        rd_kafka_queue_t *q;
        rd_kafka_AdminOptions_t *options = NULL;
        rd_kafka_event_t *rkev = NULL;
        rd_kafka_resp_err_t err;
        char errstr[512];
        const char *errstr2;
#define MY_DEL_GROUPS_CNT 4
        int known_groups = MY_DEL_GROUPS_CNT - 1;
        int i;
        const int partitions_cnt = 1;
        const int msgs_cnt = 100;
        char *topic;
        rd_kafka_metadata_topic_t exp_mdtopic = {0};
        int64_t testid = test_id_generate();
        test_timing_t timing;
        rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        const rd_kafka_group_result_t **results = NULL;
        size_t cnt = 0;  /* Result group count, filled in below.
                          * Declared here for consistency with the
                          * file's declare-at-top style. */
        expected_group_result_t expected[MY_DEL_GROUPS_CNT] = {{0}};
        rd_kafka_DeleteGroup_t *del_groups[MY_DEL_GROUPS_CNT];
        const rd_kafka_DeleteGroups_result_t *res;

        SUB_TEST_QUICK("%s DeleteGroups with %s, op_timeout %d",
                       rd_kafka_name(rk), what, op_timeout);

        q = useq ? useq : rd_kafka_queue_new(rk);

        if (op_timeout != -1) {
                options = rd_kafka_AdminOptions_new(
                        rk, RD_KAFKA_ADMIN_OP_ANY);

                err = rd_kafka_AdminOptions_set_operation_timeout(
                        options, op_timeout, errstr, sizeof(errstr));
                TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
        }


        topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
        exp_mdtopic.topic = topic;

        /* Create the topics first. */
        test_CreateTopics_simple(rk, NULL, &topic, 1,
                                 partitions_cnt,
                                 NULL);

        /* Verify that topics are reported by metadata */
        test_wait_metadata_update(rk,
                                  &exp_mdtopic, 1,
                                  NULL, 0,
                                  15*1000);

        /* Produce 100 msgs */
        test_produce_msgs_easy(topic, testid, 0, msgs_cnt);

        /* Consume with the first known_groups group ids so those groups
         * exist on the broker; the last group is never used and is thus
         * expected to be reported as not found. */
        for (i = 0; i < MY_DEL_GROUPS_CNT; i++) {
                char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
                if (i < known_groups) {
                        test_consume_msgs_easy(group, topic, testid, -1, msgs_cnt, NULL);
                        expected[i].group = group;
                        expected[i].err = RD_KAFKA_RESP_ERR_NO_ERROR;
                } else {
                        expected[i].group = group;
                        expected[i].err = RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND;
                }
                del_groups[i] = rd_kafka_DeleteGroup_new(group);
        }

        TIMING_START(&timing, "DeleteGroups");
        TEST_SAY("Call DeleteGroups\n");
        rd_kafka_DeleteGroups(rk, del_groups, MY_DEL_GROUPS_CNT, options, q);
        /* The Admin call must be asynchronous: < 50ms. */
        TIMING_ASSERT_LATER(&timing, 0, 50);

        TIMING_START(&timing, "DeleteGroups.queue_poll");

        /* Poll result queue for DeleteGroups result.
         * Print but otherwise ignore other event types
         * (typically generic Error events). */
        while (1) {
                rkev = rd_kafka_queue_poll(q, tmout_multip(20*1000));
                TEST_SAY("DeleteGroups: got %s in %.3fms\n",
                         rd_kafka_event_name(rkev),
                         TIMING_DURATION(&timing) / 1000.0f);
                if (rkev == NULL)
                        continue;
                if (rd_kafka_event_error(rkev))
                        TEST_SAY("%s: %s\n",
                                 rd_kafka_event_name(rkev),
                                 rd_kafka_event_error_string(rkev));

                if (rd_kafka_event_type(rkev) ==
                    RD_KAFKA_EVENT_DELETEGROUPS_RESULT) {
                        break;
                }

                rd_kafka_event_destroy(rkev);
        }
        /* Convert event to proper result */
        res = rd_kafka_event_DeleteGroups_result(rkev);
        TEST_ASSERT(res, "expected DeleteGroups_result, not %s",
                    rd_kafka_event_name(rkev));

        /* Expecting error */
        err = rd_kafka_event_error(rkev);
        errstr2 = rd_kafka_event_error_string(rkev);
        TEST_ASSERT(err == exp_err,
                    "expected DeleteGroups to return %s, not %s (%s)",
                    rd_kafka_err2str(exp_err),
                    rd_kafka_err2str(err),
                    err ? errstr2 : "n/a");

        TEST_SAY("DeleteGroups: returned %s (%s)\n",
                 rd_kafka_err2str(err), err ? errstr2 : "n/a");

        results = rd_kafka_DeleteGroups_result_groups(res, &cnt);

        TEST_ASSERT(MY_DEL_GROUPS_CNT == cnt,
                    "expected DeleteGroups_result_groups to return %d items, not %"PRIusz,
                    MY_DEL_GROUPS_CNT,
                    cnt);

        /* Verify per-group name and error code, in request order. */
        for (i = 0 ; i < MY_DEL_GROUPS_CNT ; i++) {
                const expected_group_result_t *exp = &expected[i];
                rd_kafka_resp_err_t exp_err = exp->err;
                const rd_kafka_group_result_t *act = results[i];
                rd_kafka_resp_err_t act_err = rd_kafka_error_code(rd_kafka_group_result_error(act));
                TEST_ASSERT(strcmp(exp->group, rd_kafka_group_result_name(act)) == 0,
                            "Result order mismatch at #%d: expected group name to be %s, not %s",
                            i, exp->group, rd_kafka_group_result_name(act));
                TEST_ASSERT(exp_err == act_err,
                            "expected err=%d for group %s, not %d (%s)",
                            exp_err,
                            exp->group,
                            act_err,
                            rd_kafka_err2str(act_err));
        }

        rd_kafka_event_destroy(rkev);

        for (i = 0 ; i < MY_DEL_GROUPS_CNT ; i++) {
                rd_kafka_DeleteGroup_destroy(del_groups[i]);
                rd_free(expected[i].group);
        }

        rd_free(topic);

        if (options)
                rd_kafka_AdminOptions_destroy(options);

        if (!useq)
                rd_kafka_queue_destroy(q);

        TEST_LATER_CHECK();
#undef MY_DEL_GROUPS_CNT

        SUB_TEST_PASS();
}
1628
1629
1630 /**
1631 * @brief Test deletion of committed offsets.
1632 *
1633 *
1634 */
/**
 * @brief Test deletion of committed offsets
 *        (DeleteConsumerGroupOffsets Admin API).
 *
 * Commits offsets for a set of topic+partitions, deletes the second
 * half of those commits, and verifies that exactly those commits are
 * gone (or, with a subscribing consumer, that the broker returns
 * GROUP_SUBSCRIBED_TO_TOPIC for each partition).
 *
 * @param what         Human-readable variant description for logging.
 * @param rk           Client instance to issue Admin requests on.
 * @param useq         Result queue to use, or NULL for a temporary queue.
 * @param op_timeout   Operation timeout in ms, or -1 for default options.
 * @param sub_consumer If true the consumer stays subscribed, which is
 *                     expected to make the deletion fail per partition.
 */
static void do_test_DeleteConsumerGroupOffsets (const char *what,
                                                rd_kafka_t *rk,
                                                rd_kafka_queue_t *useq,
                                                int op_timeout,
                                                rd_bool_t sub_consumer) {
        rd_kafka_queue_t *q;
        rd_kafka_AdminOptions_t *options = NULL;
        rd_kafka_topic_partition_list_t *orig_offsets, *offsets,
                *to_delete, *committed, *deleted, *subscription = NULL;
        rd_kafka_event_t *rkev = NULL;
        rd_kafka_resp_err_t err;
        char errstr[512];
        const char *errstr2;
#define MY_TOPIC_CNT 3
        int i;
        const int partitions_cnt = 3;
        char *topics[MY_TOPIC_CNT];
        rd_kafka_metadata_topic_t exp_mdtopics[MY_TOPIC_CNT] = {{0}};
        int exp_mdtopic_cnt = 0;
        test_timing_t timing;
        rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        rd_kafka_DeleteConsumerGroupOffsets_t *cgoffsets;
        const rd_kafka_DeleteConsumerGroupOffsets_result_t *res;
        const rd_kafka_group_result_t **gres;
        size_t gres_cnt;
        rd_kafka_t *consumer;
        char *groupid;

        SUB_TEST_QUICK("%s DeleteConsumerGroupOffsets with %s, op_timeout %d%s",
                       rd_kafka_name(rk), what, op_timeout,
                       sub_consumer ? ", with subscribing consumer" : "");

        if (sub_consumer)
                exp_err = RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC;

        q = useq ? useq : rd_kafka_queue_new(rk);

        if (op_timeout != -1) {
                options = rd_kafka_AdminOptions_new(
                        rk, RD_KAFKA_ADMIN_OP_ANY);

                err = rd_kafka_AdminOptions_set_operation_timeout(
                        options, op_timeout, errstr, sizeof(errstr));
                TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
        }


        subscription = rd_kafka_topic_partition_list_new(MY_TOPIC_CNT);

        for (i = 0 ; i < MY_TOPIC_CNT ; i++) {
                char pfx[64];
                char *topic;

                rd_snprintf(pfx, sizeof(pfx),
                            "DCGO-topic%d", i);
                topic = rd_strdup(test_mk_topic_name(pfx, 1));

                topics[i] = topic;
                exp_mdtopics[exp_mdtopic_cnt++].topic = topic;

                rd_kafka_topic_partition_list_add(subscription, topic,
                                                  RD_KAFKA_PARTITION_UA);
        }

        /* Reuse the first topic name as group id. */
        groupid = topics[0];

        /* Create the topics first. */
        test_CreateTopics_simple(rk, NULL, topics, MY_TOPIC_CNT,
                                 partitions_cnt, NULL);

        /* Verify that topics are reported by metadata */
        test_wait_metadata_update(rk,
                                  exp_mdtopics, exp_mdtopic_cnt,
                                  NULL, 0,
                                  15*1000);

        rd_sleep(1); /* Additional wait time for cluster propagation */

        consumer = test_create_consumer(groupid, NULL, NULL, NULL);

        if (sub_consumer) {
                TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription));
                test_consumer_wait_assignment(consumer, rd_true);
        }

        /* Commit some offsets: two partitions per topic.
         * NOTE(review): the partition index uses i % MY_TOPIC_CNT, which
         * happens to equal partitions_cnt (3) here; presumably
         * partitions_cnt was intended — confirm before changing either
         * constant. */
        orig_offsets = rd_kafka_topic_partition_list_new(MY_TOPIC_CNT * 2);
        for (i = 0 ; i < MY_TOPIC_CNT * 2 ; i++)
                rd_kafka_topic_partition_list_add(
                        orig_offsets, topics[i/2],
                        i % MY_TOPIC_CNT)->offset = (i+1)*10;

        TEST_CALL_ERR__(rd_kafka_commit(consumer, orig_offsets, 0/*sync*/));

        /* Verify committed offsets match */
        committed = rd_kafka_topic_partition_list_copy(orig_offsets);
        TEST_CALL_ERR__(rd_kafka_committed(consumer, committed,
                                           tmout_multip(5*1000)));

        if (test_partition_list_cmp(committed, orig_offsets)) {
                TEST_SAY("commit() list:\n");
                test_print_partition_list(orig_offsets);
                TEST_SAY("committed() list:\n");
                test_print_partition_list(committed);
                TEST_FAIL("committed offsets don't match");
        }

        rd_kafka_topic_partition_list_destroy(committed);

        /* Now delete second half of the commits.
         * \p to_delete is what is passed to the Admin API, while
         * \p offsets is the expected post-delete committed() state:
         * first half unchanged, second half reset to OFFSET_INVALID. */
        offsets = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2);
        to_delete = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2);
        for (i = 0 ; i < orig_offsets->cnt ; i++) {
                if (i < orig_offsets->cnt / 2)
                        rd_kafka_topic_partition_list_add(
                                offsets,
                                orig_offsets->elems[i].topic,
                                orig_offsets->elems[i].partition);
                else {
                        rd_kafka_topic_partition_list_add(
                                to_delete,
                                orig_offsets->elems[i].topic,
                                orig_offsets->elems[i].partition);
                        rd_kafka_topic_partition_list_add(
                                offsets,
                                orig_offsets->elems[i].topic,
                                orig_offsets->elems[i].partition)->offset =
                                RD_KAFKA_OFFSET_INVALID;
                }

        }

        cgoffsets = rd_kafka_DeleteConsumerGroupOffsets_new(groupid, to_delete);

        TIMING_START(&timing, "DeleteConsumerGroupOffsets");
        TEST_SAY("Call DeleteConsumerGroupOffsets\n");
        rd_kafka_DeleteConsumerGroupOffsets(rk, &cgoffsets, 1, options, q);
        /* The Admin call must be asynchronous: < 50ms. */
        TIMING_ASSERT_LATER(&timing, 0, 50);

        rd_kafka_DeleteConsumerGroupOffsets_destroy(cgoffsets);

        TIMING_START(&timing, "DeleteConsumerGroupOffsets.queue_poll");
        /* Poll result queue for DeleteConsumerGroupOffsets result.
         * Print but otherwise ignore other event types
         * (typically generic Error events). */
        while (1) {
                rkev = rd_kafka_queue_poll(q, tmout_multip(10*1000));
                TEST_SAY("DeleteConsumerGroupOffsets: got %s in %.3fms\n",
                         rd_kafka_event_name(rkev),
                         TIMING_DURATION(&timing) / 1000.0f);
                if (rkev == NULL)
                        continue;
                if (rd_kafka_event_error(rkev))
                        TEST_SAY("%s: %s\n",
                                 rd_kafka_event_name(rkev),
                                 rd_kafka_event_error_string(rkev));

                if (rd_kafka_event_type(rkev) ==
                    RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT)
                        break;

                rd_kafka_event_destroy(rkev);
        }

        /* Convert event to proper result */
        res = rd_kafka_event_DeleteConsumerGroupOffsets_result(rkev);
        TEST_ASSERT(res, "expected DeleteConsumerGroupOffsets_result, not %s",
                    rd_kafka_event_name(rkev));

        /* The request-level error is always expected to be NO_ERROR;
         * the per-partition errors are checked further down. */
        err = rd_kafka_event_error(rkev);
        errstr2 = rd_kafka_event_error_string(rkev);
        TEST_ASSERT(!err,
                    "expected DeleteConsumerGroupOffsets to succeed, "
                    "got %s (%s)",
                    rd_kafka_err2name(err),
                    err ? errstr2 : "n/a");

        TEST_SAY("DeleteConsumerGroupOffsets: returned %s (%s)\n",
                 rd_kafka_err2str(err), err ? errstr2 : "n/a");

        gres = rd_kafka_DeleteConsumerGroupOffsets_result_groups(res,
                                                                 &gres_cnt);
        TEST_ASSERT(gres && gres_cnt == 1,
                    "expected gres_cnt == 1, not %"PRIusz, gres_cnt);

        deleted = rd_kafka_topic_partition_list_copy(
                rd_kafka_group_result_partitions(gres[0]));

        if (test_partition_list_cmp(deleted, to_delete)) {
                TEST_SAY("Result list:\n");
                test_print_partition_list(deleted);
                TEST_SAY("Partitions passed to DeleteConsumerGroupOffsets:\n");
                test_print_partition_list(to_delete);
                TEST_FAIL("deleted/requested offsets don't match");
        }

        /* Verify expected errors */
        for (i = 0 ; i < deleted->cnt ; i++) {
                TEST_ASSERT_LATER(deleted->elems[i].err == exp_err,
                                  "Result %s [%"PRId32"] has error %s, "
                                  "expected %s",
                                  deleted->elems[i].topic,
                                  deleted->elems[i].partition,
                                  rd_kafka_err2name(deleted->elems[i].err),
                                  rd_kafka_err2name(exp_err));
        }

        TEST_LATER_CHECK();

        rd_kafka_topic_partition_list_destroy(deleted);
        rd_kafka_topic_partition_list_destroy(to_delete);

        rd_kafka_event_destroy(rkev);


        /* Verify committed offsets match */
        committed = rd_kafka_topic_partition_list_copy(orig_offsets);
        TEST_CALL_ERR__(rd_kafka_committed(consumer, committed,
                                           tmout_multip(5*1000)));

        TEST_SAY("Original committed offsets:\n");
        test_print_partition_list(orig_offsets);

        TEST_SAY("Committed offsets after delete:\n");
        test_print_partition_list(committed);

        if (test_partition_list_cmp(committed, offsets)) {
                TEST_SAY("expected list:\n");
                test_print_partition_list(offsets);
                TEST_SAY("committed() list:\n");
                test_print_partition_list(committed);
                TEST_FAIL("committed offsets don't match");
        }

        rd_kafka_topic_partition_list_destroy(committed);
        rd_kafka_topic_partition_list_destroy(offsets);
        rd_kafka_topic_partition_list_destroy(orig_offsets);
        rd_kafka_topic_partition_list_destroy(subscription);

        for (i = 0 ; i < MY_TOPIC_CNT ; i++)
                rd_free(topics[i]);

        rd_kafka_destroy(consumer);

        if (options)
                rd_kafka_AdminOptions_destroy(options);

        if (!useq)
                rd_kafka_queue_destroy(q);

        TEST_LATER_CHECK();
/* Fix: was '#undef MY_DEL_RECORDS_CNT' (copy-paste from
 * do_test_DeleteRecords), leaving MY_TOPIC_CNT defined past this
 * function. */
#undef MY_TOPIC_CNT

        SUB_TEST_PASS();
}
1891
1892
/**
 * @brief Run all Admin API sub-tests with a client of type \p cltype.
 *
 * Populates the file-scope \c avail_brokers list, creates a client
 * handle plus its main queue, and runs each sub-test over both a
 * temporary queue and the main queue with various operation timeouts.
 */
static void do_test_apis (rd_kafka_type_t cltype) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_queue_t *mainq;

        /* Get the available brokers, but use a separate rd_kafka_t instance
         * so we don't jinx the tests by having up-to-date metadata. */
        avail_brokers = test_get_broker_ids(NULL, &avail_broker_cnt);
        TEST_SAY("%"PRIusz" brokers in cluster "
                 "which will be used for replica sets\n",
                 avail_broker_cnt);

        do_test_unclean_destroy(cltype, 0/*tempq*/);
        do_test_unclean_destroy(cltype, 1/*mainq*/);

        test_conf_init(&conf, NULL, 180);
        test_conf_set(conf, "socket.timeout.ms", "10000");
        rk = test_create_handle(cltype, conf);

        mainq = rd_kafka_queue_get_main(rk);

        /* Create topics */
        do_test_CreateTopics("temp queue, op timeout 0",
                             rk, NULL, 0, 0);
        do_test_CreateTopics("temp queue, op timeout 15000",
                             rk, NULL, 15000, 0);
        do_test_CreateTopics("temp queue, op timeout 300, "
                             "validate only",
                             rk, NULL, 300, rd_true);
        do_test_CreateTopics("temp queue, op timeout 9000, validate_only",
                             rk, NULL, 9000, rd_true);
        do_test_CreateTopics("main queue, options", rk, mainq, -1, 0);

        /* Delete topics */
        do_test_DeleteTopics("temp queue, op timeout 0", rk, NULL, 0);
        /* Description fixed to match the actual op_timeout of 1500
         * (previously mislabelled "op timeout 15000"). */
        do_test_DeleteTopics("main queue, op timeout 1500", rk, mainq, 1500);

        if (test_broker_version >= TEST_BRKVER(1,0,0,0)) {
                /* Create Partitions */
                do_test_CreatePartitions("temp queue, op timeout 6500",
                                         rk, NULL, 6500);
                do_test_CreatePartitions("main queue, op timeout 0",
                                         rk, mainq, 0);
        }

        /* AlterConfigs */
        do_test_AlterConfigs(rk, mainq);

        /* DescribeConfigs */
        do_test_DescribeConfigs(rk, mainq);

        /* Delete records */
        do_test_DeleteRecords("temp queue, op timeout 0", rk, NULL, 0);
        do_test_DeleteRecords("main queue, op timeout 1500", rk, mainq, 1500);

        /* Delete groups
         * (an accidentally duplicated identical main-queue call was
         * removed here). */
        do_test_DeleteGroups("temp queue, op timeout 0", rk, NULL, 0);
        do_test_DeleteGroups("main queue, op timeout 1500", rk, mainq, 1500);

        if (test_broker_version >= TEST_BRKVER(2,4,0,0)) {
                /* Delete committed offsets */
                do_test_DeleteConsumerGroupOffsets(
                        "temp queue, op timeout 0", rk, NULL, 0, rd_false);
                do_test_DeleteConsumerGroupOffsets(
                        "main queue, op timeout 1500", rk, mainq, 1500,
                        rd_false);
                do_test_DeleteConsumerGroupOffsets(
                        "main queue, op timeout 1500", rk, mainq, 1500,
                        rd_true/*with subscribing consumer*/);
        }

        rd_kafka_queue_destroy(mainq);

        rd_kafka_destroy(rk);

        free(avail_brokers);
}
1971
1972
main_0081_admin(int argc,char ** argv)1973 int main_0081_admin (int argc, char **argv) {
1974
1975 do_test_apis(RD_KAFKA_PRODUCER);
1976
1977 if (test_quick) {
1978 TEST_SAY("Skipping further 0081 tests due to quick mode\n");
1979 return 0;
1980 }
1981
1982 do_test_apis(RD_KAFKA_CONSUMER);
1983
1984 return 0;
1985 }
1986