1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2019, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 
32 /**
33  * @name KafkaConsumer static membership tests
34  *
35  * Runs two consumers subscribing to multiple topics simulating various
36  * rebalance scenarios with static group membership enabled.
37  */
38 
39 #define _CONSUMER_CNT 2
40 
41 typedef struct _consumer_s {
42         rd_kafka_t *rk;
43         test_msgver_t *mv;
44         int64_t assigned_at;
45         int64_t revoked_at;
46         int partition_cnt;
47         rd_kafka_resp_err_t expected_rb_event;
48         int curr_line;
49 } _consumer_t;
50 
51 
52 /**
53  * @brief Call poll until a rebalance has been triggered
54  */
static_member_wait_rebalance0(int line,_consumer_t * c,int64_t start,int64_t * target,int timeout_ms)55 static int static_member_wait_rebalance0 (int line,
56                                           _consumer_t *c, int64_t start,
57                                           int64_t *target, int timeout_ms) {
58         int64_t tmout = test_clock() + (timeout_ms * 1000);
59 
60         c->curr_line = line;
61 
62         TEST_SAY("line %d: %s awaiting %s event\n",
63                  line, rd_kafka_name(c->rk),
64                  rd_kafka_err2name(c->expected_rb_event));
65 
66         while (timeout_ms < 0 ? 1 : test_clock() <= tmout) {
67                 if (*target > start) {
68                         c->curr_line = 0;
69                         return 1;
70                 }
71                 test_consumer_poll_once(c->rk, c->mv, 1000);
72         }
73 
74         c->curr_line = 0;
75 
76         TEST_SAY("line %d: %s timed out awaiting %s event\n",
77                  line, rd_kafka_name(c->rk),
78                  rd_kafka_err2name(c->expected_rb_event));
79 
80         return 0;
81 }
82 
/**
 * @brief Wait for consumer \p C 's \p TARGET timestamp to advance past
 *        \p START and fail the test if that does not happen within
 *        \p TIMEOUT_MS (negative: wait without a deadline).
 */
#define static_member_expect_rebalance(C,START,TARGET,TIMEOUT_MS) do {  \
                if (!static_member_wait_rebalance0(__LINE__, (C),       \
                                                   (START), (TARGET),   \
                                                   (TIMEOUT_MS)))       \
                        TEST_FAIL("%s: timed out waiting for %s event", \
                                  rd_kafka_name((C)->rk),               \
                                  rd_kafka_err2name((C)->expected_rb_event)); \
        } while (0)
90 
/**
 * @brief Non-failing variant: evaluates to 1 if the event was seen within
 *        \p TIMEOUT_MS, else 0. The caller's line number is recorded.
 */
#define static_member_wait_rebalance(C,START,TARGET,TIMEOUT_MS)         \
        static_member_wait_rebalance0(__LINE__, (C), (START), (TARGET), \
                                      (TIMEOUT_MS))
93 
94 
/**
 * @brief Rebalance callback used by the static membership consumers.
 *
 * Asserts that \p err is the event the test currently expects for this
 * consumer, applies the assignment or revocation, records the time of the
 * event in the consumer's state and finally clears the expectation.
 *
 * @param rk     Consumer instance the event was raised on.
 * @param err    RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS or
 *               RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; any other value
 *               fails the test.
 * @param parts  Partitions being assigned or revoked.
 * @param opaque The consumer's _consumer_t state.
 */
