1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2019, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31
/**
 * @name KafkaConsumer static membership tests
 *
 * Runs two consumers subscribed to multiple topics, simulating various
 * rebalance scenarios with static group membership enabled.
 */
38
39 #define _CONSUMER_CNT 2
40
41 typedef struct _consumer_s {
42 rd_kafka_t *rk;
43 test_msgver_t *mv;
44 int64_t assigned_at;
45 int64_t revoked_at;
46 int partition_cnt;
47 rd_kafka_resp_err_t expected_rb_event;
48 int curr_line;
49 } _consumer_t;
50
51
52 /**
53 * @brief Call poll until a rebalance has been triggered
54 */
static_member_wait_rebalance0(int line,_consumer_t * c,int64_t start,int64_t * target,int timeout_ms)55 static int static_member_wait_rebalance0 (int line,
56 _consumer_t *c, int64_t start,
57 int64_t *target, int timeout_ms) {
58 int64_t tmout = test_clock() + (timeout_ms * 1000);
59
60 c->curr_line = line;
61
62 TEST_SAY("line %d: %s awaiting %s event\n",
63 line, rd_kafka_name(c->rk),
64 rd_kafka_err2name(c->expected_rb_event));
65
66 while (timeout_ms < 0 ? 1 : test_clock() <= tmout) {
67 if (*target > start) {
68 c->curr_line = 0;
69 return 1;
70 }
71 test_consumer_poll_once(c->rk, c->mv, 1000);
72 }
73
74 c->curr_line = 0;
75
76 TEST_SAY("line %d: %s timed out awaiting %s event\n",
77 line, rd_kafka_name(c->rk),
78 rd_kafka_err2name(c->expected_rb_event));
79
80 return 0;
81 }
82
/**
 * @brief Await a rebalance event on consumer \p C (a _consumer_t *),
 *        failing the test if static_member_wait_rebalance0() times out.
 *
 * \p C is evaluated exactly once.
 */
#define static_member_expect_rebalance(C,START,TARGET,TIMEOUT_MS) do {  \
                _consumer_t *_erb_c = (C);                              \
                if (!static_member_wait_rebalance0(__LINE__, _erb_c,    \
                                                   (START), (TARGET),   \
                                                   (TIMEOUT_MS)))       \
                        TEST_FAIL("%s: timed out waiting for %s event", \
                                  rd_kafka_name(_erb_c->rk),            \
                                  rd_kafka_err2name(                    \
                                          _erb_c->expected_rb_event));  \
        } while (0)

/**
 * @brief Await a rebalance event on consumer \p C (a _consumer_t *).
 * @returns 1 if the event was observed, 0 on timeout.
 */
#define static_member_wait_rebalance(C,START,TARGET,TIMEOUT_MS)          \
        static_member_wait_rebalance0(__LINE__, (C), (START), (TARGET),  \
                                      (TIMEOUT_MS))
93
94
/**
 * @brief Rebalance callback shared by both consumers.
 *
 * Asserts that \p err matches the consumer's expected_rb_event. It then
 * records the time of the assignment or revocation in the _consumer_t
 * passed as \p opaque and applies the new assignment (or clears it).
 * Finally, it resets expected_rb_event to NO_ERROR, so a further
 * rebalance event the test did not announce fails the assertion.
 */