static void rebalance_cb (rd_kafka_t *rk,
                          rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *parts,
                          void *opaque) {
        _consumer_t *c = opaque;

        TEST_ASSERT(c->expected_rb_event == err,
                    "line %d: %s: Expected rebalance event %s got %s\n",
                    c->curr_line, rd_kafka_name(rk),
                    rd_kafka_err2name(c->expected_rb_event),
                    rd_kafka_err2name(err));

        switch (err)
        {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                TEST_SAY("line %d: %s Assignment (%d partition(s)):\n",
                         c->curr_line, rd_kafka_name(rk), parts->cnt);
                test_print_partition_list(parts);

                c->partition_cnt = parts->cnt;
                c->assigned_at = test_clock();
                rd_kafka_assign(rk, parts);

                break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                c->revoked_at = test_clock();
                rd_kafka_assign(rk, NULL);
                /* Log using the callback's own handle, as the assertion
                 * and the assign branch do. */
                TEST_SAY("line %d: %s revoked %d partitions\n",
                         c->curr_line, rd_kafka_name(rk), parts->cnt);

                break;

        default:
                TEST_FAIL("rebalance failed: %s", rd_kafka_err2str(err));
                break;
        }

        /* Clear the expectation: a further rebalance event will fail the
         * assertion above unless the test has set a new expected event. */
        c->expected_rb_event = RD_KAFKA_RESP_ERR_NO_ERROR;

        /* prevent poll from triggering more than one rebalance event */
        rd_kafka_yield(rk);
}
139 
140 
do_test_static_group_rebalance(void)141 static void do_test_static_group_rebalance (void) {
142         rd_kafka_conf_t *conf;
143         test_msgver_t mv;
144         int64_t rebalance_start;
145         _consumer_t c[_CONSUMER_CNT] = RD_ZERO_INIT;
146         const int msgcnt = 100;
147         uint64_t testid  = test_id_generate();
148         const char *topic = test_mk_topic_name("0102_static_group_rebalance",
149                                                1);
150         char *topics = rd_strdup(tsprintf("^%s.*", topic));
151         test_timing_t t_close;
152 
153         test_conf_init(&conf, NULL, 70);
154         test_msgver_init(&mv, testid);
155         c[0].mv = &mv;
156         c[1].mv = &mv;
157 
158         test_create_topic(NULL, topic, 3, 1);
159         test_produce_msgs_easy(topic, testid, RD_KAFKA_PARTITION_UA, msgcnt);
160 
161         test_conf_set(conf, "max.poll.interval.ms", "9000");
162         test_conf_set(conf, "session.timeout.ms", "6000");
163         test_conf_set(conf, "auto.offset.reset", "earliest");
164         test_conf_set(conf, "topic.metadata.refresh.interval.ms", "500");
165         test_conf_set(conf, "enable.partition.eof", "true");
166         test_conf_set(conf, "group.instance.id", "consumer1");
167 
168         rd_kafka_conf_set_opaque(conf, &c[0]);
169         c[0].rk = test_create_consumer(topic, rebalance_cb,
170                                        rd_kafka_conf_dup(conf), NULL);
171 
172         rd_kafka_conf_set_opaque(conf, &c[1]);
173         test_conf_set(conf, "group.instance.id", "consumer2");
174         c[1].rk = test_create_consumer(topic, rebalance_cb,
175                                        rd_kafka_conf_dup(conf), NULL);
176         rd_kafka_conf_destroy(conf);
177 
178         test_consumer_subscribe(c[0].rk, topics);
179         test_consumer_subscribe(c[1].rk, topics);
180 
181         /*
182          * Static members enforce `max.poll.interval.ms` which may prompt
183          * an unwanted rebalance while the other consumer awaits its assignment.
184          * These members remain in the member list however so we must
185          * interleave calls to poll while awaiting our assignment to avoid
186          * unexpected rebalances being triggered.
187          */
188         rebalance_start = test_clock();
189         c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
190         c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
191         while (!static_member_wait_rebalance(&c[0], rebalance_start,
192                                              &c[0].assigned_at, 1000)) {
193                 /* keep consumer 2 alive while consumer 1 awaits
194                  * its assignment
195                  */
196                 c[1].curr_line = __LINE__;
197                 test_consumer_poll_once(c[1].rk, &mv, 0);
198         }
199 
200         static_member_expect_rebalance(&c[1], rebalance_start,
201                                        &c[1].assigned_at, -1);
202 
203         /*
204          * Consume all the messages so we can watch for duplicates
205          * after rejoin/rebalance operations.
206          */
207         c[0].curr_line = __LINE__;
208         test_consumer_poll("serve.queue",
209                            c[0].rk, testid, c[0].partition_cnt, 0, -1, &mv);
210         c[1].curr_line = __LINE__;
211         test_consumer_poll("serve.queue",
212                            c[1].rk, testid, c[1].partition_cnt, 0, -1, &mv);
213 
214         test_msgver_verify("first.verify", &mv, TEST_MSGVER_ALL, 0, msgcnt);
215 
216         TEST_SAY("== Testing consumer restart ==\n");
217         conf = rd_kafka_conf_dup(rd_kafka_conf(c[1].rk));
218 
219         /* Only c[1] should exhibit rebalance behavior */
220         c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
221         TIMING_START(&t_close, "consumer restart");
222         test_consumer_close(c[1].rk);
223         rd_kafka_destroy(c[1].rk);
224 
225         c[1].rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
226         rd_kafka_poll_set_consumer(c[1].rk);
227 
228         test_consumer_subscribe(c[1].rk, topics);
229 
230         /* Await assignment */
231         c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
232         rebalance_start = test_clock();
233         while (!static_member_wait_rebalance(&c[1], rebalance_start,
234                                              &c[1].assigned_at, 1000)) {
235                 c[0].curr_line = __LINE__;
236                 test_consumer_poll_once(c[0].rk, &mv, 0);
237         }
238         TIMING_STOP(&t_close);
239 
240         /* Should complete before `session.timeout.ms` */
241         TIMING_ASSERT(&t_close, 0, 6000);
242 
243 
244         TEST_SAY("== Testing subscription expansion ==\n");
245 
246         /*
247          * New topics matching the subscription pattern should cause
248          * group rebalance
249          */
250         test_create_topic(c->rk, tsprintf("%snew", topic), 1, 1);
251 
252         /* Await revocation */
253         rebalance_start = test_clock();
254         c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
255         c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
256         while (!static_member_wait_rebalance(&c[0], rebalance_start,
257                                              &c[0].revoked_at, 1000)) {
258                 c[1].curr_line = __LINE__;
259                 test_consumer_poll_once(c[1].rk, &mv, 0);
260         }
261 
262         static_member_expect_rebalance(&c[1], rebalance_start,
263                                        &c[1].revoked_at, -1);
264 
265         /* Await assignment */
266         c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
267         c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
268         while (!static_member_wait_rebalance(&c[0], rebalance_start,
269                                              &c[0].assigned_at, 1000)) {
270                 c[1].curr_line = __LINE__;
271                 test_consumer_poll_once(c[1].rk, &mv, 0);
272         }
273 
274         static_member_expect_rebalance(&c[1], rebalance_start,
275                                        &c[1].assigned_at, -1);
276 
277         TEST_SAY("== Testing consumer unsubscribe ==\n");
278 
279         /* Unsubscribe should send a LeaveGroupRequest invoking a rebalance */
280 
281         /* Send LeaveGroup incrementing generation by 1 */
282         rebalance_start = test_clock();
283         rd_kafka_unsubscribe(c[1].rk);
284 
285         /* Await revocation */
286         c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
287         c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
288         static_member_expect_rebalance(&c[1], rebalance_start,
289                                        &c[1].revoked_at, -1);
290         static_member_expect_rebalance(&c[0], rebalance_start,
291                                        &c[0].revoked_at, -1);
292 
293         /* New cgrp generation with 1 member, c[0] */
294         c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
295         static_member_expect_rebalance(&c[0], rebalance_start,
296                                        &c[0].assigned_at, -1);
297 
298         /* Send JoinGroup bumping generation by 1 */
299         rebalance_start = test_clock();
300         test_consumer_subscribe(c[1].rk, topics);
301 
302         /* End previous single member generation */
303         c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
304         static_member_expect_rebalance(&c[0], rebalance_start,
305                                        &c[0].revoked_at, -1);
306 
307         /* Await assignment */
308         c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
309         c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
310         while (!static_member_wait_rebalance(&c[1], rebalance_start,
311                                              &c[1].assigned_at, 1000)) {
312                 c[0].curr_line = __LINE__;
313                 test_consumer_poll_once(c[0].rk, &mv, 0);
314         }
315 
316         static_member_expect_rebalance(&c[0], rebalance_start,
317                                        &c[0].assigned_at, -1);
318 
319         TEST_SAY("== Testing max poll violation ==\n");
320         /* max.poll.interval.ms should still be enforced by the consumer */
321 
322         /*
323          * Block long enough for consumer 2 to be evicted from the group
324          * `max.poll.interval.ms` + `session.timeout.ms`
325          */
326         rebalance_start = test_clock();
327         c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
328         c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
329         c[0].curr_line = __LINE__;
330         test_consumer_poll_no_msgs("wait.max.poll", c[0].rk, testid,
331                                    6000 + 9000);
332         c[1].curr_line = __LINE__;
333         test_consumer_poll_expect_err(c[1].rk, testid, 1000,
334                                       RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED);
335 
336         /* Await revocation */
337         while (!static_member_wait_rebalance(&c[0], rebalance_start,
338                                              &c[0].revoked_at, 1000)) {
339                 c[1].curr_line = __LINE__;
340                 test_consumer_poll_once(c[1].rk, &mv, 0);
341         }
342 
343         static_member_expect_rebalance(&c[1], rebalance_start,
344                                        &c[1].revoked_at, -1);
345 
346         /* Await assignment */
347         c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
348         c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
349         while (!static_member_wait_rebalance(&c[1], rebalance_start,
350                                              &c[1].assigned_at, 1000)) {
351                 c[0].curr_line = __LINE__;
352                 test_consumer_poll_once(c[0].rk, &mv, 0);
353         }
354 
355         static_member_expect_rebalance(&c[0], rebalance_start,
356                                        &c[0].assigned_at, -1);
357 
358         TEST_SAY("== Testing `session.timeout.ms` member eviction ==\n");
359 
360         rebalance_start = test_clock();
361         c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
362         TIMING_START(&t_close, "consumer close");
363         test_consumer_close(c[0].rk);
364         rd_kafka_destroy(c[0].rk);
365 
366         c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
367         static_member_expect_rebalance(&c[1], rebalance_start,
368                                        &c[1].revoked_at, 2*7000);
369 
370         c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
371         static_member_expect_rebalance(&c[1], rebalance_start,
372                                        &c[1].assigned_at, 2000);
373 
374         /* Should take at least as long as `session.timeout.ms` but less than
375          * `max.poll.interval.ms`, but since we can't really know when
376          * the last Heartbeat or SyncGroup request was sent we need to
377          * allow some leeway on the minimum side (4s), and also some on
378          * the maximum side (1s) for slow runtimes. */
379         TIMING_ASSERT(&t_close, 6000-4000, 9000+1000);
380 
381         c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
382         test_consumer_close(c[1].rk);
383         rd_kafka_destroy(c[1].rk);
384 
385         test_msgver_verify("final.validation", &mv, TEST_MSGVER_ALL, 0,
386                            msgcnt);
387         test_msgver_clear(&mv);
388         free(topics);
389 }
390 
391 
392 /**
393  * @brief Await a non-empty assignment for all consumers in \p c
394  */
await_assignment_multi(const char * what,rd_kafka_t ** c,int cnt)395 static void await_assignment_multi (const char *what, rd_kafka_t **c, int cnt) {
396         rd_kafka_topic_partition_list_t *parts;
397         int assignment_cnt;
398 
399         TEST_SAY("%s\n", what);
400 
401         do {
402                 int i;
403                 int timeout_ms = 1000;
404 
405                 assignment_cnt = 0;
406 
407                 for (i = 0 ; i < cnt ; i++) {
408                         test_consumer_poll_no_msgs("poll", c[i], 0, timeout_ms);
409                         timeout_ms = 100;
410 
411                         if (!rd_kafka_assignment(c[i], &parts) && parts) {
412                                 TEST_SAY("%s has %d partition(s) assigned\n",
413                                          rd_kafka_name(c[i]), parts->cnt);
414                                 if (parts->cnt > 0)
415                                         assignment_cnt++;
416                                 rd_kafka_topic_partition_list_destroy(parts);
417                         }
418                 }
419 
420         } while (assignment_cnt < cnt);
421 }
422 
423 
424 static const rd_kafka_t *valid_fatal_rk;
425 /**
426  * @brief Tells test harness that fatal error should not fail the current test
427  */
is_fatal_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)428 static int is_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
429                         const char *reason) {
430         return rk != valid_fatal_rk;
431 }
432 
433 /**
434  * @brief Test that consumer fencing raises a fatal error
435  */
do_test_fenced_member(void)436 static void do_test_fenced_member (void) {
437         rd_kafka_t *c[3]; /* 0: consumer2b, 1: consumer1, 2: consumer2a */
438         rd_kafka_conf_t *conf;
439         const char *topic = test_mk_topic_name("0102_static_group_rebalance",
440                                                1);
441         rd_kafka_message_t *rkm;
442         char errstr[512];
443         rd_kafka_resp_err_t err;
444 
445         TEST_SAY(_C_MAG "[ Test fenced member ]\n");
446 
447         test_conf_init(&conf, NULL, 30);
448 
449         test_create_topic(NULL, topic, 3, 1);
450 
451         test_conf_set(conf, "group.instance.id", "consumer1");
452         c[1] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
453 
454         test_conf_set(conf, "group.instance.id", "consumer2");
455         c[2] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
456 
457         test_consumer_subscribe(c[1], topic);
458         test_consumer_subscribe(c[2], topic);
459 
460         await_assignment_multi("Awaiting initial assignments", &c[1], 2);
461 
462         /* Create conflicting consumer */
463         TEST_SAY("Creating conflicting consumer2 instance\n");
464         test_conf_set(conf, "group.instance.id", "consumer2");
465         c[0] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
466         rd_kafka_conf_destroy(conf);
467 
468         test_curr->is_fatal_cb = is_fatal_cb;
469         valid_fatal_rk = c[2]; /* consumer2a is the consumer that should fail */
470 
471         test_consumer_subscribe(c[0], topic);
472 
473         /* consumer1 should not be affected (other than a rebalance which
474          * we ignore here)... */
475         test_consumer_poll_no_msgs("consumer1", c[1], 0, 5000);
476 
477         /* .. but consumer2a should now have been fenced off by consumer2b */
478         rkm = rd_kafka_consumer_poll(c[2], 5000);
479         TEST_ASSERT(rkm != NULL, "Expected error, not timeout");
480         TEST_ASSERT(rkm->err == RD_KAFKA_RESP_ERR__FATAL,
481                     "Expected ERR__FATAL, not %s: %s",
482                     rd_kafka_err2str(rkm->err),
483                     rd_kafka_message_errstr(rkm));
484         TEST_SAY("Fenced consumer returned expected: %s: %s\n",
485                  rd_kafka_err2name(rkm->err),
486                  rd_kafka_message_errstr(rkm));
487 
488 
489         /* Read the actual error */
490         err = rd_kafka_fatal_error(c[2], errstr, sizeof(errstr));
491         TEST_SAY("%s fatal error: %s: %s\n",
492                  rd_kafka_name(c[2]), rd_kafka_err2name(err), errstr);
493         TEST_ASSERT(err == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,
494                     "Expected ERR_FENCED_INSTANCE_ID as fatal error, not %s",
495                     rd_kafka_err2name(err));
496 
497         TEST_SAY("close\n");
498         /* Close consumer2a, should also return a fatal error */
499         err = rd_kafka_consumer_close(c[2]);
500         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
501                     "Expected close on %s to return ERR__FATAL, not %s",
502                     rd_kafka_name(c[2]), rd_kafka_err2name(err));
503 
504         rd_kafka_destroy(c[2]);
505 
506         /* consumer2b and consumer1 should be fine and get their
507          * assignments */
508         await_assignment_multi("Awaiting post-fencing assignment", c, 2);
509 
510         rd_kafka_destroy(c[0]);
511         rd_kafka_destroy(c[1]);
512 }
513 
514 
515 
/**
 * @brief Test entry point: runs the static group rebalance scenarios
 *        followed by the fenced member test.
 *
 * @returns 0; failures are reported by the sub-tests themselves.
 *
 * @remark \p argc and \p argv are unused.
 */
int main_0102_static_group_rebalance (int argc, char **argv) {
        do_test_static_group_rebalance();
        do_test_fenced_member();

        return 0;
}
524