static void rebalance_cb (rd_kafka_t *rk,
                          rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *parts,
                          void *opaque) {
        _consumer_t *c = opaque;
        rd_kafka_resp_err_t assign_err = RD_KAFKA_RESP_ERR_NO_ERROR;

        TEST_ASSERT(c->expected_rb_event == err,
                    "line %d: %s: Expected rebalance event %s got %s\n",
                    c->curr_line, rd_kafka_name(rk),
                    rd_kafka_err2name(c->expected_rb_event),
                    rd_kafka_err2name(err));

        switch (err)
        {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                TEST_SAY("line %d: %s Assignment (%d partition(s)):\n",
                         c->curr_line, rd_kafka_name(rk), parts->cnt);
                test_print_partition_list(parts);

                c->partition_cnt = parts->cnt;
                c->assigned_at = test_clock();
                assign_err = rd_kafka_assign(rk, parts);

                break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                c->revoked_at = test_clock();
                assign_err = rd_kafka_assign(rk, NULL);
                TEST_SAY("line %d: %s revoked %d partitions\n",
                         c->curr_line, rd_kafka_name(rk), parts->cnt);

                break;

        default:
                TEST_FAIL("rebalance failed: %s", rd_kafka_err2str(err));
                break;
        }

        /* Don't let a failed (un)assign go unnoticed in the test log. */
        if (assign_err)
                TEST_SAY("line %d: %s: rd_kafka_assign() failed: %s\n",
                         c->curr_line, rd_kafka_name(rk),
                         rd_kafka_err2name(assign_err));

        /* Reset error */
        c->expected_rb_event = RD_KAFKA_RESP_ERR_NO_ERROR;

        /* prevent poll from triggering more than one rebalance event */
        rd_kafka_yield(rk);
}
139
140
do_test_static_group_rebalance(void)141 static void do_test_static_group_rebalance (void) {
142 rd_kafka_conf_t *conf;
143 test_msgver_t mv;
144 int64_t rebalance_start;
145 _consumer_t c[_CONSUMER_CNT] = RD_ZERO_INIT;
146 const int msgcnt = 100;
147 uint64_t testid = test_id_generate();
148 const char *topic = test_mk_topic_name("0102_static_group_rebalance",
149 1);
150 char *topics = rd_strdup(tsprintf("^%s.*", topic));
151 test_timing_t t_close;
152
153 test_conf_init(&conf, NULL, 70);
154 test_msgver_init(&mv, testid);
155 c[0].mv = &mv;
156 c[1].mv = &mv;
157
158 test_create_topic(NULL, topic, 3, 1);
159 test_produce_msgs_easy(topic, testid, RD_KAFKA_PARTITION_UA, msgcnt);
160
161 test_conf_set(conf, "max.poll.interval.ms", "9000");
162 test_conf_set(conf, "session.timeout.ms", "6000");
163 test_conf_set(conf, "auto.offset.reset", "earliest");
164 test_conf_set(conf, "topic.metadata.refresh.interval.ms", "500");
165 test_conf_set(conf, "enable.partition.eof", "true");
166 test_conf_set(conf, "group.instance.id", "consumer1");
167
168 rd_kafka_conf_set_opaque(conf, &c[0]);
169 c[0].rk = test_create_consumer(topic, rebalance_cb,
170 rd_kafka_conf_dup(conf), NULL);
171
172 rd_kafka_conf_set_opaque(conf, &c[1]);
173 test_conf_set(conf, "group.instance.id", "consumer2");
174 c[1].rk = test_create_consumer(topic, rebalance_cb,
175 rd_kafka_conf_dup(conf), NULL);
176 rd_kafka_conf_destroy(conf);
177
178 test_consumer_subscribe(c[0].rk, topics);
179 test_consumer_subscribe(c[1].rk, topics);
180
181 /*
182 * Static members enforce `max.poll.interval.ms` which may prompt
183 * an unwanted rebalance while the other consumer awaits its assignment.
184 * These members remain in the member list however so we must
185 * interleave calls to poll while awaiting our assignment to avoid
186 * unexpected rebalances being triggered.
187 */
188 rebalance_start = test_clock();
189 c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
190 c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
191 while (!static_member_wait_rebalance(&c[0], rebalance_start,
192 &c[0].assigned_at, 1000)) {
193 /* keep consumer 2 alive while consumer 1 awaits
194 * its assignment
195 */
196 c[1].curr_line = __LINE__;
197 test_consumer_poll_once(c[1].rk, &mv, 0);
198 }
199
200 static_member_expect_rebalance(&c[1], rebalance_start,
201 &c[1].assigned_at, -1);
202
203 /*
204 * Consume all the messages so we can watch for duplicates
205 * after rejoin/rebalance operations.
206 */
207 c[0].curr_line = __LINE__;
208 test_consumer_poll("serve.queue",
209 c[0].rk, testid, c[0].partition_cnt, 0, -1, &mv);
210 c[1].curr_line = __LINE__;
211 test_consumer_poll("serve.queue",
212 c[1].rk, testid, c[1].partition_cnt, 0, -1, &mv);
213
214 test_msgver_verify("first.verify", &mv, TEST_MSGVER_ALL, 0, msgcnt);
215
216 TEST_SAY("== Testing consumer restart ==\n");
217 conf = rd_kafka_conf_dup(rd_kafka_conf(c[1].rk));
218
219 /* Only c[1] should exhibit rebalance behavior */
220 c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
221 TIMING_START(&t_close, "consumer restart");
222 test_consumer_close(c[1].rk);
223 rd_kafka_destroy(c[1].rk);
224
225 c[1].rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
226 rd_kafka_poll_set_consumer(c[1].rk);
227
228 test_consumer_subscribe(c[1].rk, topics);
229
230 /* Await assignment */
231 c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
232 rebalance_start = test_clock();
233 while (!static_member_wait_rebalance(&c[1], rebalance_start,
234 &c[1].assigned_at, 1000)) {
235 c[0].curr_line = __LINE__;
236 test_consumer_poll_once(c[0].rk, &mv, 0);
237 }
238 TIMING_STOP(&t_close);
239
240 /* Should complete before `session.timeout.ms` */
241 TIMING_ASSERT(&t_close, 0, 6000);
242
243
244 TEST_SAY("== Testing subscription expansion ==\n");
245
246 /*
247 * New topics matching the subscription pattern should cause
248 * group rebalance
249 */
250 test_create_topic(c->rk, tsprintf("%snew", topic), 1, 1);
251
252 /* Await revocation */
253 rebalance_start = test_clock();
254 c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
255 c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
256 while (!static_member_wait_rebalance(&c[0], rebalance_start,
257 &c[0].revoked_at, 1000)) {
258 c[1].curr_line = __LINE__;
259 test_consumer_poll_once(c[1].rk, &mv, 0);
260 }
261
262 static_member_expect_rebalance(&c[1], rebalance_start,
263 &c[1].revoked_at, -1);
264
265 /* Await assignment */
266 c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
267 c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
268 while (!static_member_wait_rebalance(&c[0], rebalance_start,
269 &c[0].assigned_at, 1000)) {
270 c[1].curr_line = __LINE__;
271 test_consumer_poll_once(c[1].rk, &mv, 0);
272 }
273
274 static_member_expect_rebalance(&c[1], rebalance_start,
275 &c[1].assigned_at, -1);
276
277 TEST_SAY("== Testing consumer unsubscribe ==\n");
278
279 /* Unsubscribe should send a LeaveGroupRequest invoking a rebalance */
280
281 /* Send LeaveGroup incrementing generation by 1 */
282 rebalance_start = test_clock();
283 rd_kafka_unsubscribe(c[1].rk);
284
285 /* Await revocation */
286 c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
287 c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
288 static_member_expect_rebalance(&c[1], rebalance_start,
289 &c[1].revoked_at, -1);
290 static_member_expect_rebalance(&c[0], rebalance_start,
291 &c[0].revoked_at, -1);
292
293 /* New cgrp generation with 1 member, c[0] */
294 c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
295 static_member_expect_rebalance(&c[0], rebalance_start,
296 &c[0].assigned_at, -1);
297
298 /* Send JoinGroup bumping generation by 1 */
299 rebalance_start = test_clock();
300 test_consumer_subscribe(c[1].rk, topics);
301
302 /* End previous single member generation */
303 c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
304 static_member_expect_rebalance(&c[0], rebalance_start,
305 &c[0].revoked_at, -1);
306
307 /* Await assignment */
308 c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
309 c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
310 while (!static_member_wait_rebalance(&c[1], rebalance_start,
311 &c[1].assigned_at, 1000)) {
312 c[0].curr_line = __LINE__;
313 test_consumer_poll_once(c[0].rk, &mv, 0);
314 }
315
316 static_member_expect_rebalance(&c[0], rebalance_start,
317 &c[0].assigned_at, -1);
318
319 TEST_SAY("== Testing max poll violation ==\n");
320 /* max.poll.interval.ms should still be enforced by the consumer */
321
322 /*
323 * Block long enough for consumer 2 to be evicted from the group
324 * `max.poll.interval.ms` + `session.timeout.ms`
325 */
326 rebalance_start = test_clock();
327 c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
328 c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
329 c[0].curr_line = __LINE__;
330 test_consumer_poll_no_msgs("wait.max.poll", c[0].rk, testid,
331 6000 + 9000);
332 c[1].curr_line = __LINE__;
333 test_consumer_poll_expect_err(c[1].rk, testid, 1000,
334 RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED);
335
336 /* Await revocation */
337 while (!static_member_wait_rebalance(&c[0], rebalance_start,
338 &c[0].revoked_at, 1000)) {
339 c[1].curr_line = __LINE__;
340 test_consumer_poll_once(c[1].rk, &mv, 0);
341 }
342
343 static_member_expect_rebalance(&c[1], rebalance_start,
344 &c[1].revoked_at, -1);
345
346 /* Await assignment */
347 c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
348 c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
349 while (!static_member_wait_rebalance(&c[1], rebalance_start,
350 &c[1].assigned_at, 1000)) {
351 c[0].curr_line = __LINE__;
352 test_consumer_poll_once(c[0].rk, &mv, 0);
353 }
354
355 static_member_expect_rebalance(&c[0], rebalance_start,
356 &c[0].assigned_at, -1);
357
358 TEST_SAY("== Testing `session.timeout.ms` member eviction ==\n");
359
360 rebalance_start = test_clock();
361 c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
362 TIMING_START(&t_close, "consumer close");
363 test_consumer_close(c[0].rk);
364 rd_kafka_destroy(c[0].rk);
365
366 c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
367 static_member_expect_rebalance(&c[1], rebalance_start,
368 &c[1].revoked_at, 2*7000);
369
370 c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
371 static_member_expect_rebalance(&c[1], rebalance_start,
372 &c[1].assigned_at, 2000);
373
374 /* Should take at least as long as `session.timeout.ms` but less than
375 * `max.poll.interval.ms`, but since we can't really know when
376 * the last Heartbeat or SyncGroup request was sent we need to
377 * allow some leeway on the minimum side (4s), and also some on
378 * the maximum side (1s) for slow runtimes. */
379 TIMING_ASSERT(&t_close, 6000-4000, 9000+1000);
380
381 c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
382 test_consumer_close(c[1].rk);
383 rd_kafka_destroy(c[1].rk);
384
385 test_msgver_verify("final.validation", &mv, TEST_MSGVER_ALL, 0,
386 msgcnt);
387 test_msgver_clear(&mv);
388 free(topics);
389 }
390
391
392 /**
393 * @brief Await a non-empty assignment for all consumers in \p c
394 */
await_assignment_multi(const char * what,rd_kafka_t ** c,int cnt)395 static void await_assignment_multi (const char *what, rd_kafka_t **c, int cnt) {
396 rd_kafka_topic_partition_list_t *parts;
397 int assignment_cnt;
398
399 TEST_SAY("%s\n", what);
400
401 do {
402 int i;
403 int timeout_ms = 1000;
404
405 assignment_cnt = 0;
406
407 for (i = 0 ; i < cnt ; i++) {
408 test_consumer_poll_no_msgs("poll", c[i], 0, timeout_ms);
409 timeout_ms = 100;
410
411 if (!rd_kafka_assignment(c[i], &parts) && parts) {
412 TEST_SAY("%s has %d partition(s) assigned\n",
413 rd_kafka_name(c[i]), parts->cnt);
414 if (parts->cnt > 0)
415 assignment_cnt++;
416 rd_kafka_topic_partition_list_destroy(parts);
417 }
418 }
419
420 } while (assignment_cnt < cnt);
421 }
422
423
424 static const rd_kafka_t *valid_fatal_rk;
425 /**
426 * @brief Tells test harness that fatal error should not fail the current test
427 */
is_fatal_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)428 static int is_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
429 const char *reason) {
430 return rk != valid_fatal_rk;
431 }
432
433 /**
434 * @brief Test that consumer fencing raises a fatal error
435 */
do_test_fenced_member(void)436 static void do_test_fenced_member (void) {
437 rd_kafka_t *c[3]; /* 0: consumer2b, 1: consumer1, 2: consumer2a */
438 rd_kafka_conf_t *conf;
439 const char *topic = test_mk_topic_name("0102_static_group_rebalance",
440 1);
441 rd_kafka_message_t *rkm;
442 char errstr[512];
443 rd_kafka_resp_err_t err;
444
445 TEST_SAY(_C_MAG "[ Test fenced member ]\n");
446
447 test_conf_init(&conf, NULL, 30);
448
449 test_create_topic(NULL, topic, 3, 1);
450
451 test_conf_set(conf, "group.instance.id", "consumer1");
452 c[1] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
453
454 test_conf_set(conf, "group.instance.id", "consumer2");
455 c[2] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
456
457 test_consumer_subscribe(c[1], topic);
458 test_consumer_subscribe(c[2], topic);
459
460 await_assignment_multi("Awaiting initial assignments", &c[1], 2);
461
462 /* Create conflicting consumer */
463 TEST_SAY("Creating conflicting consumer2 instance\n");
464 test_conf_set(conf, "group.instance.id", "consumer2");
465 c[0] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
466 rd_kafka_conf_destroy(conf);
467
468 test_curr->is_fatal_cb = is_fatal_cb;
469 valid_fatal_rk = c[2]; /* consumer2a is the consumer that should fail */
470
471 test_consumer_subscribe(c[0], topic);
472
473 /* consumer1 should not be affected (other than a rebalance which
474 * we ignore here)... */
475 test_consumer_poll_no_msgs("consumer1", c[1], 0, 5000);
476
477 /* .. but consumer2a should now have been fenced off by consumer2b */
478 rkm = rd_kafka_consumer_poll(c[2], 5000);
479 TEST_ASSERT(rkm != NULL, "Expected error, not timeout");
480 TEST_ASSERT(rkm->err == RD_KAFKA_RESP_ERR__FATAL,
481 "Expected ERR__FATAL, not %s: %s",
482 rd_kafka_err2str(rkm->err),
483 rd_kafka_message_errstr(rkm));
484 TEST_SAY("Fenced consumer returned expected: %s: %s\n",
485 rd_kafka_err2name(rkm->err),
486 rd_kafka_message_errstr(rkm));
487
488
489 /* Read the actual error */
490 err = rd_kafka_fatal_error(c[2], errstr, sizeof(errstr));
491 TEST_SAY("%s fatal error: %s: %s\n",
492 rd_kafka_name(c[2]), rd_kafka_err2name(err), errstr);
493 TEST_ASSERT(err == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,
494 "Expected ERR_FENCED_INSTANCE_ID as fatal error, not %s",
495 rd_kafka_err2name(err));
496
497 TEST_SAY("close\n");
498 /* Close consumer2a, should also return a fatal error */
499 err = rd_kafka_consumer_close(c[2]);
500 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
501 "Expected close on %s to return ERR__FATAL, not %s",
502 rd_kafka_name(c[2]), rd_kafka_err2name(err));
503
504 rd_kafka_destroy(c[2]);
505
506 /* consumer2b and consumer1 should be fine and get their
507 * assignments */
508 await_assignment_multi("Awaiting post-fencing assignment", c, 2);
509
510 rd_kafka_destroy(c[0]);
511 rd_kafka_destroy(c[1]);
512 }
513
514
515
/**
 * @brief Test entry point: runs the static membership rebalance scenarios,
 *        then the member fencing test.
 */
int main_0102_static_group_rebalance (int argc, char **argv) {
        do_test_static_group_rebalance();
        do_test_fenced_member();

        return 0;
}
524