1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "rdkafka_int.h"
30 #include "rdkafka_broker.h"
31 #include "rdkafka_request.h"
32 #include "rdkafka_topic.h"
33 #include "rdkafka_partition.h"
34 #include "rdkafka_assignor.h"
35 #include "rdkafka_offset.h"
36 #include "rdkafka_metadata.h"
37 #include "rdkafka_cgrp.h"
38 #include "rdkafka_interceptor.h"
39 #include "rdmap.h"
40
41 #include "rdunittest.h"
42
43 #include <ctype.h>
44 #include <stdarg.h>
45
46 static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts,
47 void *arg);
48 static rd_kafka_error_t *
49 rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg,
50 rd_kafka_topic_partition_list_t *assignment);
51 static rd_kafka_error_t *rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg);
52 static rd_kafka_error_t *
53 rd_kafka_cgrp_incremental_assign (rd_kafka_cgrp_t *rkcg,
54 rd_kafka_topic_partition_list_t
55 *partitions);
56 static rd_kafka_error_t *
57 rd_kafka_cgrp_incremental_unassign (rd_kafka_cgrp_t *rkcg,
58 rd_kafka_topic_partition_list_t
59 *partitions);
60
61 static rd_kafka_op_res_t
62 rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq,
63 rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type,
64 void *opaque);
65
66 static void rd_kafka_cgrp_group_leader_reset (rd_kafka_cgrp_t *rkcg,
67 const char *reason);
68
69 static RD_INLINE int rd_kafka_cgrp_try_terminate (rd_kafka_cgrp_t *rkcg);
70
71 static void rd_kafka_cgrp_revoke_all_rejoin (rd_kafka_cgrp_t *rkcg,
72 rd_bool_t assignment_lost,
73 rd_bool_t initiating,
74 const char *reason);
75 static void rd_kafka_cgrp_revoke_all_rejoin_maybe (rd_kafka_cgrp_t *rkcg,
76 rd_bool_t
77 assignment_lost,
78 rd_bool_t initiating,
79 const char *reason);
80
81 static void rd_kafka_cgrp_group_is_rebalancing (rd_kafka_cgrp_t *rkcg);
82
83 static void
84 rd_kafka_cgrp_max_poll_interval_check_tmr_cb (rd_kafka_timers_t *rkts,
85 void *arg);
86 static rd_kafka_resp_err_t
87 rd_kafka_cgrp_subscribe (rd_kafka_cgrp_t *rkcg,
88 rd_kafka_topic_partition_list_t *rktparlist);
89
90 static void rd_kafka_cgrp_group_assignment_set (
91 rd_kafka_cgrp_t *rkcg,
92 const rd_kafka_topic_partition_list_t *partitions);
93 static void rd_kafka_cgrp_group_assignment_modify (
94 rd_kafka_cgrp_t *rkcg,
95 rd_bool_t add,
96 const rd_kafka_topic_partition_list_t *partitions);
97
98
99 /**
100 * @returns true if the current assignment is lost.
101 */
rd_kafka_cgrp_assignment_is_lost(rd_kafka_cgrp_t * rkcg)102 rd_bool_t rd_kafka_cgrp_assignment_is_lost (rd_kafka_cgrp_t *rkcg) {
103 return rd_atomic32_get(&rkcg->rkcg_assignment_lost) != 0;
104 }
105
106
107 /**
108 * @brief Call when the current assignment has been lost, with a
109 * human-readable reason.
110 */
static void rd_kafka_cgrp_assignment_set_lost (rd_kafka_cgrp_t *rkcg,
                                               char *fmt, ...)
        RD_FORMAT(printf, 2, 3);
static void rd_kafka_cgrp_assignment_set_lost (rd_kafka_cgrp_t *rkcg,
                                               char *fmt, ...) {
        char buf[256];
        va_list ap;

        /* Nothing to mark as lost if there is no current assignment. */
        if (!rkcg->rkcg_group_assignment)
                return;

        va_start(ap, fmt);
        rd_vsnprintf(buf, sizeof(buf), fmt, ap);
        va_end(ap);

        rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP, "LOST",
                     "Group \"%s\": "
                     "current assignment of %d partition(s) lost: %s",
                     rkcg->rkcg_group_id->str,
                     rkcg->rkcg_group_assignment->cnt,
                     buf);

        rd_atomic32_set(&rkcg->rkcg_assignment_lost, rd_true);
}
135
136
137 /**
138 * @brief Call when the current assignment is no longer considered lost, with a
139 * human-readable reason.
140 */
static void rd_kafka_cgrp_assignment_clear_lost (rd_kafka_cgrp_t *rkcg,
                                                 char *fmt, ...) {
        char buf[256];
        va_list ap;

        /* Only act (and log) when the lost flag is currently set. */
        if (!rd_atomic32_get(&rkcg->rkcg_assignment_lost))
                return;

        va_start(ap, fmt);
        rd_vsnprintf(buf, sizeof(buf), fmt, ap);
        va_end(ap);

        rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP, "LOST",
                     "Group \"%s\": "
                     "current assignment no longer considered lost: %s",
                     rkcg->rkcg_group_id->str, buf);

        rd_atomic32_set(&rkcg->rkcg_assignment_lost, rd_false);
}
160
161
162
163 /**
164 * @struct Auxillary glue type used for COOPERATIVE rebalance set operations.
165 */
166 typedef struct PartitionMemberInfo_s {
167 const rd_kafka_group_member_t *member;
168 rd_bool_t members_match;
169 } PartitionMemberInfo_t;
170
PartitionMemberInfo_new(const rd_kafka_group_member_t * member,rd_bool_t members_match)171 static PartitionMemberInfo_t *PartitionMemberInfo_new (
172 const rd_kafka_group_member_t *member,
173 rd_bool_t members_match) {
174 PartitionMemberInfo_t *pmi;
175
176 pmi = rd_calloc(1, sizeof(*pmi));
177 pmi->member = member;
178 pmi->members_match = members_match;
179
180 return pmi;
181 }
182
PartitionMemberInfo_free(void * p)183 static void PartitionMemberInfo_free (void *p) {
184 PartitionMemberInfo_t *pmi = p;
185 rd_free(pmi);
186 }
187
188 typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *,
189 PartitionMemberInfo_t *) map_toppar_member_info_t;
190
191
192 /**
193 * @returns true if consumer has joined the group and thus requires a leave.
194 */
/* Macro hygiene fix: the first use of the \p rkcg argument was not
 * parenthesized ("rkcg->rkcg_member_id"), which would mis-expand for
 * non-identifier arguments (e.g. a cast or comma expression). */
#define RD_KAFKA_CGRP_HAS_JOINED(rkcg)                                  \
        ((rkcg)->rkcg_member_id != NULL &&                              \
         RD_KAFKAP_STR_LEN((rkcg)->rkcg_member_id) > 0)
198
199
200 /**
201 * @returns true if cgrp is waiting for a rebalance_cb to be handled by
202 * the application.
203 */
/* True while in either of the two join states that wait on the
 * application's rebalance callback to call (incremental_)assign()
 * or (incremental_)unassign(). */
#define RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg)                           \
        ((rkcg)->rkcg_join_state ==                                    \
         RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL ||                  \
         (rkcg)->rkcg_join_state ==                                    \
         RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL)
209
210 /**
211 * @returns true if a rebalance is in progress.
212 *
213 * 1. In WAIT_JOIN or WAIT_METADATA state with a member-id set,
214 * this happens on rejoin.
215 * 2. In WAIT_SYNC waiting for the group to rebalance on the broker.
216 * 3. in *_WAIT_UNASSIGN_TO_COMPLETE waiting for unassigned partitions to
217 * stop fetching, et.al.
218 * 4. In _WAIT_*ASSIGN_CALL waiting for the application to handle the
219 * assignment changes in its rebalance callback and then call *assign().
220 * 5. An incremental rebalancing is in progress.
221 * 6. A rebalance-induced rejoin is in progress.
222 */
/* Disjunction of the six conditions enumerated in the comment above;
 * note that WAIT_JOIN/WAIT_METADATA only count as rebalancing when a
 * member-id is already set (i.e. this is a rejoin, not the first join). */
#define RD_KAFKA_CGRP_REBALANCING(rkcg)                                 \
        ((RD_KAFKA_CGRP_HAS_JOINED(rkcg) &&                             \
          ((rkcg)->rkcg_join_state ==                                   \
           RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN ||                        \
           (rkcg)->rkcg_join_state ==                                   \
           RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)) ||                  \
         (rkcg)->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC || \
         (rkcg)->rkcg_join_state ==                                     \
         RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE ||          \
         (rkcg)->rkcg_join_state ==                                     \
         RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE ||     \
         (rkcg)->rkcg_join_state ==                                     \
         RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL ||                   \
         (rkcg)->rkcg_join_state ==                                     \
         RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL ||                 \
         (rkcg)->rkcg_rebalance_incr_assignment != NULL ||              \
         (rkcg)->rkcg_rebalance_rejoin)
240
241
242
/* Human-readable cgrp state names, indexed by rkcg_state
 * (see rd_kafka_cgrp_set_state()).
 * NOTE(review): order must stay in sync with the state enum in the
 * header — confirm when adding states. */
const char *rd_kafka_cgrp_state_names[] = {
        "init",
        "term",
        "query-coord",
        "wait-coord",
        "wait-broker",
        "wait-broker-transport",
        "up"
};

/* Human-readable join-state names, indexed by rkcg_join_state
 * (see rd_kafka_cgrp_set_join_state()).
 * NOTE(review): order must stay in sync with the join-state enum. */
const char *rd_kafka_cgrp_join_state_names[] = {
        "init",
        "wait-join",
        "wait-metadata",
        "wait-sync",
        "wait-assign-call",
        "wait-unassign-call",
        "wait-unassign-to-complete",
        "wait-incr-unassign-to-complete",
        "steady",
};
264
265
266 /**
267 * @brief Change the cgrp state.
268 *
269 * @returns 1 if the state was changed, else 0.
270 */
static int rd_kafka_cgrp_set_state (rd_kafka_cgrp_t *rkcg, int state) {
        /* No-op if already in the requested state. */
        if (state == (int)rkcg->rkcg_state)
                return 0;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPSTATE",
                     "Group \"%.*s\" changed state %s -> %s "
                     "(join-state %s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                     rd_kafka_cgrp_state_names[state],
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

        rkcg->rkcg_state          = state;
        rkcg->rkcg_ts_statechange = rd_clock();

        /* Let broker threads know so they can react to the new state. */
        rd_kafka_brokers_broadcast_state_change(rkcg->rkcg_rk);

        return 1;
}
289
290
void rd_kafka_cgrp_set_join_state (rd_kafka_cgrp_t *rkcg, int join_state) {
        /* No-op if already in the requested join state. */
        if (join_state == (int)rkcg->rkcg_join_state)
                return;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE",
                     "Group \"%.*s\" changed join state %s -> %s "
                     "(state %s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                     rd_kafka_cgrp_join_state_names[join_state],
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state]);

        rkcg->rkcg_join_state = join_state;
}
304
305
void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg) {
        /* Final destructor: by this point the subscription and group
         * leader state must already have been torn down. */
        rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_subscription);
        rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_group_leader.members);
        /* Passing NULL frees the member-id without setting a new one. */
        rd_kafka_cgrp_set_member_id(rkcg, NULL);
        if (rkcg->rkcg_group_instance_id)
                rd_kafkap_str_destroy(rkcg->rkcg_group_instance_id);

        /* Destroy the owned queues (consumer, ops, wait-coord). */
        rd_kafka_q_destroy_owner(rkcg->rkcg_q);
        rd_kafka_q_destroy_owner(rkcg->rkcg_ops);
        rd_kafka_q_destroy_owner(rkcg->rkcg_wait_coord_q);
        /* Topic/toppar lists must already be empty. */
        rd_kafka_assert(rkcg->rkcg_rk, TAILQ_EMPTY(&rkcg->rkcg_topics));
        rd_kafka_assert(rkcg->rkcg_rk, rd_list_empty(&rkcg->rkcg_toppars));
        rd_list_destroy(&rkcg->rkcg_toppars);
        rd_list_destroy(rkcg->rkcg_subscribed_topics);
        rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics);
        /* Give the assignor a chance to free its per-group state. */
        if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb)
                rkcg->rkcg_assignor->rkas_destroy_state_cb(
                        rkcg->rkcg_assignor_state);
        rd_free(rkcg);
}
326
327
328
329 /**
330 * @brief Update the absolute session timeout following a successfull
331 * response from the coordinator.
332 * This timeout is used to enforce the session timeout in the
333 * consumer itself.
334 *
335 * @param reset if true the timeout is updated even if the session has expired.
336 */
337 static RD_INLINE void
rd_kafka_cgrp_update_session_timeout(rd_kafka_cgrp_t * rkcg,rd_bool_t reset)338 rd_kafka_cgrp_update_session_timeout (rd_kafka_cgrp_t *rkcg, rd_bool_t reset) {
339 if (reset || rkcg->rkcg_ts_session_timeout != 0)
340 rkcg->rkcg_ts_session_timeout = rd_clock() +
341 (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms*1000);
342 }
343
344
345
rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
                                    const rd_kafkap_str_t *group_id,
                                    const rd_kafkap_str_t *client_id) {
        rd_kafka_cgrp_t *rkcg;

        rkcg = rd_calloc(1, sizeof(*rkcg));

        /* NOTE(review): group_id and client_id are referenced, not
         * copied — they must outlive the cgrp (presumably owned by the
         * rk conf; confirm against caller). */
        rkcg->rkcg_rk = rk;
        rkcg->rkcg_group_id = group_id;
        rkcg->rkcg_client_id = client_id;
        /* -1 == no coordinator / no generation known yet. */
        rkcg->rkcg_coord_id = -1;
        rkcg->rkcg_generation_id = -1;

        /* Ops queue served by rd_kafka_cgrp_op_serve(); the wait-coord
         * queue shares the same serve callback and opaque. */
        rkcg->rkcg_ops = rd_kafka_q_new(rk);
        rkcg->rkcg_ops->rkq_serve = rd_kafka_cgrp_op_serve;
        rkcg->rkcg_ops->rkq_opaque = rkcg;
        rkcg->rkcg_wait_coord_q = rd_kafka_q_new(rk);
        rkcg->rkcg_wait_coord_q->rkq_serve = rkcg->rkcg_ops->rkq_serve;
        rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque;
        rkcg->rkcg_q = rd_kafka_q_new(rk);
        rkcg->rkcg_group_instance_id =
                rd_kafkap_str_new(rk->rk_conf.group_instance_id, -1);

        TAILQ_INIT(&rkcg->rkcg_topics);
        rd_list_init(&rkcg->rkcg_toppars, 32, NULL);
        /* Empty member-id means "not yet joined". */
        rd_kafka_cgrp_set_member_id(rkcg, "");
        rkcg->rkcg_subscribed_topics =
                rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
        rd_interval_init(&rkcg->rkcg_coord_query_intvl);
        rd_interval_init(&rkcg->rkcg_heartbeat_intvl);
        rd_interval_init(&rkcg->rkcg_join_intvl);
        rd_interval_init(&rkcg->rkcg_timeout_scan_intvl);
        rd_atomic32_init(&rkcg->rkcg_assignment_lost, rd_false);

        rkcg->rkcg_errored_topics = rd_kafka_topic_partition_list_new(0);

        /* Create a logical group coordinator broker to provide
         * a dedicated connection for group coordination.
         * This is needed since JoinGroup may block for up to
         * max.poll.interval.ms, effectively blocking and timing out
         * any other protocol requests (such as Metadata).
         * The address for this broker will be updated when
         * the group coordinator is assigned. */
        rkcg->rkcg_coord = rd_kafka_broker_add_logical(rk, "GroupCoordinator");

        /* Start intervalled auto-commit timer, if enabled. */
        if (rk->rk_conf.enable_auto_commit &&
            rk->rk_conf.auto_commit_interval_ms > 0)
                rd_kafka_timer_start(&rk->rk_timers,
                                     &rkcg->rkcg_offset_commit_tmr,
                                     rk->rk_conf.
                                     auto_commit_interval_ms * 1000ll,
                                     rd_kafka_cgrp_offset_commit_tmr_cb,
                                     rkcg);

        return rkcg;
}
402
403
404 /**
405 * @brief Set the group coordinator broker.
406 */
static void rd_kafka_cgrp_coord_set_broker (rd_kafka_cgrp_t *rkcg,
                                            rd_kafka_broker_t *rkb) {

        /* Caller must have cleared any previous coordinator first
         * (see rd_kafka_cgrp_coord_clear_broker()). */
        rd_assert(rkcg->rkcg_curr_coord == NULL);

        rd_assert(RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb));

        /* Hold a reference on the broker for the duration of the
         * rkcg_curr_coord assignment (released in clear_broker). */
        rkcg->rkcg_curr_coord = rkb;
        rd_kafka_broker_keep(rkb);

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORDSET",
                     "Group \"%.*s\" coordinator set to broker %s",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_kafka_broker_name(rkb));

        /* Reset query interval to trigger an immediate
         * coord query if required */
        if (!rd_interval_disabled(&rkcg->rkcg_coord_query_intvl))
                rd_interval_reset(&rkcg->rkcg_coord_query_intvl);

        rd_kafka_cgrp_set_state(rkcg,
                                RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);

        /* Keep a persistent connection to the logical coordinator. */
        rd_kafka_broker_persistent_connection_add(
                rkcg->rkcg_coord, &rkcg->rkcg_coord->rkb_persistconn.coord);

        /* Set the logical coordinator's nodename to the
         * proper broker's nodename, this will trigger a (re)connect
         * to the new address. */
        rd_kafka_broker_set_nodename(rkcg->rkcg_coord, rkb);
}
438
439
440 /**
441 * @brief Reset/clear the group coordinator broker.
442 */
static void rd_kafka_cgrp_coord_clear_broker (rd_kafka_cgrp_t *rkcg) {
        /* Borrow the current coordinator pointer so we can release
         * its reference after clearing rkcg_curr_coord. */
        rd_kafka_broker_t *rkb = rkcg->rkcg_curr_coord;

        rd_assert(rkcg->rkcg_curr_coord);
        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORDCLEAR",
                     "Group \"%.*s\" broker %s is no longer coordinator",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_kafka_broker_name(rkb));

        rd_assert(rkcg->rkcg_coord);

        /* Drop the persistent-connection need for the logical
         * coordinator broker. */
        rd_kafka_broker_persistent_connection_del(
                rkcg->rkcg_coord,
                &rkcg->rkcg_coord->rkb_persistconn.coord);

        /* Clear the ephemeral broker's nodename.
         * This will also trigger a disconnect. */
        rd_kafka_broker_set_nodename(rkcg->rkcg_coord, NULL);

        rkcg->rkcg_curr_coord = NULL;
        rd_kafka_broker_destroy(rkb); /* from set_coord_broker() */
}
465
466
467 /**
468 * @brief Update/set the group coordinator.
469 *
470 * Will do nothing if there's been no change.
471 *
472 * @returns 1 if the coordinator, or state, was updated, else 0.
473 */
static int rd_kafka_cgrp_coord_update (rd_kafka_cgrp_t *rkcg,
                                       int32_t coord_id) {

        /* Don't do anything while terminating */
        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
                return 0;

        /* Check if coordinator changed */
        if (rkcg->rkcg_coord_id != coord_id) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPCOORD",
                             "Group \"%.*s\" changing coordinator %"PRId32
                             " -> %"PRId32,
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             rkcg->rkcg_coord_id, coord_id);

                /* Update coord id */
                rkcg->rkcg_coord_id = coord_id;

                /* Clear previous broker handle, if any */
                if (rkcg->rkcg_curr_coord)
                        rd_kafka_cgrp_coord_clear_broker(rkcg);
        }


        if (rkcg->rkcg_curr_coord) {
                /* There is already a known coordinator and a
                 * corresponding broker handle. */
                if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP)
                        return rd_kafka_cgrp_set_state(
                                rkcg,
                                RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);

        } else if (rkcg->rkcg_coord_id != -1) {
                rd_kafka_broker_t *rkb;

                /* Try to find the coordinator broker handle */
                rd_kafka_rdlock(rkcg->rkcg_rk);
                rkb = rd_kafka_broker_find_by_nodeid(rkcg->rkcg_rk, coord_id);
                rd_kafka_rdunlock(rkcg->rkcg_rk);

                /* It is possible, due to stale metadata, that the
                 * coordinator id points to a broker we still don't know
                 * about. In this case the client will continue
                 * querying metadata and querying for the coordinator
                 * until a match is found. */

                if (rkb) {
                        /* Coordinator is known and broker handle exists */
                        rd_kafka_cgrp_coord_set_broker(rkcg, rkb);
                        rd_kafka_broker_destroy(rkb); /*from find_by_nodeid()*/

                        return 1;
                } else {
                        /* Coordinator is known but no corresponding
                         * broker handle. */
                        return rd_kafka_cgrp_set_state(
                                rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER);

                }

        } else {
                /* Coordinator still not known, re-query */
                if (rkcg->rkcg_state >= RD_KAFKA_CGRP_STATE_WAIT_COORD)
                        return rd_kafka_cgrp_set_state(
                                rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
        }

        return 0; /* no change */
}
543
544
545
546
547 /**
548 * Handle FindCoordinator response
549 */
static void rd_kafka_cgrp_handle_FindCoordinator (rd_kafka_t *rk,
                                                  rd_kafka_broker_t *rkb,
                                                  rd_kafka_resp_err_t err,
                                                  rd_kafka_buf_t *rkbuf,
                                                  rd_kafka_buf_t *request,
                                                  void *opaque) {
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode = 0;
        int32_t CoordId;
        rd_kafkap_str_t CoordHost = RD_ZERO_INIT;
        int32_t CoordPort;
        rd_kafka_cgrp_t *rkcg = opaque;
        struct rd_kafka_metadata_broker mdb = RD_ZERO_INIT;
        char *errstr = NULL;
        int actions;

        /* Parse the response body only on request-level success;
         * the rd_kafka_buf_read_*() macros jump to err_parse on
         * malformed input. */
        if (likely(!(ErrorCode = err))) {
                if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1)
                        rd_kafka_buf_read_throttle_time(rkbuf);

                rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

                /* v1+ carries a broker-provided error message. */
                if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) {
                        rd_kafkap_str_t ErrorMsg;

                        rd_kafka_buf_read_str(rkbuf, &ErrorMsg);

                        if (!RD_KAFKAP_STR_IS_NULL(&ErrorMsg))
                                RD_KAFKAP_STR_DUPA(&errstr, &ErrorMsg);
                }

                rd_kafka_buf_read_i32(rkbuf, &CoordId);
                rd_kafka_buf_read_str(rkbuf, &CoordHost);
                rd_kafka_buf_read_i32(rkbuf, &CoordPort);
        }

        if (ErrorCode)
                goto err;


        /* Success: add/update the coordinator broker in the broker
         * list, then point the cgrp at it. */
        mdb.id = CoordId;
        RD_KAFKAP_STR_DUPA(&mdb.host, &CoordHost);
        mdb.port = CoordPort;

        rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
                   "Group \"%.*s\" coordinator is %s:%i id %"PRId32,
                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                   mdb.host, mdb.port, mdb.id);
        rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto, &mdb, NULL);

        rd_kafka_cgrp_coord_update(rkcg, CoordId);
        rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
        return;

 err_parse: /* Parse error */
        ErrorCode = rkbuf->rkbuf_err;
        /* FALLTHRU */

 err:
        if (!errstr)
                errstr = (char *)rd_kafka_err2str(ErrorCode);

        rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
                   "Group \"%.*s\" FindCoordinator response error: %s: %s",
                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                   rd_kafka_err2name(ErrorCode), errstr);

        /* Instance is being destroyed: bail out. */
        if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
                return;

        /* Map the error to retry/refresh actions. */
        actions = rd_kafka_err_action(
                rkb, ErrorCode, request,

                RD_KAFKA_ERR_ACTION_RETRY|RD_KAFKA_ERR_ACTION_REFRESH,
                RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,

                RD_KAFKA_ERR_ACTION_RETRY,
                RD_KAFKA_RESP_ERR__TRANSPORT,

                RD_KAFKA_ERR_ACTION_RETRY,
                RD_KAFKA_RESP_ERR__TIMED_OUT,

                RD_KAFKA_ERR_ACTION_RETRY,
                RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,

                RD_KAFKA_ERR_ACTION_END);



        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                /* Coordinator unknown: clear it (-1) which also moves
                 * the state machine back towards re-querying. */
                rd_kafka_cgrp_coord_update(rkcg, -1);
        } else {
                if (!(actions & RD_KAFKA_ERR_ACTION_RETRY) &&
                    rkcg->rkcg_last_err != ErrorCode) {
                        /* Propagate non-retriable errors to the application */
                        rd_kafka_consumer_err(
                                rkcg->rkcg_q, rd_kafka_broker_id(rkb),
                                ErrorCode, 0, NULL, NULL,
                                RD_KAFKA_OFFSET_INVALID,
                                "FindCoordinator response error: %s", errstr);

                        /* Suppress repeated errors */
                        rkcg->rkcg_last_err = ErrorCode;
                }

                /* Retries are performed by the timer-intervalled
                 * coord queries, continue querying */
                rd_kafka_cgrp_set_state(
                        rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
        }

        rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
}
663
664
665 /**
666 * Query for coordinator.
667 * Ask any broker in state UP
668 *
669 * Locality: main thread
670 */
void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg,
                                const char *reason) {
        rd_kafka_broker_t *rkb;
        rd_kafka_resp_err_t err;

        /* Pick any usable broker that supports group coordination. */
        rkb = rd_kafka_broker_any_usable(rkcg->rkcg_rk,
                                         RD_POLL_NOWAIT,
                                         RD_DO_LOCK,
                                         RD_KAFKA_FEATURE_BROKER_GROUP_COORD,
                                         "coordinator query");

        if (!rkb) {
                /* Reset the interval because there were no brokers. When a
                 * broker becomes available, we want to query it immediately. */
                rd_interval_reset(&rkcg->rkcg_coord_query_intvl);
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPQUERY",
                             "Group \"%.*s\": "
                             "no broker available for coordinator query: %s",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
                return;
        }

        rd_rkb_dbg(rkb, CGRP, "CGRPQUERY",
                   "Group \"%.*s\": querying for coordinator: %s",
                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);

        /* Response is handled on the cgrp ops queue by
         * rd_kafka_cgrp_handle_FindCoordinator(). */
        err = rd_kafka_FindCoordinatorRequest(
                rkb, RD_KAFKA_COORD_GROUP, rkcg->rkcg_group_id->str,
                RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
                rd_kafka_cgrp_handle_FindCoordinator, rkcg);

        if (err) {
                rd_rkb_dbg(rkb, CGRP, "CGRPQUERY",
                           "Group \"%.*s\": "
                           "unable to send coordinator query: %s",
                           RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                           rd_kafka_err2str(err));
                rd_kafka_broker_destroy(rkb); /* from broker_any_usable() */
                return;
        }

        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD)
                rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD);

        rd_kafka_broker_destroy(rkb); /* from broker_any_usable() */

        /* Back off the next intervalled query since we just sent one. */
        rd_interval_reset_to_now(&rkcg->rkcg_coord_query_intvl, 0);
}
720
721 /**
722 * @brief Mark the current coordinator as dead.
723 *
724 * @locality main thread
725 */
void rd_kafka_cgrp_coord_dead (rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err,
                               const char *reason) {
        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORD",
                     "Group \"%.*s\": marking the coordinator (%"PRId32") "
                     "dead: %s: %s",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rkcg->rkcg_coord_id, rd_kafka_err2str(err), reason);

        /* Forget the current coordinator (-1 == unknown). */
        rd_kafka_cgrp_coord_update(rkcg, -1);

        /* Go back to querying for a (possibly new) coordinator. */
        rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
        rd_kafka_cgrp_coord_query(rkcg, reason);
}
740
741
742 /**
743 * @returns a new reference to the current coordinator, if available, else NULL.
744 *
745 * @locality rdkafka main thread
746 * @locks_required none
747 * @locks_acquired none
748 */
rd_kafka_cgrp_get_coord(rd_kafka_cgrp_t * rkcg)749 rd_kafka_broker_t *rd_kafka_cgrp_get_coord (rd_kafka_cgrp_t *rkcg) {
750 if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkcg->rkcg_coord)
751 return NULL;
752
753 rd_kafka_broker_keep(rkcg->rkcg_coord);
754
755 return rkcg->rkcg_coord;
756 }
757
758
759 /**
760 * @brief cgrp handling of LeaveGroup responses
761 * @param opaque must be the cgrp handle.
762 * @locality rdkafka main thread (unless err==ERR__DESTROY)
763 */
static void rd_kafka_cgrp_handle_LeaveGroup (rd_kafka_t *rk,
                                             rd_kafka_broker_t *rkb,
                                             rd_kafka_resp_err_t err,
                                             rd_kafka_buf_t *rkbuf,
                                             rd_kafka_buf_t *request,
                                             void *opaque) {
        rd_kafka_cgrp_t *rkcg = opaque;
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode = 0;

        /* Request-level error: skip parsing. */
        if (err) {
                ErrorCode = err;
                goto err;
        }

        if (request->rkbuf_reqhdr.ApiVersion >= 1)
                rd_kafka_buf_read_throttle_time(rkbuf);

        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

 err:
        /* Errors are only logged, not propagated: leaving the group is
         * best-effort. */
        if (ErrorCode)
                rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
                             "LeaveGroup response error in state %s: %s",
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                             rd_kafka_err2str(ErrorCode));
        else
                rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
                             "LeaveGroup response received in state %s",
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state]);

        /* Clear the in-transit flag and let termination proceed, unless
         * the instance is being destroyed (then rkcg may be going away). */
        if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) {
                rd_assert(thrd_is_current(rk->rk_thread));
                rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE;
                rd_kafka_cgrp_try_terminate(rkcg);
        }



        return;

 err_parse:
        ErrorCode = rkbuf->rkbuf_err;
        goto err;
}
809
810
static void rd_kafka_cgrp_leave (rd_kafka_cgrp_t *rkcg) {
        char *member_id;

        /* Stack-duplicate the member-id before resetting it below,
         * since the LeaveGroupRequest still needs the old value. */
        RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id);

        /* Leaving the group invalidates the member id, reset it
         * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */
        rd_kafka_cgrp_set_member_id(rkcg, "");

        /* Only one LeaveGroup may be in-transit at a time. */
        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
                             "Group \"%.*s\": leave (in state %s): "
                             "LeaveGroupRequest already in-transit",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
                return;
        }

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
                     "Group \"%.*s\": leave (in state %s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state]);

        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE;

        /* If the coordinator is not up, synthesize the response handler
         * call directly so the WAIT_LEAVE flag is still cleared. */
        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) {
                rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE",
                           "Leaving group");
                rd_kafka_LeaveGroupRequest(rkcg->rkcg_coord,
                                           rkcg->rkcg_group_id->str,
                                           member_id,
                                           RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
                                           rd_kafka_cgrp_handle_LeaveGroup,
                                           rkcg);
        } else
                rd_kafka_cgrp_handle_LeaveGroup(rkcg->rkcg_rk,
                                                rkcg->rkcg_coord,
                                                RD_KAFKA_RESP_ERR__WAIT_COORD,
                                                NULL, NULL, rkcg);
}
851
852
853 /**
854 * @brief Leave group, if desired.
855 *
856 * @returns true if a LeaveGroup was issued, else false.
857 */
rd_kafka_cgrp_leave_maybe(rd_kafka_cgrp_t * rkcg)858 static rd_bool_t rd_kafka_cgrp_leave_maybe (rd_kafka_cgrp_t *rkcg) {
859
860 /* We were not instructed to leave in the first place. */
861 if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE))
862 return rd_false;
863
864 rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE;
865
866 /* Don't send Leave when termating with NO_CONSUMER_CLOSE flag */
867 if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
868 return rd_false;
869
870 /* KIP-345: Static group members must not send a LeaveGroupRequest
871 * on termination. */
872 if (RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg) &&
873 rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
874 return rd_false;
875
876 rd_kafka_cgrp_leave(rkcg);
877
878 return rd_true;
879 }
880
881
882 /**
883 * @brief Enqueues a rebalance op, delegating responsibility of calling
884 * incremental_assign / incremental_unassign to the application.
885 * If there is no rebalance handler configured, or the action
886 * should not be delegated to the application for some other
887 * reason, incremental_assign / incremental_unassign will be called
888 * automatically, immediately.
889 *
890 * @param rejoin whether or not to rejoin the group following completion
891 * of the incremental assign / unassign.
892 *
893 * @remarks does not take ownership of \p partitions.
894 */
void
rd_kafka_rebalance_op_incr (rd_kafka_cgrp_t *rkcg,
                            rd_kafka_resp_err_t err,
                            rd_kafka_topic_partition_list_t *partitions,
                            rd_bool_t rejoin,
                            const char *reason) {
        rd_kafka_error_t *error;

        /* Flag to rejoin after completion of the incr_assign or incr_unassign,
           if required. */
        rkcg->rkcg_rebalance_rejoin = rejoin;

        /* Update rebalance statistics (write lock protects rkcg_c). */
        rd_kafka_wrlock(rkcg->rkcg_rk);
        rkcg->rkcg_c.ts_rebalance = rd_clock();
        rkcg->rkcg_c.rebalance_cnt++;
        rd_kafka_wrunlock(rkcg->rkcg_rk);

        if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk) ||
            rd_kafka_fatal_error_code(rkcg->rkcg_rk)) {
                /* Total unconditional unassign in these cases */
                rd_kafka_cgrp_unassign(rkcg);

                /* Now serve the assignment to make updates */
                rd_kafka_assignment_serve(rkcg->rkcg_rk);
                goto done;
        }

        /* Enter the join state that waits for the application's
         * (incremental_)assign()/unassign() call. */
        rd_kafka_cgrp_set_join_state(
                rkcg,
                err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL :
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL);

        /* Schedule application rebalance callback/event if enabled */
        if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE) {
                rd_kafka_op_t *rko;

                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
                             "Group \"%s\": delegating incremental %s of %d "
                             "partition(s) to application on queue %s: %s",
                             rkcg->rkcg_group_id->str,
                             err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
                             "revoke" : "assign", partitions->cnt,
                             rd_kafka_q_dest_name(rkcg->rkcg_q), reason);

                /* Pause currently assigned partitions while waiting for
                 * rebalance callback to get called to make sure the
                 * application will not receive any more messages that
                 * might block it from serving the rebalance callback
                 * and to not process messages for partitions it
                 * might have lost in the rebalance. */
                rd_kafka_assignment_pause(rkcg->rkcg_rk,
                                          "incremental rebalance");

                /* The op owns a copy of the partition list. */
                rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE);
                rko->rko_err = err;
                rko->rko_u.rebalance.partitions =
                        rd_kafka_topic_partition_list_copy(partitions);

                if (rd_kafka_q_enq(rkcg->rkcg_q, rko))
                        goto done; /* Rebalance op successfully enqueued */

                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
                             "Group \"%s\": ops queue is disabled, not "
                             "delegating partition %s to application",
                             rkcg->rkcg_group_id->str,
                             err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
                             "unassign" : "assign");
                /* FALLTHRU */
        }

        /* No application rebalance callback/event handler, or it is not
         * available, do the assign/unassign ourselves.
         * We need to be careful here not to trigger assignment_serve()
         * since it may call into the cgrp code again, in which case we
         * can't really track what the outcome state will be. */

        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
                error = rd_kafka_cgrp_incremental_assign(rkcg, partitions);
        else
                error = rd_kafka_cgrp_incremental_unassign(rkcg, partitions);

        if (error) {
                rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE",
                             "Group \"%s\": internal incremental %s "
                             "of %d partition(s) failed: %s: "
                             "unassigning all partitions and rejoining",
                             rkcg->rkcg_group_id->str,
                             err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
                             "unassign" : "assign",
                             partitions->cnt,
                             rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);

                rd_kafka_cgrp_set_join_state(rkcg,
                                             /* This is a clean state for
                                              * assignment_done() to rejoin
                                              * from. */
                                             RD_KAFKA_CGRP_JOIN_STATE_STEADY);
                rd_kafka_assignment_clear(rkcg->rkcg_rk);
        }

        /* Now serve the assignment to make updates */
        rd_kafka_assignment_serve(rkcg->rkcg_rk);

 done:
        /* Update the current group assignment based on the
         * added/removed partitions. */
        rd_kafka_cgrp_group_assignment_modify(
                rkcg,
                err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                partitions);
}
1008
1009
1010 /**
1011 * @brief Enqueues a rebalance op, delegating responsibility of calling
1012 * assign / unassign to the application. If there is no rebalance
1013 * handler configured, or the action should not be delegated to the
1014 * application for some other reason, assign / unassign will be
1015 * called automatically.
1016 *
1017 * @remarks \p partitions is copied.
1018 */
1019 static void
rd_kafka_rebalance_op(rd_kafka_cgrp_t * rkcg,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * assignment,const char * reason)1020 rd_kafka_rebalance_op (rd_kafka_cgrp_t *rkcg,
1021 rd_kafka_resp_err_t err,
1022 rd_kafka_topic_partition_list_t *assignment,
1023 const char *reason) {
1024 rd_kafka_error_t *error;
1025
1026 rd_kafka_wrlock(rkcg->rkcg_rk);
1027 rkcg->rkcg_c.ts_rebalance = rd_clock();
1028 rkcg->rkcg_c.rebalance_cnt++;
1029 rd_kafka_wrunlock(rkcg->rkcg_rk);
1030
1031 if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk) ||
1032 rd_kafka_fatal_error_code(rkcg->rkcg_rk)) {
1033 /* Unassign */
1034 rd_kafka_cgrp_unassign(rkcg);
1035
1036 /* Now serve the assignment to make updates */
1037 rd_kafka_assignment_serve(rkcg->rkcg_rk);
1038 goto done;
1039 }
1040
1041 rd_assert(assignment != NULL);
1042
1043 rd_kafka_cgrp_set_join_state(
1044 rkcg,
1045 err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
1046 RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL :
1047 RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL);
1048
1049 /* Schedule application rebalance callback/event if enabled */
1050 if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE) {
1051 rd_kafka_op_t *rko;
1052
1053 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
1054 "Group \"%s\": delegating %s of %d partition(s) "
1055 "to application on queue %s: %s",
1056 rkcg->rkcg_group_id->str,
1057 err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
1058 "revoke":"assign", assignment->cnt,
1059 rd_kafka_q_dest_name(rkcg->rkcg_q), reason);
1060
1061 /* Pause currently assigned partitions while waiting for
1062 * rebalance callback to get called to make sure the
1063 * application will not receive any more messages that
1064 * might block it from serving the rebalance callback
1065 * and to not process messages for partitions it
1066 * might have lost in the rebalance. */
1067 rd_kafka_assignment_pause(rkcg->rkcg_rk, "rebalance");
1068
1069 rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE);
1070 rko->rko_err = err;
1071 rko->rko_u.rebalance.partitions =
1072 rd_kafka_topic_partition_list_copy(assignment);
1073
1074 if (rd_kafka_q_enq(rkcg->rkcg_q, rko))
1075 goto done; /* Rebalance op successfully enqueued */
1076
1077 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
1078 "Group \"%s\": ops queue is disabled, not "
1079 "delegating partition %s to application",
1080 rkcg->rkcg_group_id->str,
1081 err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
1082 "unassign" : "assign");
1083
1084 /* FALLTHRU */
1085 }
1086
1087 /* No application rebalance callback/event handler, or it is not
1088 * available, do the assign/unassign ourselves.
1089 * We need to be careful here not to trigger assignment_serve()
1090 * since it may call into the cgrp code again, in which case we
1091 * can't really track what the outcome state will be. */
1092
1093 if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
1094 error = rd_kafka_cgrp_assign(rkcg, assignment);
1095 else
1096 error = rd_kafka_cgrp_unassign(rkcg);
1097
1098 if (error) {
1099 rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE",
1100 "Group \"%s\": internal %s "
1101 "of %d partition(s) failed: %s: "
1102 "unassigning all partitions and rejoining",
1103 rkcg->rkcg_group_id->str,
1104 err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
1105 "unassign" : "assign",
1106 rkcg->rkcg_group_assignment->cnt,
1107 rd_kafka_error_string(error));
1108 rd_kafka_error_destroy(error);
1109
1110 rd_kafka_cgrp_set_join_state(rkcg,
1111 /* This is a clean state for
1112 * assignment_done() to rejoin
1113 * from. */
1114 RD_KAFKA_CGRP_JOIN_STATE_STEADY);
1115 rd_kafka_assignment_clear(rkcg->rkcg_rk);
1116 }
1117
1118 /* Now serve the assignment to make updates */
1119 rd_kafka_assignment_serve(rkcg->rkcg_rk);
1120
1121 done:
1122 if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
1123 rd_kafka_cgrp_group_assignment_set(rkcg, assignment);
1124 else
1125 rd_kafka_cgrp_group_assignment_set(rkcg, NULL);
1126 }
1127
1128
1129 /**
1130 * @brief Rejoin the group.
1131 *
1132 * @remark This function must not have any side-effects but setting the
1133 * join state.
1134 */
1135 static void rd_kafka_cgrp_rejoin (rd_kafka_cgrp_t *rkcg, const char *fmt, ...)
1136 RD_FORMAT(printf, 2, 3);
1137
rd_kafka_cgrp_rejoin(rd_kafka_cgrp_t * rkcg,const char * fmt,...)1138 static void rd_kafka_cgrp_rejoin (rd_kafka_cgrp_t *rkcg, const char *fmt, ...) {
1139 char reason[512];
1140 va_list ap;
1141 char astr[128];
1142
1143 va_start(ap, fmt);
1144 rd_vsnprintf(reason, sizeof(reason), fmt, ap);
1145 va_end(ap);
1146
1147 if (rkcg->rkcg_group_assignment)
1148 rd_snprintf(astr, sizeof(astr), " with %d owned partition(s)",
1149 rkcg->rkcg_group_assignment->cnt);
1150 else
1151 rd_snprintf(astr, sizeof(astr), " without an assignment");
1152
1153 if (rkcg->rkcg_subscription || rkcg->rkcg_next_subscription) {
1154 rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP,
1155 "REJOIN",
1156 "Group \"%s\": %s group%s: %s",
1157 rkcg->rkcg_group_id->str,
1158 rkcg->rkcg_join_state ==
1159 RD_KAFKA_CGRP_JOIN_STATE_INIT ?
1160 "Joining" : "Rejoining",
1161 astr, reason);
1162 } else {
1163 rd_kafka_dbg(rkcg->rkcg_rk,CONSUMER|RD_KAFKA_DBG_CGRP,
1164 "NOREJOIN",
1165 "Group \"%s\": Not %s group%s: %s: "
1166 "no subscribed topics",
1167 rkcg->rkcg_group_id->str,
1168 rkcg->rkcg_join_state ==
1169 RD_KAFKA_CGRP_JOIN_STATE_INIT ?
1170 "joining" : "rejoining",
1171 astr, reason);
1172
1173 rd_kafka_cgrp_leave_maybe(rkcg);
1174 }
1175
1176 rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT);
1177 }
1178
1179
1180 /**
1181 * @brief Collect all assigned or owned partitions from group members.
1182 * The member field of each result element is set to the associated
1183 * group member. The members_match field is set to rd_false.
1184 *
1185 * @param members Array of group members.
1186 * @param member_cnt Number of elements in members.
1187 * @param par_cnt The total number of partitions expected to be collected.
1188 * @param collect_owned If rd_true, rkgm_owned partitions will be collected,
1189 * else rkgm_assignment partitions will be collected.
1190 */
1191 static map_toppar_member_info_t *
rd_kafka_collect_partitions(const rd_kafka_group_member_t * members,size_t member_cnt,size_t par_cnt,rd_bool_t collect_owned)1192 rd_kafka_collect_partitions (const rd_kafka_group_member_t *members,
1193 size_t member_cnt,
1194 size_t par_cnt,
1195 rd_bool_t collect_owned) {
1196 size_t i;
1197 map_toppar_member_info_t *collected = rd_calloc(1, sizeof(*collected));
1198
1199 RD_MAP_INIT(
1200 collected,
1201 par_cnt,
1202 rd_kafka_topic_partition_cmp,
1203 rd_kafka_topic_partition_hash,
1204 rd_kafka_topic_partition_destroy_free,
1205 PartitionMemberInfo_free);
1206
1207 for (i = 0 ; i<member_cnt ; i++) {
1208 size_t j;
1209 const rd_kafka_group_member_t *rkgm = &members[i];
1210 const rd_kafka_topic_partition_list_t *toppars = collect_owned
1211 ? rkgm->rkgm_owned
1212 : rkgm->rkgm_assignment;
1213
1214 for (j = 0; j<(size_t)toppars->cnt; j++) {
1215 rd_kafka_topic_partition_t *rktpar =
1216 rd_kafka_topic_partition_copy(
1217 &toppars->elems[j]);
1218 PartitionMemberInfo_t *pmi =
1219 PartitionMemberInfo_new(rkgm, rd_false);
1220 RD_MAP_SET(collected, rktpar, pmi);
1221 }
1222 }
1223
1224 return collected;
1225 }
1226
1227
1228 /**
1229 * @brief Set intersection. Returns a set of all elements of \p a that
1230 * are also elements of \p b. Additionally, compares the members
1231 * field of matching elements from \p a and \p b and if not NULL
1232 * and equal, sets the members_match field in the result element
1233 * to rd_true and the member field to equal that of the elements,
1234 * else sets the members_match field to rd_false and member field
1235 * to NULL.
1236 */
1237 static map_toppar_member_info_t *
rd_kafka_member_partitions_intersect(map_toppar_member_info_t * a,map_toppar_member_info_t * b)1238 rd_kafka_member_partitions_intersect (
1239 map_toppar_member_info_t *a,
1240 map_toppar_member_info_t *b) {
1241 const rd_kafka_topic_partition_t *key;
1242 const PartitionMemberInfo_t *a_v;
1243 map_toppar_member_info_t *intersection =
1244 rd_calloc(1, sizeof(*intersection));
1245
1246 RD_MAP_INIT(
1247 intersection,
1248 RD_MIN(a ? RD_MAP_CNT(a) : 1, b ? RD_MAP_CNT(b) : 1),
1249 rd_kafka_topic_partition_cmp,
1250 rd_kafka_topic_partition_hash,
1251 rd_kafka_topic_partition_destroy_free,
1252 PartitionMemberInfo_free);
1253
1254 if (!a || !b)
1255 return intersection;
1256
1257 RD_MAP_FOREACH(key, a_v, a) {
1258 rd_bool_t members_match;
1259 const PartitionMemberInfo_t *b_v = RD_MAP_GET(b, key);
1260
1261 if (b_v == NULL)
1262 continue;
1263
1264 members_match =
1265 a_v->member &&
1266 b_v->member &&
1267 rd_kafka_group_member_cmp(a_v->member,
1268 b_v->member) == 0;
1269
1270 RD_MAP_SET(intersection,
1271 rd_kafka_topic_partition_copy(key),
1272 PartitionMemberInfo_new(
1273 b_v->member,
1274 members_match));
1275 }
1276
1277 return intersection;
1278 }
1279
1280
1281 /**
1282 * @brief Set subtraction. Returns a set of all elements of \p a
1283 * that are not elements of \p b. Sets the member field in
1284 * elements in the returned set to equal that of the
1285 * corresponding element in \p a
1286 */
1287 static map_toppar_member_info_t *
rd_kafka_member_partitions_subtract(map_toppar_member_info_t * a,map_toppar_member_info_t * b)1288 rd_kafka_member_partitions_subtract (
1289 map_toppar_member_info_t *a,
1290 map_toppar_member_info_t *b) {
1291 const rd_kafka_topic_partition_t *key;
1292 const PartitionMemberInfo_t *a_v;
1293 map_toppar_member_info_t *difference =
1294 rd_calloc(1, sizeof(*difference));
1295
1296 RD_MAP_INIT(
1297 difference,
1298 a ? RD_MAP_CNT(a) : 1,
1299 rd_kafka_topic_partition_cmp,
1300 rd_kafka_topic_partition_hash,
1301 rd_kafka_topic_partition_destroy_free,
1302 PartitionMemberInfo_free);
1303
1304 if (!a)
1305 return difference;
1306
1307 RD_MAP_FOREACH(key, a_v, a) {
1308 const PartitionMemberInfo_t *b_v = b ? RD_MAP_GET(b, key)
1309 : NULL;
1310
1311 if (!b_v)
1312 RD_MAP_SET(difference,
1313 rd_kafka_topic_partition_copy(key),
1314 PartitionMemberInfo_new(a_v->member,
1315 rd_false));
1316 }
1317
1318 return difference;
1319 }
1320
1321
1322 /**
1323 * @brief Adjust the partition assignment as provided by the assignor
1324 * according to the COOPERATIVE protocol.
1325 */
rd_kafka_cooperative_protocol_adjust_assignment(rd_kafka_cgrp_t * rkcg,rd_kafka_group_member_t * members,int member_cnt)1326 static void rd_kafka_cooperative_protocol_adjust_assignment (
1327 rd_kafka_cgrp_t *rkcg,
1328 rd_kafka_group_member_t *members,
1329 int member_cnt) {
1330
1331 /* https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafk\
1332 a+Consumer+Incremental+Rebalance+Protocol */
1333
1334 int i;
1335 int expected_max_assignment_size;
1336 int total_assigned = 0;
1337 int not_revoking = 0;
1338 size_t par_cnt = 0;
1339 const rd_kafka_topic_partition_t *toppar;
1340 const PartitionMemberInfo_t *pmi;
1341 map_toppar_member_info_t *assigned;
1342 map_toppar_member_info_t *owned;
1343 map_toppar_member_info_t *maybe_revoking;
1344 map_toppar_member_info_t *ready_to_migrate;
1345 map_toppar_member_info_t *unknown_but_owned;
1346
1347 for (i = 0 ; i<member_cnt ; i++)
1348 par_cnt += members[i].rkgm_owned->cnt;
1349
1350 assigned = rd_kafka_collect_partitions(members,
1351 member_cnt,
1352 par_cnt,
1353 rd_false/*assigned*/);
1354
1355 owned = rd_kafka_collect_partitions(members,
1356 member_cnt,
1357 par_cnt,
1358 rd_true/*owned*/);
1359
1360 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
1361 "Group \"%s\": Partitions owned by members: %d, "
1362 "partitions assigned by assignor: %d",
1363 rkcg->rkcg_group_id->str,
1364 (int)RD_MAP_CNT(owned), (int)RD_MAP_CNT(assigned));
1365
1366 /* Still owned by some members */
1367 maybe_revoking =
1368 rd_kafka_member_partitions_intersect(assigned,
1369 owned);
1370
1371 /* Not previously owned by anyone */
1372 ready_to_migrate =
1373 rd_kafka_member_partitions_subtract(assigned,
1374 owned);
1375
1376 /* Don't exist in assigned partitions */
1377 unknown_but_owned =
1378 rd_kafka_member_partitions_subtract(owned,
1379 assigned);
1380
1381 /* Rough guess at a size that is a bit higher than
1382 * the maximum number of partitions likely to be
1383 * assigned to any partition. */
1384 expected_max_assignment_size =
1385 (int)(RD_MAP_CNT(assigned) / member_cnt) + 4;
1386
1387 for (i = 0 ; i < member_cnt ; i++) {
1388 rd_kafka_group_member_t *rkgm = &members[i];
1389 rd_kafka_topic_partition_list_destroy(
1390 rkgm->rkgm_assignment);
1391
1392 rkgm->rkgm_assignment =
1393 rd_kafka_topic_partition_list_new(
1394 expected_max_assignment_size);
1395 }
1396
1397 /* For maybe-revoking-partitions, check if the owner has
1398 * changed. If yes, exclude them from the assigned-partitions
1399 * list to the new owner. The old owner will realize it does
1400 * not own it any more, revoke it and then trigger another
1401 * rebalance for these partitions to finally be reassigned.
1402 */
1403 RD_MAP_FOREACH(toppar, pmi, maybe_revoking) {
1404 if (!pmi->members_match)
1405 /* Owner has changed. */
1406 continue;
1407
1408 /* Owner hasn't changed. */
1409 rd_kafka_topic_partition_list_add(
1410 pmi->member->rkgm_assignment,
1411 toppar->topic,
1412 toppar->partition);
1413
1414 total_assigned++;
1415 not_revoking++;
1416 }
1417
1418 /* For ready-to-migrate-partitions, it is safe to move them
1419 * to the new member immediately since we know no one owns
1420 * it before, and hence we can encode the owner from the
1421 * newly-assigned-partitions directly.
1422 */
1423 RD_MAP_FOREACH(toppar, pmi, ready_to_migrate) {
1424 rd_kafka_topic_partition_list_add(
1425 pmi->member->rkgm_assignment,
1426 toppar->topic,
1427 toppar->partition);
1428 total_assigned++;
1429 }
1430
1431 /* For unknown-but-owned-partitions, it is also safe to just
1432 * give them back to whoever claimed to be their owners by
1433 * encoding them directly as well. If this is due to a topic
1434 * metadata update, then a later rebalance will be triggered
1435 * anyway.
1436 */
1437 RD_MAP_FOREACH(toppar, pmi, unknown_but_owned) {
1438 rd_kafka_topic_partition_list_add(
1439 pmi->member->rkgm_assignment,
1440 toppar->topic,
1441 toppar->partition);
1442 total_assigned++;
1443 }
1444
1445 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
1446 "Group \"%s\": COOPERATIVE protocol collection sizes: "
1447 "maybe revoking: %d, ready to migrate: %d, unknown but "
1448 "owned: %d", rkcg->rkcg_group_id->str,
1449 (int)RD_MAP_CNT(maybe_revoking),
1450 (int)RD_MAP_CNT(ready_to_migrate),
1451 (int)RD_MAP_CNT(unknown_but_owned));
1452
1453 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
1454 "Group \"%s\": %d partitions assigned to consumers",
1455 rkcg->rkcg_group_id->str, total_assigned);
1456
1457 RD_MAP_DESTROY_AND_FREE(maybe_revoking);
1458 RD_MAP_DESTROY_AND_FREE(ready_to_migrate);
1459 RD_MAP_DESTROY_AND_FREE(unknown_but_owned);
1460 RD_MAP_DESTROY_AND_FREE(assigned);
1461 RD_MAP_DESTROY_AND_FREE(owned);
1462 }
1463
1464
1465
1466 /**
1467 * @brief Run group assignment.
1468 */
1469 static void
rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t * rkcg,rd_kafka_assignor_t * rkas,rd_kafka_resp_err_t err,rd_kafka_metadata_t * metadata,rd_kafka_group_member_t * members,int member_cnt)1470 rd_kafka_cgrp_assignor_run (rd_kafka_cgrp_t *rkcg,
1471 rd_kafka_assignor_t *rkas,
1472 rd_kafka_resp_err_t err,
1473 rd_kafka_metadata_t *metadata,
1474 rd_kafka_group_member_t *members,
1475 int member_cnt) {
1476 char errstr[512];
1477
1478 if (err) {
1479 rd_snprintf(errstr, sizeof(errstr),
1480 "Failed to get cluster metadata: %s",
1481 rd_kafka_err2str(err));
1482 goto err;
1483 }
1484
1485 *errstr = '\0';
1486
1487 /* Run assignor */
1488 err = rd_kafka_assignor_run(rkcg, rkas, metadata,
1489 members, member_cnt,
1490 errstr, sizeof(errstr));
1491
1492 if (err) {
1493 if (!*errstr)
1494 rd_snprintf(errstr, sizeof(errstr), "%s",
1495 rd_kafka_err2str(err));
1496 goto err;
1497 }
1498
1499 rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "ASSIGNOR",
1500 "Group \"%s\": \"%s\" assignor run for %d member(s)",
1501 rkcg->rkcg_group_id->str,
1502 rkas->rkas_protocol_name->str,
1503 member_cnt);
1504
1505 if (rkas->rkas_protocol == RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE)
1506 rd_kafka_cooperative_protocol_adjust_assignment(rkcg,
1507 members,
1508 member_cnt);
1509
1510 rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
1511
1512 /* Respond to broker with assignment set or error */
1513 rd_kafka_SyncGroupRequest(rkcg->rkcg_coord,
1514 rkcg->rkcg_group_id,
1515 rkcg->rkcg_generation_id,
1516 rkcg->rkcg_member_id,
1517 rkcg->rkcg_group_instance_id,
1518 members, err ? 0 : member_cnt,
1519 RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
1520 rd_kafka_handle_SyncGroup, rkcg);
1521 return;
1522
1523 err:
1524 rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "ASSIGNOR",
1525 "Group \"%s\": failed to run assignor \"%s\" for "
1526 "%d member(s): %s",
1527 rkcg->rkcg_group_id->str,
1528 rkas->rkas_protocol_name->str,
1529 member_cnt, errstr);
1530
1531 rd_kafka_cgrp_rejoin(rkcg, "%s assignor failed: %s",
1532 rkas->rkas_protocol_name->str, errstr);
1533 }
1534
1535
1536
1537 /**
1538 * @brief Op callback from handle_JoinGroup
1539 */
1540 static rd_kafka_op_res_t
rd_kafka_cgrp_assignor_handle_Metadata_op(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1541 rd_kafka_cgrp_assignor_handle_Metadata_op (rd_kafka_t *rk,
1542 rd_kafka_q_t *rkq,
1543 rd_kafka_op_t *rko) {
1544 rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
1545
1546 if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
1547 return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
1548
1549 if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)
1550 return RD_KAFKA_OP_RES_HANDLED; /* From outdated state */
1551
1552 if (!rkcg->rkcg_group_leader.members) {
1553 rd_kafka_dbg(rk, CGRP, "GRPLEADER",
1554 "Group \"%.*s\": no longer leader: "
1555 "not running assignor",
1556 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
1557 return RD_KAFKA_OP_RES_HANDLED;
1558 }
1559
1560 rd_kafka_cgrp_assignor_run(rkcg,
1561 rkcg->rkcg_assignor,
1562 rko->rko_err, rko->rko_u.metadata.md,
1563 rkcg->rkcg_group_leader.members,
1564 rkcg->rkcg_group_leader.member_cnt);
1565
1566 return RD_KAFKA_OP_RES_HANDLED;
1567 }
1568
1569
1570 /**
1571 * Parse single JoinGroup.Members.MemberMetadata for "consumer" ProtocolType
1572 *
1573 * Protocol definition:
1574 * https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
1575 *
1576 * Returns 0 on success or -1 on error.
1577 */
1578 static int
rd_kafka_group_MemberMetadata_consumer_read(rd_kafka_broker_t * rkb,rd_kafka_group_member_t * rkgm,const rd_kafkap_bytes_t * MemberMetadata)1579 rd_kafka_group_MemberMetadata_consumer_read (
1580 rd_kafka_broker_t *rkb, rd_kafka_group_member_t *rkgm,
1581 const rd_kafkap_bytes_t *MemberMetadata) {
1582
1583 rd_kafka_buf_t *rkbuf;
1584 int16_t Version;
1585 int32_t subscription_cnt;
1586 rd_kafkap_bytes_t UserData;
1587 const int log_decode_errors = LOG_ERR;
1588 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG;
1589
1590 /* Create a shadow-buffer pointing to the metadata to ease parsing. */
1591 rkbuf = rd_kafka_buf_new_shadow(MemberMetadata->data,
1592 RD_KAFKAP_BYTES_LEN(MemberMetadata),
1593 NULL);
1594
1595 rd_kafka_buf_read_i16(rkbuf, &Version);
1596 rd_kafka_buf_read_i32(rkbuf, &subscription_cnt);
1597
1598 if (subscription_cnt > 10000 || subscription_cnt <= 0)
1599 goto err;
1600
1601 rkgm->rkgm_subscription =
1602 rd_kafka_topic_partition_list_new(subscription_cnt);
1603
1604 while (subscription_cnt-- > 0) {
1605 rd_kafkap_str_t Topic;
1606 char *topic_name;
1607 rd_kafka_buf_read_str(rkbuf, &Topic);
1608 RD_KAFKAP_STR_DUPA(&topic_name, &Topic);
1609 rd_kafka_topic_partition_list_add(rkgm->rkgm_subscription,
1610 topic_name,
1611 RD_KAFKA_PARTITION_UA);
1612 }
1613
1614 rd_kafka_buf_read_bytes(rkbuf, &UserData);
1615 rkgm->rkgm_userdata = rd_kafkap_bytes_copy(&UserData);
1616
1617 if (Version >= 1 &&
1618 !(rkgm->rkgm_owned = rd_kafka_buf_read_topic_partitions(
1619 rkbuf, 0, rd_false, rd_false)))
1620 goto err;
1621
1622 rd_kafka_buf_destroy(rkbuf);
1623
1624 return 0;
1625
1626 err_parse:
1627 err = rkbuf->rkbuf_err;
1628
1629 err:
1630 rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
1631 "Failed to parse MemberMetadata for \"%.*s\": %s",
1632 RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
1633 rd_kafka_err2str(err));
1634 if (rkgm->rkgm_subscription) {
1635 rd_kafka_topic_partition_list_destroy(rkgm->
1636 rkgm_subscription);
1637 rkgm->rkgm_subscription = NULL;
1638 }
1639
1640 rd_kafka_buf_destroy(rkbuf);
1641 return -1;
1642 }
1643
1644
1645 /**
1646 * @brief The rebalance protocol currently in use. This will be
1647 * RD_KAFKA_REBALANCE_PROTOCOL_NONE if the consumer has not
1648 * (yet) joined a group, else it will match the rebalance
1649 * protocol of the configured assignor(s).
1650 *
1651 * @locality main thread
1652 */
1653 rd_kafka_rebalance_protocol_t
rd_kafka_cgrp_rebalance_protocol(rd_kafka_cgrp_t * rkcg)1654 rd_kafka_cgrp_rebalance_protocol (rd_kafka_cgrp_t *rkcg) {
1655 if (!rkcg->rkcg_assignor)
1656 return RD_KAFKA_REBALANCE_PROTOCOL_NONE;
1657 return rkcg->rkcg_assignor->rkas_protocol;
1658 }
1659
1660
1661 /**
1662 * @brief cgrp handler for JoinGroup responses
1663 * opaque must be the cgrp handle.
1664 *
1665 * @locality rdkafka main thread (unless ERR__DESTROY: arbitrary thread)
1666 */
rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)1667 static void rd_kafka_cgrp_handle_JoinGroup (rd_kafka_t *rk,
1668 rd_kafka_broker_t *rkb,
1669 rd_kafka_resp_err_t err,
1670 rd_kafka_buf_t *rkbuf,
1671 rd_kafka_buf_t *request,
1672 void *opaque) {
1673 rd_kafka_cgrp_t *rkcg = opaque;
1674 const int log_decode_errors = LOG_ERR;
1675 int16_t ErrorCode = 0;
1676 int32_t GenerationId;
1677 rd_kafkap_str_t Protocol, LeaderId;
1678 rd_kafkap_str_t MyMemberId = RD_KAFKAP_STR_INITIALIZER;
1679 int32_t member_cnt;
1680 int actions;
1681 int i_am_leader = 0;
1682 rd_kafka_assignor_t *rkas = NULL;
1683
1684 if (err == RD_KAFKA_RESP_ERR__DESTROY ||
1685 rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
1686 return; /* Terminating */
1687
1688 if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN) {
1689 rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
1690 "JoinGroup response: discarding outdated request "
1691 "(now in join-state %s)",
1692 rd_kafka_cgrp_join_state_names[rkcg->
1693 rkcg_join_state]);
1694 return;
1695 }
1696
1697 if (err) {
1698 ErrorCode = err;
1699 goto err;
1700 }
1701
1702 if (request->rkbuf_reqhdr.ApiVersion >= 2)
1703 rd_kafka_buf_read_throttle_time(rkbuf);
1704
1705 rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
1706 rd_kafka_buf_read_i32(rkbuf, &GenerationId);
1707 rd_kafka_buf_read_str(rkbuf, &Protocol);
1708 rd_kafka_buf_read_str(rkbuf, &LeaderId);
1709 rd_kafka_buf_read_str(rkbuf, &MyMemberId);
1710 rd_kafka_buf_read_i32(rkbuf, &member_cnt);
1711
1712 if (!ErrorCode && RD_KAFKAP_STR_IS_NULL(&Protocol)) {
1713 /* Protocol not set, we will not be able to find
1714 * a matching assignor so error out early. */
1715 ErrorCode = RD_KAFKA_RESP_ERR__BAD_MSG;
1716 } else if (!ErrorCode) {
1717 char *protocol_name;
1718 RD_KAFKAP_STR_DUPA(&protocol_name, &Protocol);
1719 if (!(rkas = rd_kafka_assignor_find(rkcg->rkcg_rk,
1720 protocol_name)) ||
1721 !rkas->rkas_enabled) {
1722 rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
1723 "Unsupported assignment strategy \"%s\"",
1724 protocol_name);
1725 if (rkcg->rkcg_assignor) {
1726 if (rkcg->rkcg_assignor->rkas_destroy_state_cb)
1727 rkcg->rkcg_assignor->rkas_destroy_state_cb(
1728 rkcg->rkcg_assignor_state);
1729 rkcg->rkcg_assignor_state = NULL;
1730 rkcg->rkcg_assignor = NULL;
1731 }
1732 ErrorCode = RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL;
1733 }
1734 }
1735
1736 rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
1737 "JoinGroup response: GenerationId %"PRId32", "
1738 "Protocol %.*s, LeaderId %.*s%s, my MemberId %.*s, "
1739 "member metadata count ""%"PRId32": %s",
1740 GenerationId,
1741 RD_KAFKAP_STR_PR(&Protocol),
1742 RD_KAFKAP_STR_PR(&LeaderId),
1743 RD_KAFKAP_STR_LEN(&MyMemberId) &&
1744 !rd_kafkap_str_cmp(&LeaderId, &MyMemberId) ? " (me)" : "",
1745 RD_KAFKAP_STR_PR(&MyMemberId),
1746 member_cnt,
1747 ErrorCode ? rd_kafka_err2str(ErrorCode) : "(no error)");
1748
1749 if (!ErrorCode) {
1750 char *my_member_id;
1751 RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId);
1752 rd_kafka_cgrp_set_member_id(rkcg, my_member_id);
1753 rkcg->rkcg_generation_id = GenerationId;
1754 i_am_leader = !rd_kafkap_str_cmp(&LeaderId, &MyMemberId);
1755 } else {
1756 rd_interval_backoff(&rkcg->rkcg_join_intvl, 1000*1000);
1757 goto err;
1758 }
1759
1760 if (rkcg->rkcg_assignor && rkcg->rkcg_assignor != rkas) {
1761 if (rkcg->rkcg_assignor->rkas_destroy_state_cb)
1762 rkcg->rkcg_assignor->rkas_destroy_state_cb(
1763 rkcg->rkcg_assignor_state);
1764 rkcg->rkcg_assignor_state = NULL;
1765 }
1766 rkcg->rkcg_assignor = rkas;
1767
1768 if (i_am_leader) {
1769 rd_kafka_group_member_t *members;
1770 int i;
1771 int sub_cnt = 0;
1772 rd_list_t topics;
1773 rd_kafka_op_t *rko;
1774 rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
1775 "I am elected leader for group \"%s\" "
1776 "with %"PRId32" member(s)",
1777 rkcg->rkcg_group_id->str, member_cnt);
1778
1779 if (member_cnt > 100000) {
1780 err = RD_KAFKA_RESP_ERR__BAD_MSG;
1781 goto err;
1782 }
1783
1784 rd_list_init(&topics, member_cnt, rd_free);
1785
1786 members = rd_calloc(member_cnt, sizeof(*members));
1787
1788 for (i = 0 ; i < member_cnt ; i++) {
1789 rd_kafkap_str_t MemberId;
1790 rd_kafkap_bytes_t MemberMetadata;
1791 rd_kafka_group_member_t *rkgm;
1792 rd_kafkap_str_t GroupInstanceId = RD_KAFKAP_STR_INITIALIZER;
1793
1794 rd_kafka_buf_read_str(rkbuf, &MemberId);
1795 if (request->rkbuf_reqhdr.ApiVersion >= 5)
1796 rd_kafka_buf_read_str(rkbuf, &GroupInstanceId);
1797 rd_kafka_buf_read_bytes(rkbuf, &MemberMetadata);
1798
1799 rkgm = &members[sub_cnt];
1800 rkgm->rkgm_member_id = rd_kafkap_str_copy(&MemberId);
1801 rkgm->rkgm_group_instance_id =
1802 rd_kafkap_str_copy(&GroupInstanceId);
1803 rd_list_init(&rkgm->rkgm_eligible, 0, NULL);
1804 rkgm->rkgm_generation = -1;
1805
1806 if (rd_kafka_group_MemberMetadata_consumer_read(
1807 rkb, rkgm, &MemberMetadata)) {
1808 /* Failed to parse this member's metadata,
1809 * ignore it. */
1810 } else {
1811 sub_cnt++;
1812 rkgm->rkgm_assignment =
1813 rd_kafka_topic_partition_list_new(
1814 rkgm->rkgm_subscription->cnt);
1815 rd_kafka_topic_partition_list_get_topic_names(
1816 rkgm->rkgm_subscription, &topics,
1817 0/*dont include regex*/);
1818 }
1819
1820 }
1821
1822 /* FIXME: What to do if parsing failed for some/all members?
1823 * It is a sign of incompatibility. */
1824
1825
1826 rd_kafka_cgrp_group_leader_reset(rkcg,
1827 "JoinGroup response clean-up");
1828
1829 rd_kafka_assert(NULL, rkcg->rkcg_group_leader.members == NULL);
1830 rkcg->rkcg_group_leader.members = members;
1831 rkcg->rkcg_group_leader.member_cnt = sub_cnt;
1832
1833 rd_kafka_cgrp_set_join_state(
1834 rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);
1835
1836 /* The assignor will need metadata so fetch it asynchronously
1837 * and run the assignor when we get a reply.
1838 * Create a callback op that the generic metadata code
1839 * will trigger when metadata has been parsed. */
1840 rko = rd_kafka_op_new_cb(
1841 rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
1842 rd_kafka_cgrp_assignor_handle_Metadata_op);
1843 rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, NULL);
1844
1845 rd_kafka_MetadataRequest(
1846 rkb, &topics,
1847 "partition assignor",
1848 rd_false/*!allow_auto_create*/,
1849 /* cgrp_update=false:
1850 * Since the subscription list may not be identical
1851 * across all members of the group and thus the
1852 * Metadata response may not be identical to this
1853 * consumer's subscription list, we want to
1854 * avoid triggering a rejoin or error propagation
1855 * on receiving the response since some topics
1856 * may be missing. */
1857 rd_false,
1858 rko);
1859 rd_list_destroy(&topics);
1860
1861 } else {
1862 rd_kafka_cgrp_set_join_state(
1863 rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
1864
1865 rd_kafka_SyncGroupRequest(rkb, rkcg->rkcg_group_id,
1866 rkcg->rkcg_generation_id,
1867 rkcg->rkcg_member_id,
1868 rkcg->rkcg_group_instance_id,
1869 NULL, 0,
1870 RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
1871 rd_kafka_handle_SyncGroup, rkcg);
1872
1873 }
1874
1875 err:
1876 actions = rd_kafka_err_action(rkb, ErrorCode, request,
1877 RD_KAFKA_ERR_ACTION_IGNORE,
1878 RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
1879
1880 RD_KAFKA_ERR_ACTION_IGNORE,
1881 RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED,
1882
1883 RD_KAFKA_ERR_ACTION_IGNORE,
1884 RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
1885
1886 RD_KAFKA_ERR_ACTION_PERMANENT,
1887 RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,
1888
1889 RD_KAFKA_ERR_ACTION_END);
1890
1891 if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
1892 /* Re-query for coordinator */
1893 rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
1894 RD_KAFKA_OP_COORD_QUERY, ErrorCode);
1895 }
1896
1897 /* No need for retries here since the join is intervalled,
1898 * see rkcg_join_intvl */
1899
1900 if (ErrorCode) {
1901 if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
1902 return; /* Termination */
1903
1904 if (ErrorCode == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID) {
1905 rd_kafka_set_fatal_error(rkcg->rkcg_rk, ErrorCode,
1906 "Fatal consumer error: %s",
1907 rd_kafka_err2str(ErrorCode));
1908 ErrorCode = RD_KAFKA_RESP_ERR__FATAL;
1909
1910 } else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
1911 rd_kafka_consumer_err(rkcg->rkcg_q,
1912 rd_kafka_broker_id(rkb),
1913 ErrorCode, 0, NULL, NULL,
1914 RD_KAFKA_OFFSET_INVALID,
1915 "JoinGroup failed: %s",
1916 rd_kafka_err2str(ErrorCode));
1917
1918 if (ErrorCode == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
1919 rd_kafka_cgrp_set_member_id(rkcg, "");
1920 else if (ErrorCode == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION)
1921 rkcg->rkcg_generation_id = -1;
1922 else if (ErrorCode == RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED) {
1923 /* KIP-394 requires member.id on initial join
1924 * group request */
1925 char *my_member_id;
1926 RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId);
1927 rd_kafka_cgrp_set_member_id(rkcg, my_member_id);
1928 /* Skip the join backoff */
1929 rd_interval_reset(&rkcg->rkcg_join_intvl);
1930 }
1931
1932 if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
1933 RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE &&
1934 (ErrorCode == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION ||
1935 ErrorCode == RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED))
1936 rd_kafka_cgrp_revoke_all_rejoin(
1937 rkcg,
1938 rd_true/*assignment is lost*/,
1939 rd_true/*this consumer is initiating*/,
1940 "JoinGroup error");
1941 else
1942 rd_kafka_cgrp_rejoin(rkcg,
1943 "JoinGroup error: %s",
1944 rd_kafka_err2str(ErrorCode));
1945
1946 }
1947
1948 return;
1949
1950 err_parse:
1951 ErrorCode = rkbuf->rkbuf_err;
1952 goto err;
1953 }
1954
1955
1956 /**
1957 * @brief Check subscription against requested Metadata.
1958 */
1959 static rd_kafka_op_res_t
rd_kafka_cgrp_handle_Metadata_op(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1960 rd_kafka_cgrp_handle_Metadata_op (rd_kafka_t *rk, rd_kafka_q_t *rkq,
1961 rd_kafka_op_t *rko) {
1962 rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
1963
1964 if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
1965 return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
1966
1967 rd_kafka_cgrp_metadata_update_check(rkcg, rd_false/*dont rejoin*/);
1968
1969 return RD_KAFKA_OP_RES_HANDLED;
1970 }
1971
1972
/**
 * @brief (Async) Refresh metadata (for cgrp's needs)
 *
 * @param metadata_agep Output: age of the currently cached metadata in
 *                      milliseconds, or -1 if no metadata is available.
 * @param reason Human-readable reason string used in debug logs and
 *               forwarded with the metadata request.
 *
 * @returns 1 if metadata refresh was requested, or 0 if metadata is
 * up to date, or -1 if no broker is available for metadata requests.
 *
 * @locks none
 * @locality rdkafka main thread
 */
static int rd_kafka_cgrp_metadata_refresh (rd_kafka_cgrp_t *rkcg,
                                           int *metadata_agep,
                                           const char *reason) {
        rd_kafka_t *rk = rkcg->rkcg_rk;
        rd_kafka_op_t *rko;
        rd_list_t topics;    /* rd_list_t(char *): subscribed topic names */
        rd_kafka_resp_err_t err;

        rd_list_init(&topics, 8, rd_free);

        /* Insert all non-wildcard topics in cache. */
        rd_kafka_metadata_cache_hint_rktparlist(rkcg->rkcg_rk,
                                                rkcg->rkcg_subscription,
                                                NULL, 0/*dont replace*/);

        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) {
                /* For wildcard subscriptions make sure the
                 * cached full metadata isn't too old. */
                int metadata_age = -1;

                /* rk_ts_full_metadata == 0 means full metadata has never
                 * been retrieved, leaving metadata_age at -1. */
                if (rk->rk_ts_full_metadata)
                        metadata_age = (int)(rd_clock() -
                                             rk->rk_ts_full_metadata)/1000;

                *metadata_agep = metadata_age;

                if (metadata_age != -1 &&
                    metadata_age <= rk->rk_conf.metadata_max_age_ms) {
                        rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
                                     "CGRPMETADATA",
                                     "%s: metadata for wildcard subscription "
                                     "is up to date (%dms old)",
                                     reason, *metadata_agep);
                        rd_list_destroy(&topics);
                        return 0; /* Up-to-date */
                }

        } else {
                /* Check that all subscribed topics are in the cache. */
                int r;

                rd_kafka_topic_partition_list_get_topic_names(
                        rkcg->rkcg_subscription, &topics, 0/*no regexps*/);

                /* Read lock protects the metadata cache lookup;
                 * the cache also provides the metadata age. */
                rd_kafka_rdlock(rk);
                r = rd_kafka_metadata_cache_topics_count_exists(rk, &topics,
                                                                metadata_agep);
                rd_kafka_rdunlock(rk);

                if (r == rd_list_cnt(&topics)) {
                        rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
                                     "CGRPMETADATA",
                                     "%s: metadata for subscription "
                                     "is up to date (%dms old)", reason,
                                     *metadata_agep);
                        rd_list_destroy(&topics);
                        return 0; /* Up-to-date and all topics exist. */
                }

                rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
                             "CGRPMETADATA",
                             "%s: metadata for subscription "
                             "only available for %d/%d topics (%dms old)",
                             reason, r, rd_list_cnt(&topics), *metadata_agep);
        }

        /* Async request, result will be triggered from
         * rd_kafka_parse_metadata(). */
        rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
                                 rd_kafka_cgrp_handle_Metadata_op);
        rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, 0);

        err = rd_kafka_metadata_request(rkcg->rkcg_rk, NULL, &topics,
                                        rd_false/*!allow auto create */,
                                        rd_true/*cgrp_update*/,
                                        reason, rko);
        if (err) {
                /* Request was not handed off to a broker: we still own
                 * the op and must destroy it here. */
                rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
                             "CGRPMETADATA",
                             "%s: need to refresh metadata (%dms old) "
                             "but no usable brokers available: %s",
                             reason, *metadata_agep, rd_kafka_err2str(err));
                rd_kafka_op_destroy(rko);
        }

        rd_list_destroy(&topics);

        return err ? -1 : 1;
}
2071
2072
2073
/**
 * @brief Attempt to join the group: refresh metadata if needed, update the
 *        effective subscription, and send the JoinGroupRequest.
 *
 * No-op unless the cgrp is in state UP with join-state INIT.
 *
 * @locality rdkafka main thread
 */
static void rd_kafka_cgrp_join (rd_kafka_cgrp_t *rkcg) {
        int metadata_age;

        if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
            rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_INIT)
                return;

        /* On max.poll.interval.ms failure, do not rejoin group until the
         * application has called poll. */
        if ((rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED) &&
            rd_kafka_max_poll_exceeded(rkcg->rkcg_rk))
                return;

        /* The application has polled again: clear the exceeded flag. */
        rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
                     "Group \"%.*s\": join with %d subscribed topic(s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_list_cnt(rkcg->rkcg_subscribed_topics));


        /* See if we need to query metadata to continue:
         * - if subscription contains wildcards:
         *   * query all topics in cluster
         *
         * - if subscription does not contain wildcards but
         *   some topics are missing from the local metadata cache:
         *   * query subscribed topics (all cached ones)
         *
         * - otherwise:
         *   * rely on topic metadata cache
         */
        /* We need up-to-date full metadata to continue,
         * refresh metadata if necessary. */
        if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
                                           "consumer join") == 1) {
                /* Refresh is in flight (return value 1): postpone join
                 * until the Metadata op callback fires. */
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "JOIN",
                             "Group \"%.*s\": "
                             "postponing join until up-to-date "
                             "metadata is available",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));

                rd_assert(rkcg->rkcg_join_state ==
                          RD_KAFKA_CGRP_JOIN_STATE_INIT ||
                          /* Possible via rd_kafka_cgrp_modify_subscription */
                          rkcg->rkcg_join_state ==
                          RD_KAFKA_CGRP_JOIN_STATE_STEADY);

                rd_kafka_cgrp_set_join_state(
                        rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);

                return; /* ^ async call */
        }

        /* Metadata is up to date (or refresh failed): recompute the
         * effective subscribed topics list if currently empty. */
        if (rd_list_empty(rkcg->rkcg_subscribed_topics))
                rd_kafka_cgrp_metadata_update_check(rkcg,
                                                    rd_false/*dont join*/);

        if (rd_list_empty(rkcg->rkcg_subscribed_topics)) {
                /* Still no matching topics: nothing to join with yet. */
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "JOIN",
                             "Group \"%.*s\": "
                             "no matching topics based on %dms old metadata: "
                             "next metadata refresh in %dms",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             metadata_age,
                             rkcg->rkcg_rk->rk_conf.
                             metadata_refresh_interval_ms - metadata_age);
                return;
        }

        rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "JOIN",
                   "Joining group \"%.*s\" with %d subscribed topic(s)",
                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                   rd_list_cnt(rkcg->rkcg_subscribed_topics));

        rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN);
        rd_kafka_JoinGroupRequest(rkcg->rkcg_coord, rkcg->rkcg_group_id,
                                  rkcg->rkcg_member_id,
                                  rkcg->rkcg_group_instance_id,
                                  rkcg->rkcg_rk->rk_conf.group_protocol_type,
                                  rkcg->rkcg_subscribed_topics,
                                  RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
                                  rd_kafka_cgrp_handle_JoinGroup, rkcg);
}
2158
2159 /**
2160 * Rejoin group on update to effective subscribed topics list
2161 */
rd_kafka_cgrp_revoke_rejoin(rd_kafka_cgrp_t * rkcg,const char * reason)2162 static void rd_kafka_cgrp_revoke_rejoin (rd_kafka_cgrp_t *rkcg,
2163 const char *reason) {
2164 /*
2165 * Clean-up group leader duties, if any.
2166 */
2167 rd_kafka_cgrp_group_leader_reset(rkcg, "group (re)join");
2168
2169 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "REJOIN",
2170 "Group \"%.*s\" (re)joining in join-state %s "
2171 "with %d assigned partition(s): %s",
2172 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
2173 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
2174 rkcg->rkcg_group_assignment ?
2175 rkcg->rkcg_group_assignment->cnt : 0,
2176 reason);
2177
2178 rd_kafka_cgrp_revoke_all_rejoin(rkcg,
2179 rd_false/*not lost*/,
2180 rd_true/*initiating*/,
2181 reason);
2182 }
2183
2184 /**
2185 * @brief Update the effective list of subscribed topics.
2186 *
2187 * Set \p tinfos to NULL to clear the list.
2188 *
2189 * @param tinfos rd_list_t(rd_kafka_topic_info_t *): new effective topic list
2190 *
2191 * @returns true on change, else false.
2192 *
2193 * @remark Takes ownership of \p tinfos
2194 */
2195 static rd_bool_t
rd_kafka_cgrp_update_subscribed_topics(rd_kafka_cgrp_t * rkcg,rd_list_t * tinfos)2196 rd_kafka_cgrp_update_subscribed_topics (rd_kafka_cgrp_t *rkcg,
2197 rd_list_t *tinfos) {
2198 rd_kafka_topic_info_t *tinfo;
2199 int i;
2200
2201 if (!tinfos) {
2202 if (!rd_list_empty(rkcg->rkcg_subscribed_topics))
2203 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
2204 "Group \"%.*s\": "
2205 "clearing subscribed topics list (%d)",
2206 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
2207 rd_list_cnt(rkcg->rkcg_subscribed_topics));
2208 tinfos = rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
2209
2210 } else {
2211 if (rd_list_cnt(tinfos) == 0)
2212 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
2213 "Group \"%.*s\": "
2214 "no topics in metadata matched "
2215 "subscription",
2216 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
2217 }
2218
2219 /* Sort for comparison */
2220 rd_list_sort(tinfos, rd_kafka_topic_info_cmp);
2221
2222 /* Compare to existing to see if anything changed. */
2223 if (!rd_list_cmp(rkcg->rkcg_subscribed_topics, tinfos,
2224 rd_kafka_topic_info_cmp)) {
2225 /* No change */
2226 rd_list_destroy(tinfos);
2227 return rd_false;
2228 }
2229
2230 rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA, "SUBSCRIPTION",
2231 "Group \"%.*s\": effective subscription list changed "
2232 "from %d to %d topic(s):",
2233 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
2234 rd_list_cnt(rkcg->rkcg_subscribed_topics),
2235 rd_list_cnt(tinfos));
2236
2237 RD_LIST_FOREACH(tinfo, tinfos, i)
2238 rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA,
2239 "SUBSCRIPTION",
2240 " Topic %s with %d partition(s)",
2241 tinfo->topic, tinfo->partition_cnt);
2242
2243 rd_list_destroy(rkcg->rkcg_subscribed_topics);
2244
2245 rkcg->rkcg_subscribed_topics = tinfos;
2246
2247 return rd_true;
2248 }
2249
2250
/**
 * @brief Handle Heartbeat response.
 *
 * Clears the heartbeat-in-transit flag, parses the response error code and
 * takes the appropriate rebalance/coordinator-refresh action on failure.
 *
 * @locality rdkafka main thread
 */
void rd_kafka_cgrp_handle_Heartbeat (rd_kafka_t *rk,
                                     rd_kafka_broker_t *rkb,
                                     rd_kafka_resp_err_t err,
                                     rd_kafka_buf_t *rkbuf,
                                     rd_kafka_buf_t *request,
                                     void *opaque) {
        rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
        const int log_decode_errors = LOG_ERR; /* used by buf_read macros */
        int16_t ErrorCode = 0;
        int actions = 0;

        if (err == RD_KAFKA_RESP_ERR__DESTROY)
                return;

        rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT);
        rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;

        rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR;

        if (err)
                goto err;

        /* The rd_kafka_buf_read_* macros below jump to err_parse on
         * malformed responses. */
        if (request->rkbuf_reqhdr.ApiVersion >= 1)
                rd_kafka_buf_read_throttle_time(rkbuf);

        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
        if (ErrorCode) {
                err = ErrorCode;
                goto err;
        }

        /* Heartbeat succeeded: extend the session. */
        rd_kafka_cgrp_update_session_timeout(
                rkcg, rd_false/*don't update if session has expired*/);

        return;

 err_parse:
        err = rkbuf->rkbuf_err;
 err:
        rkcg->rkcg_last_heartbeat_err = err;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
                     "Group \"%s\" heartbeat error response in "
                     "state %s (join-state %s, %d partition(s) assigned): %s",
                     rkcg->rkcg_group_id->str,
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                     rkcg->rkcg_group_assignment ?
                     rkcg->rkcg_group_assignment->cnt : 0,
                     rd_kafka_err2str(err));

        /* A response received while (re)joining is stale: ignore it. */
        if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
                             "Heartbeat response: discarding outdated "
                             "request (now in join-state %s)",
                             rd_kafka_cgrp_join_state_names[rkcg->
                                                            rkcg_join_state]);
                return;
        }

        switch (err)
        {
        case RD_KAFKA_RESP_ERR__DESTROY:
                /* quick cleanup */
                return;

        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
        case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR__TRANSPORT:
                rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
                             "Heartbeat failed due to coordinator (%s) "
                             "no longer available: %s: "
                             "re-querying for coordinator",
                             rkcg->rkcg_curr_coord ?
                             rd_kafka_broker_name(rkcg->rkcg_curr_coord) :
                             "none",
                             rd_kafka_err2str(err));
                /* Remain in joined state and keep querying for coordinator */
                actions = RD_KAFKA_ERR_ACTION_REFRESH;
                break;

        case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS:
                /* No further action if already rebalancing */
                if (RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg))
                        return;
                rd_kafka_cgrp_group_is_rebalancing(rkcg);
                return;

        case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
                /* Broker no longer knows us: clear member-id and rejoin
                 * with the assignment considered lost. */
                rd_kafka_cgrp_set_member_id(rkcg, "");
                rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg,
                                                      rd_true/*lost*/,
                                                      rd_true/*initiating*/,
                                                      "resetting member-id");
                return;

        case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
                /* Our generation is stale: reset it and rejoin. */
                rkcg->rkcg_generation_id = -1;
                rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg,
                                                      rd_true/*lost*/,
                                                      rd_true/*initiating*/,
                                                      "illegal generation");
                return;

        case RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID:
                /* Another instance with our group.instance.id joined:
                 * this is a fatal consumer error. */
                rd_kafka_set_fatal_error(rkcg->rkcg_rk, err,
                                         "Fatal consumer error: %s",
                                         rd_kafka_err2str(err));
                rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg,
                                                      rd_true,/*assignment lost*/
                                                      rd_true,/*initiating*/
                                                      "consumer fenced by "
                                                      "newer instance");
                return;

        default:
                actions = rd_kafka_err_action(rkb, err, request,
                                              RD_KAFKA_ERR_ACTION_END);
                break;
        }


        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                /* Re-query for coordinator */
                rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err));
        }

        if (actions & RD_KAFKA_ERR_ACTION_RETRY &&
            rd_kafka_buf_retry(rkb, request)) {
                /* Retry: re-mark the heartbeat as in transit since we
                 * cleared the flag above. */
                rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
                return;
        }
}
2388
2389
2390
2391 /**
2392 * @brief Send Heartbeat
2393 */
rd_kafka_cgrp_heartbeat(rd_kafka_cgrp_t * rkcg)2394 static void rd_kafka_cgrp_heartbeat (rd_kafka_cgrp_t *rkcg) {
2395 /* Don't send heartbeats if max.poll.interval.ms was exceeded */
2396 if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED)
2397 return;
2398
2399 /* Skip heartbeat if we have one in transit */
2400 if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT)
2401 return;
2402
2403 rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
2404 rd_kafka_HeartbeatRequest(rkcg->rkcg_coord, rkcg->rkcg_group_id,
2405 rkcg->rkcg_generation_id,
2406 rkcg->rkcg_member_id,
2407 rkcg->rkcg_group_instance_id,
2408 RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
2409 rd_kafka_cgrp_handle_Heartbeat, NULL);
2410 }
2411
/**
 * Cgrp is now terminated: decommission it and signal back to application.
 *
 * Idempotent: guarded by the TERMINATED flag.
 * Asserts that no assignment, commits or ops remain outstanding.
 */
static void rd_kafka_cgrp_terminated (rd_kafka_cgrp_t *rkcg) {
        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATED)
                return; /* terminated() may be called multiple times,
                         * make sure to only terminate once. */

        /* Drop any remaining group assignment. */
        rd_kafka_cgrp_group_assignment_set(rkcg, NULL);

        rd_kafka_assert(NULL, !rd_kafka_assignment_in_progress(rkcg->rkcg_rk));
        rd_kafka_assert(NULL, !rkcg->rkcg_group_assignment);
        rd_kafka_assert(NULL, rkcg->rkcg_rk->rk_consumer.wait_commit_cnt == 0);
        rd_kafka_assert(NULL, rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM);

        /* Stop the auto offset commit timer. */
        rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
                            &rkcg->rkcg_offset_commit_tmr, 1/*lock*/);

        /* Discard any commits still waiting for a coordinator. */
        rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);

        /* Disable and empty ops queue since there will be no
         * (broker) thread serving it anymore after the unassign_broker
         * below.
         * This prevents hang on destroy where responses are enqueued on rkcg_ops
         * without anything serving the queue. */
        rd_kafka_q_disable(rkcg->rkcg_ops);
        rd_kafka_q_purge(rkcg->rkcg_ops);

        /* Release coordinator broker references. */
        if (rkcg->rkcg_curr_coord)
                rd_kafka_cgrp_coord_clear_broker(rkcg);

        if (rkcg->rkcg_coord) {
                rd_kafka_broker_destroy(rkcg->rkcg_coord);
                rkcg->rkcg_coord = NULL;
        }

        if (rkcg->rkcg_reply_rko) {
                /* Signal back to application. */
                rd_kafka_replyq_enq(&rkcg->rkcg_reply_rko->rko_replyq,
                                    rkcg->rkcg_reply_rko, 0);
                rkcg->rkcg_reply_rko = NULL;
        }

        /* Mark terminated last so a re-entrant call short-circuits above. */
        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_TERMINATED;
}
2457
2458
/**
 * If a cgrp is terminating and all outstanding ops are now finished
 * then progress to final termination and return 1.
 * Else returns 0.
 */
static RD_INLINE int rd_kafka_cgrp_try_terminate (rd_kafka_cgrp_t *rkcg) {

        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
                return 1;

        /* Not terminating at all: nothing to do. */
        if (likely(!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)))
                return 0;

        /* Check if wait-coord queue has timed out.
         * If ops have been waiting for a coordinator longer than
         * group.session.timeout.ms since terminate was initiated, move
         * them back to the ops queue to be failed/served there. */
        if (rd_kafka_q_len(rkcg->rkcg_wait_coord_q) > 0 &&
            rkcg->rkcg_ts_terminate +
            (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000) <
            rd_clock()) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
                             "Group \"%s\": timing out %d op(s) in "
                             "wait-for-coordinator queue",
                             rkcg->rkcg_group_id->str,
                             rd_kafka_q_len(rkcg->rkcg_wait_coord_q));
                rd_kafka_q_disable(rkcg->rkcg_wait_coord_q);
                if (rd_kafka_q_concat(rkcg->rkcg_ops,
                                      rkcg->rkcg_wait_coord_q) == -1) {
                        /* ops queue shut down, purge coord queue */
                        rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);
                }
        }

        /* Terminate only when no assign calls, toppars, assignment
         * operations, commits or leave-group requests are outstanding. */
        if (!RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) &&
            rd_list_empty(&rkcg->rkcg_toppars) &&
            !rd_kafka_assignment_in_progress(rkcg->rkcg_rk) &&
            rkcg->rkcg_rk->rk_consumer.wait_commit_cnt == 0 &&
            !(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE)) {
                /* Since we might be deep down in a 'rko' handler
                 * called from cgrp_op_serve() we cant call terminated()
                 * directly since it will decommission the rkcg_ops queue
                 * that might be locked by intermediate functions.
                 * Instead set the TERM state and let the cgrp terminate
                 * at its own discretion. */
                rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_TERM);

                return 1;
        } else {
                /* Still waiting: log what is blocking termination. */
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
                             "Group \"%s\": "
                             "waiting for %s%d toppar(s), "
                             "%s"
                             "%d commit(s)%s%s%s (state %s, join-state %s) "
                             "before terminating",
                             rkcg->rkcg_group_id->str,
                             RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) ?
                             "assign call, ": "",
                             rd_list_cnt(&rkcg->rkcg_toppars),
                             rd_kafka_assignment_in_progress(rkcg->rkcg_rk) ?
                             "assignment in progress, " : "",
                             rkcg->rkcg_rk->rk_consumer.wait_commit_cnt,
                             (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE)?
                             ", wait-leave," : "",
                             rkcg->rkcg_rebalance_rejoin ?
                             ", rebalance_rejoin,": "",
                             (rkcg->rkcg_rebalance_incr_assignment != NULL)?
                             ", rebalance_incr_assignment,": "",
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                             rd_kafka_cgrp_join_state_names[
                                     rkcg->rkcg_join_state]);
                return 0;
        }
}
2530
2531
2532 /**
2533 * @brief Add partition to this cgrp management
2534 *
2535 * @locks none
2536 */
rd_kafka_cgrp_partition_add(rd_kafka_cgrp_t * rkcg,rd_kafka_toppar_t * rktp)2537 static void rd_kafka_cgrp_partition_add (rd_kafka_cgrp_t *rkcg,
2538 rd_kafka_toppar_t *rktp) {
2539 rd_kafka_dbg(rkcg->rkcg_rk, CGRP,"PARTADD",
2540 "Group \"%s\": add %s [%"PRId32"]",
2541 rkcg->rkcg_group_id->str,
2542 rktp->rktp_rkt->rkt_topic->str,
2543 rktp->rktp_partition);
2544
2545 rd_kafka_toppar_lock(rktp);
2546 rd_assert(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP));
2547 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_ON_CGRP;
2548 rd_kafka_toppar_unlock(rktp);
2549
2550 rd_kafka_toppar_keep(rktp);
2551 rd_list_add(&rkcg->rkcg_toppars, rktp);
2552 }
2553
2554 /**
2555 * @brief Remove partition from this cgrp management
2556 *
2557 * @locks none
2558 */
rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t * rkcg,rd_kafka_toppar_t * rktp)2559 static void rd_kafka_cgrp_partition_del (rd_kafka_cgrp_t *rkcg,
2560 rd_kafka_toppar_t *rktp) {
2561 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL",
2562 "Group \"%s\": delete %s [%"PRId32"]",
2563 rkcg->rkcg_group_id->str,
2564 rktp->rktp_rkt->rkt_topic->str,
2565 rktp->rktp_partition);
2566
2567 rd_kafka_toppar_lock(rktp);
2568 rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP);
2569 rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP;
2570 rd_kafka_toppar_unlock(rktp);
2571
2572 rd_list_remove(&rkcg->rkcg_toppars, rktp);
2573
2574 rd_kafka_toppar_destroy(rktp); /* refcnt from _add above */
2575
2576 rd_kafka_cgrp_try_terminate(rkcg);
2577 }
2578
2579
2580
2581
2582 /**
2583 * @brief Defer offset commit (rko) until coordinator is available.
2584 *
2585 * @returns 1 if the rko was deferred or 0 if the defer queue is disabled
2586 * or rko already deferred.
2587 */
rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t * rkcg,rd_kafka_op_t * rko,const char * reason)2588 static int rd_kafka_cgrp_defer_offset_commit (rd_kafka_cgrp_t *rkcg,
2589 rd_kafka_op_t *rko,
2590 const char *reason) {
2591
2592 /* wait_coord_q is disabled session.timeout.ms after
2593 * group close() has been initated. */
2594 if (rko->rko_u.offset_commit.ts_timeout != 0 ||
2595 !rd_kafka_q_ready(rkcg->rkcg_wait_coord_q))
2596 return 0;
2597
2598 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
2599 "Group \"%s\": "
2600 "unable to OffsetCommit in state %s: %s: "
2601 "coordinator (%s) is unavailable: "
2602 "retrying later",
2603 rkcg->rkcg_group_id->str,
2604 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
2605 reason,
2606 rkcg->rkcg_curr_coord ?
2607 rd_kafka_broker_name(rkcg->rkcg_curr_coord) :
2608 "none");
2609
2610 rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;
2611 rko->rko_u.offset_commit.ts_timeout = rd_clock() +
2612 (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms
2613 * 1000);
2614 rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko);
2615
2616 return 1;
2617 }
2618
2619
2620 /**
2621 * @brief Update the committed offsets for the partitions in \p offsets,
2622 *
2623 * @remark \p offsets may be NULL if \p err is set
2624 * @returns the number of partitions with errors encountered
2625 */
2626 static int
rd_kafka_cgrp_update_committed_offsets(rd_kafka_cgrp_t * rkcg,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * offsets)2627 rd_kafka_cgrp_update_committed_offsets (rd_kafka_cgrp_t *rkcg,
2628 rd_kafka_resp_err_t err,
2629 rd_kafka_topic_partition_list_t
2630 *offsets) {
2631 int i;
2632 int errcnt = 0;
2633
2634 /* Update toppars' committed offset or global error */
2635 for (i = 0 ; offsets && i < offsets->cnt ; i++) {
2636 rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
2637 rd_kafka_toppar_t *rktp;
2638
2639 /* Ignore logical offsets since they were never
2640 * sent to the broker. */
2641 if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset))
2642 continue;
2643
2644 /* Propagate global error to all partitions that don't have
2645 * explicit error set. */
2646 if (err && !rktpar->err)
2647 rktpar->err = err;
2648
2649 if (rktpar->err) {
2650 rd_kafka_dbg(rkcg->rkcg_rk, TOPIC,
2651 "OFFSET",
2652 "OffsetCommit failed for "
2653 "%s [%"PRId32"] at offset "
2654 "%"PRId64" in join-state %s: %s",
2655 rktpar->topic, rktpar->partition,
2656 rktpar->offset,
2657 rd_kafka_cgrp_join_state_names[
2658 rkcg->rkcg_join_state],
2659 rd_kafka_err2str(rktpar->err));
2660
2661 errcnt++;
2662 continue;
2663 }
2664
2665 rktp = rd_kafka_topic_partition_get_toppar(rkcg->rkcg_rk,
2666 rktpar, rd_false);
2667 if (!rktp)
2668 continue;
2669
2670 rd_kafka_toppar_lock(rktp);
2671 rktp->rktp_committed_offset = rktpar->offset;
2672 rd_kafka_toppar_unlock(rktp);
2673
2674 rd_kafka_toppar_destroy(rktp); /* from get_toppar() */
2675 }
2676
2677 return errcnt;
2678 }
2679
2680
/**
 * @brief Propagate OffsetCommit results.
 *
 * @param rko_orig The original rko that triggered the commit, this is used
 *                 to propagate the result.
 * @param err Is the aggregated request-level error, or ERR_NO_ERROR.
 * @param errcnt Are the number of partitions in \p offsets that failed
 *               offset commit.
 *
 * Results are delivered (in order of preference) to the configured
 * offset_commit_cb, the requester's reply queue, or logged as a warning
 * when neither is set and an error occurred.
 */
static void
rd_kafka_cgrp_propagate_commit_result (
        rd_kafka_cgrp_t *rkcg,
        rd_kafka_op_t *rko_orig,
        rd_kafka_resp_err_t err,
        int errcnt,
        rd_kafka_topic_partition_list_t *offsets) {

        const rd_kafka_t *rk = rkcg->rkcg_rk;
        int offset_commit_cb_served = 0;

        /* If no special callback is set but a offset_commit_cb has
         * been set in conf then post an event for the latter. */
        if (!rko_orig->rko_u.offset_commit.cb && rk->rk_conf.offset_commit_cb) {
                rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);

                rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);

                if (offsets)
                        rko_reply->rko_u.offset_commit.partitions =
                                rd_kafka_topic_partition_list_copy(offsets);

                rko_reply->rko_u.offset_commit.cb =
                        rk->rk_conf.offset_commit_cb;
                rko_reply->rko_u.offset_commit.opaque = rk->rk_conf.opaque;

                rd_kafka_q_enq(rk->rk_rep, rko_reply);
                offset_commit_cb_served++;
        }


        /* Enqueue reply to requester's queue, if any. */
        if (rko_orig->rko_replyq.q) {
                rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);

                rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);

                /* Copy offset & partitions & callbacks to reply op */
                rko_reply->rko_u.offset_commit = rko_orig->rko_u.offset_commit;
                /* The struct assignment above is shallow: give the reply
                 * its own copies of the partition list and reason string
                 * so both ops can be destroyed independently. */
                if (offsets)
                        rko_reply->rko_u.offset_commit.partitions =
                                rd_kafka_topic_partition_list_copy(offsets);
                if (rko_reply->rko_u.offset_commit.reason)
                        rko_reply->rko_u.offset_commit.reason =
                                rd_strdup(rko_reply->rko_u.
                                          offset_commit.reason);

                rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko_reply, 0);
                offset_commit_cb_served++;
        }

        if (!offset_commit_cb_served &&
            offsets &&
            (errcnt > 0 ||
             (err != RD_KAFKA_RESP_ERR_NO_ERROR &&
              err != RD_KAFKA_RESP_ERR__NO_OFFSET))) {
                /* If there is no callback or handler for this (auto)
                 * commit then log an error (#1043) */
                char tmp[512];

                rd_kafka_topic_partition_list_str(
                        offsets, tmp, sizeof(tmp),
                        /* Print per-partition errors unless there was a
                         * request-level error. */
                        RD_KAFKA_FMT_F_OFFSET |
                        (errcnt ? RD_KAFKA_FMT_F_ONLY_ERR : 0));

                rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "COMMITFAIL",
                             "Offset commit (%s) failed "
                             "for %d/%d partition(s) in join-state %s: "
                             "%s%s%s",
                             rko_orig->rko_u.offset_commit.reason,
                             errcnt ? errcnt : offsets->cnt, offsets->cnt,
                             rd_kafka_cgrp_join_state_names[rkcg->
                                                            rkcg_join_state],
                             errcnt ? rd_kafka_err2str(err) : "",
                             errcnt ? ": " : "",
                             tmp);
        }
}
2770
2771
2772
/**
 * @brief Handle OffsetCommitResponse
 *        Takes the original 'rko' as opaque argument.
 * @remark \p rkb, rkbuf, and request may be NULL in a number of
 *         error cases (e.g., _NO_OFFSET, _WAIT_COORD)
 *
 * Owns and destroys \p opaque (the originating rko) unless the commit is
 * being retried or deferred.
 */
static void rd_kafka_cgrp_op_handle_OffsetCommit (rd_kafka_t *rk,
                                                  rd_kafka_broker_t *rkb,
                                                  rd_kafka_resp_err_t err,
                                                  rd_kafka_buf_t *rkbuf,
                                                  rd_kafka_buf_t *request,
                                                  void *opaque) {
        rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
        rd_kafka_op_t *rko_orig = opaque;
        rd_kafka_topic_partition_list_t *offsets =
                rko_orig->rko_u.offset_commit.partitions; /* maybe NULL */
        int errcnt;

        RD_KAFKA_OP_TYPE_ASSERT(rko_orig, RD_KAFKA_OP_OFFSET_COMMIT);

        /* Parse the response; also updates per-partition errors
         * in \p offsets. */
        err = rd_kafka_handle_OffsetCommit(rk, rkb, err, rkbuf,
                                           request, offsets);

        /* Suppress empty commit debug logs if allowed */
        if (err != RD_KAFKA_RESP_ERR__NO_OFFSET ||
            !rko_orig->rko_u.offset_commit.silent_empty) {
                if (rkb)
                        rd_rkb_dbg(rkb, CGRP, "COMMIT",
                                   "OffsetCommit for %d partition(s) in "
                                   "join-state %s: "
                                   "%s: returned: %s",
                                   offsets ? offsets->cnt : -1,
                                   rd_kafka_cgrp_join_state_names[
                                           rkcg->rkcg_join_state],
                                   rko_orig->rko_u.offset_commit.reason,
                                   rd_kafka_err2str(err));
                else
                        rd_kafka_dbg(rk, CGRP, "COMMIT",
                                     "OffsetCommit for %d partition(s) in "
                                     "join-state "
                                     "%s: %s: "
                                     "returned: %s",
                                     offsets ? offsets->cnt : -1,
                                     rd_kafka_cgrp_join_state_names[
                                             rkcg->rkcg_join_state],
                                     rko_orig->rko_u.offset_commit.reason,
                                     rd_kafka_err2str(err));
        }


        /*
         * Error handling
         */
        switch (err)
        {
        case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
                /* Revoke assignment and rebalance on unknown member */
                rd_kafka_cgrp_set_member_id(rk->rk_cgrp, "");
                rd_kafka_cgrp_revoke_all_rejoin_maybe(
                        rkcg,
                        rd_true/*assignment is lost*/,
                        rd_true/*this consumer is initiating*/,
                        "OffsetCommit error: Unknown member");
                break;

        case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
                /* Revoke assignment and rebalance on illegal generation */
                rk->rk_cgrp->rkcg_generation_id = -1;
                rd_kafka_cgrp_revoke_all_rejoin_maybe(
                        rkcg,
                        rd_true/*assignment is lost*/,
                        rd_true/*this consumer is initiating*/,
                        "OffsetCommit error: Illegal generation");
                break;

        case RD_KAFKA_RESP_ERR__IN_PROGRESS:
                return; /* Retrying */

        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
        case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR__TRANSPORT:
                /* The coordinator is not available, defer the offset commit
                 * to when the coordinator is back up again. */

                /* Future-proofing, see timeout_scan(). */
                rd_kafka_assert(NULL, err != RD_KAFKA_RESP_ERR__WAIT_COORD);

                /* If deferred, the rko is now owned by the wait-coord
                 * queue: do not destroy it here. */
                if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko_orig,
                                                      rd_kafka_err2str(err)))
                        return;
                break;

        default:
                break;
        }

        /* Call on_commit interceptors */
        if (err != RD_KAFKA_RESP_ERR__NO_OFFSET &&
            err != RD_KAFKA_RESP_ERR__DESTROY &&
            offsets && offsets->cnt > 0)
                rd_kafka_interceptors_on_commit(rk, offsets, err);

        /* Keep track of outstanding commits */
        rd_kafka_assert(NULL, rk->rk_consumer.wait_commit_cnt > 0);
        rk->rk_consumer.wait_commit_cnt--;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                rd_kafka_op_destroy(rko_orig);
                return; /* Handle is terminating, this op may be handled
                         * by the op enq()ing thread rather than the
                         * rdkafka main thread, it is not safe to
                         * continue here. */
        }

        /* Update the committed offsets for each partition's rktp. */
        errcnt = rd_kafka_cgrp_update_committed_offsets(rkcg, err, offsets);

        if (err != RD_KAFKA_RESP_ERR__DESTROY &&
            !(err == RD_KAFKA_RESP_ERR__NO_OFFSET &&
              rko_orig->rko_u.offset_commit.silent_empty)) {
                /* Propagate commit results (success or permanent error)
                 * unless we're shutting down or commit was empty. */
                rd_kafka_cgrp_propagate_commit_result(rkcg, rko_orig,
                                                      err, errcnt, offsets);
        }

        rd_kafka_op_destroy(rko_orig);

        /* If the current state was waiting for commits to finish we'll try to
         * transition to the next state. */
        if (rk->rk_consumer.wait_commit_cnt == 0)
                rd_kafka_assignment_serve(rk);


}
2908
2909
rd_kafka_topic_partition_has_absolute_offset(const rd_kafka_topic_partition_t * rktpar,void * opaque)2910 static size_t rd_kafka_topic_partition_has_absolute_offset (
2911 const rd_kafka_topic_partition_t *rktpar, void *opaque) {
2912 return rktpar->offset >= 0 ? 1 : 0;
2913 }
2914
2915
2916 /**
2917 * Commit a list of offsets.
 * Reuse the originating 'rko' for the async reply.
2919 * 'rko->rko_payload' should either by NULL (to commit current assignment) or
2920 * a proper topic_partition_list_t with offsets to commit.
2921 * The offset list will be altered.
2922 *
2923 * \p rko...silent_empty: if there are no offsets to commit bail out
2924 * silently without posting an op on the reply queue.
2925 * \p set_offsets: set offsets in rko->rko_u.offset_commit.partitions from
2926 * the rktp's stored offset.
2927 *
2928 * Locality: cgrp thread
2929 */
static void rd_kafka_cgrp_offsets_commit (rd_kafka_cgrp_t *rkcg,
                                          rd_kafka_op_t *rko,
                                          rd_bool_t set_offsets,
                                          const char *reason) {
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_resp_err_t err;
        int valid_offsets = 0;
        int r;
        rd_kafka_buf_t *rkbuf;
        rd_kafka_op_t *reply;

        if (!(rko->rko_flags & RD_KAFKA_OP_F_REPROCESS)) {
                /* wait_commit_cnt has already been increased for
                 * reprocessed ops. */
                rkcg->rkcg_rk->rk_consumer.wait_commit_cnt++;
        }

        /* If offsets is NULL we shall use the current assignment
         * (not the group assignment). */
        if (!rko->rko_u.offset_commit.partitions &&
            rkcg->rkcg_rk->rk_consumer.assignment.all->cnt > 0) {
                if (rd_kafka_cgrp_assignment_is_lost(rkcg)) {
                        /* Not committing assigned offsets: assignment lost */
                        err = RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST;
                        goto err;
                }

                /* The op takes ownership of this copy of the assignment. */
                rko->rko_u.offset_commit.partitions =
                        rd_kafka_topic_partition_list_copy(
                                rkcg->rkcg_rk->rk_consumer.assignment.all);
        }

        offsets = rko->rko_u.offset_commit.partitions;

        if (offsets) {
                /* Set offsets to commits */
                if (set_offsets)
                        rd_kafka_topic_partition_list_set_offsets(
                                rkcg->rkcg_rk, rko->rko_u.offset_commit.partitions, 1,
                                RD_KAFKA_OFFSET_INVALID/* def */,
                                1 /* is commit */);

                /* Check the number of valid offsets to commit. */
                valid_offsets = (int)rd_kafka_topic_partition_list_sum(
                        offsets,
                        rd_kafka_topic_partition_has_absolute_offset, NULL);
        }

        if (rd_kafka_fatal_error_code(rkcg->rkcg_rk)) {
                /* Commits are not allowed when a fatal error has been raised */
                err = RD_KAFKA_RESP_ERR__FATAL;
                goto err;
        }

        if (!valid_offsets) {
                /* No valid offsets */
                err = RD_KAFKA_RESP_ERR__NO_OFFSET;
                goto err;
        }

        if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP) {
                /* No coordinator available: park the commit op until the
                 * coordinator comes back up, if deferral is possible. */
                rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP,
                             "COMMIT",
                             "Deferring \"%s\" offset commit "
                             "for %d partition(s) in state %s: "
                             "no coordinator available",
                             reason, valid_offsets,
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state]);

                if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko, reason))
                        return;

                err = RD_KAFKA_RESP_ERR__WAIT_COORD;
                goto err;
        }


        rd_rkb_dbg(rkcg->rkcg_coord, CONSUMER|RD_KAFKA_DBG_CGRP, "COMMIT",
                   "Committing offsets for %d partition(s) with "
                   "generation-id %" PRId32 " in join-state %s: %s",
                   valid_offsets, rkcg->rkcg_generation_id,
                   rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                   reason);

        /* Send OffsetCommit */
        r = rd_kafka_OffsetCommitRequest(
                rkcg->rkcg_coord, rkcg, offsets,
                RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
                rd_kafka_cgrp_op_handle_OffsetCommit, rko,
                reason);

        /* Must have valid offsets to commit if we get here */
        rd_kafka_assert(NULL, r != 0);

        return;

 err:
        if (err != RD_KAFKA_RESP_ERR__NO_OFFSET)
                rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP,
                             "COMMIT",
                             "OffsetCommit internal error: %s",
                             rd_kafka_err2str(err));

        /* Propagate error through dummy buffer object that will
         * call the response handler from the main loop, avoiding
         * any recursive calls from op_handle_OffsetCommit ->
         * assignment_serve() and then back to cgrp_assigned_offsets_commit() */

        reply = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF);
        reply->rko_rk = rkcg->rkcg_rk; /* Set rk since the rkbuf will not
                                        * have a rkb to reach it. */
        reply->rko_err = err;

        rkbuf = rd_kafka_buf_new(0, 0);
        rkbuf->rkbuf_cb = rd_kafka_cgrp_op_handle_OffsetCommit;
        rkbuf->rkbuf_opaque = rko; /* Handler takes ownership of rko. */
        reply->rko_u.xbuf.rkbuf = rkbuf;

        rd_kafka_q_enq(rkcg->rkcg_ops, reply);

}
3051
3052
3053 /**
3054 * @brief Commit offsets assigned partitions.
3055 *
3056 * If \p offsets is NULL all partitions in the current assignment will be used.
3057 * If \p set_offsets is true the offsets to commit will be read from the
3058 * rktp's stored offset rather than the .offset fields in \p offsets.
3059 *
3060 * rkcg_wait_commit_cnt will be increased accordingly.
3061 */
3062 void
rd_kafka_cgrp_assigned_offsets_commit(rd_kafka_cgrp_t * rkcg,const rd_kafka_topic_partition_list_t * offsets,rd_bool_t set_offsets,const char * reason)3063 rd_kafka_cgrp_assigned_offsets_commit (
3064 rd_kafka_cgrp_t *rkcg,
3065 const rd_kafka_topic_partition_list_t *offsets,
3066 rd_bool_t set_offsets,
3067 const char *reason) {
3068 rd_kafka_op_t *rko;
3069
3070 if (rd_kafka_cgrp_assignment_is_lost(rkcg)) {
3071 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "AUTOCOMMIT",
3072 "Group \"%s\": not committing assigned offsets: "
3073 "assignment lost",
3074 rkcg->rkcg_group_id->str);
3075 return;
3076 }
3077
3078 rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT);
3079 rko->rko_u.offset_commit.reason = rd_strdup(reason);
3080 if (rkcg->rkcg_rk->rk_conf.enabled_events &
3081 RD_KAFKA_EVENT_OFFSET_COMMIT) {
3082 /* Send results to application */
3083 rd_kafka_op_set_replyq(rko, rkcg->rkcg_rk->rk_rep, 0);
3084 rko->rko_u.offset_commit.cb =
3085 rkcg->rkcg_rk->rk_conf.offset_commit_cb; /*maybe NULL*/
3086 rko->rko_u.offset_commit.opaque = rkcg->rkcg_rk->rk_conf.opaque;
3087 }
3088 /* NULL partitions means current assignment */
3089 if (offsets)
3090 rko->rko_u.offset_commit.partitions =
3091 rd_kafka_topic_partition_list_copy(offsets);
3092 rko->rko_u.offset_commit.silent_empty = 1;
3093 rd_kafka_cgrp_offsets_commit(rkcg, rko, set_offsets, reason);
3094 }
3095
3096
3097 /**
3098 * auto.commit.interval.ms commit timer callback.
3099 *
3100 * Trigger a group offset commit.
3101 *
3102 * Locality: rdkafka main thread
3103 */
static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts,
                                                void *arg) {
        rd_kafka_cgrp_t *rkcg = arg;

        /* Only auto-commit in the STEADY join state: while rebalancing
         * or initializing the rkcg_generation_id is most likely in flux. */
        if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY)
                rd_kafka_cgrp_assigned_offsets_commit(
                        rkcg, NULL,
                        rd_true/*set offsets*/,
                        "cgrp auto commit timer");
}
3117
3118
3119 /**
3120 * @brief If rkcg_next_subscription or rkcg_next_unsubscribe are
3121 * set, trigger a state change so that they are applied from the
3122 * main dispatcher.
3123 *
3124 * @returns rd_true if a subscribe was scheduled, else false.
3125 */
3126 static rd_bool_t
rd_kafka_trigger_waiting_subscribe_maybe(rd_kafka_cgrp_t * rkcg)3127 rd_kafka_trigger_waiting_subscribe_maybe (rd_kafka_cgrp_t *rkcg) {
3128
3129 if (rkcg->rkcg_next_subscription || rkcg->rkcg_next_unsubscribe) {
3130 /* Skip the join backoff */
3131 rd_interval_reset(&rkcg->rkcg_join_intvl);
3132 rd_kafka_cgrp_rejoin(rkcg, "Applying next subscription");
3133 return rd_true;
3134 }
3135
3136 return rd_false;
3137 }
3138
3139
3140 /**
3141 * @brief Incrementally add to an existing partition assignment
3142 * May update \p partitions but will not hold on to it.
3143 *
3144 * @returns an error object or NULL on success.
3145 */
3146 static rd_kafka_error_t *
rd_kafka_cgrp_incremental_assign(rd_kafka_cgrp_t * rkcg,rd_kafka_topic_partition_list_t * partitions)3147 rd_kafka_cgrp_incremental_assign (rd_kafka_cgrp_t *rkcg,
3148 rd_kafka_topic_partition_list_t
3149 *partitions) {
3150 rd_kafka_error_t *error;
3151
3152 error = rd_kafka_assignment_add(rkcg->rkcg_rk, partitions);
3153 if (error)
3154 return error;
3155
3156 if (rkcg->rkcg_join_state ==
3157 RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL) {
3158 rd_kafka_assignment_resume(rkcg->rkcg_rk,
3159 "incremental assign called");
3160 rd_kafka_cgrp_set_join_state(
3161 rkcg,
3162 RD_KAFKA_CGRP_JOIN_STATE_STEADY);
3163
3164 if (rkcg->rkcg_subscription) {
3165 /* If using subscribe(), start a timer to enforce
3166 * `max.poll.interval.ms`.
3167 * Instead of restarting the timer on each ...poll()
3168 * call, which would be costly (once per message),
3169 * set up an intervalled timer that checks a timestamp
3170 * (that is updated on ..poll()).
3171 * The timer interval is 2 hz. */
3172 rd_kafka_timer_start(
3173 &rkcg->rkcg_rk->rk_timers,
3174 &rkcg->rkcg_max_poll_interval_tmr,
3175 500 * 1000ll /* 500ms */,
3176 rd_kafka_cgrp_max_poll_interval_check_tmr_cb,
3177 rkcg);
3178 }
3179 }
3180
3181 rd_kafka_cgrp_assignment_clear_lost(rkcg,
3182 "incremental_assign() called");
3183
3184 return NULL;
3185 }
3186
3187
3188 /**
3189 * @brief Incrementally remove partitions from an existing partition
3190 * assignment. May update \p partitions but will not hold on
3191 * to it.
3192 *
3193 * @remark This method does not unmark the current assignment as lost
 *         (if lost). That happens following _incr_unassign_done,
 *         when a group-rejoin is initiated.
3196 *
3197 * @returns An error object or NULL on success.
3198 */
3199 static rd_kafka_error_t *
rd_kafka_cgrp_incremental_unassign(rd_kafka_cgrp_t * rkcg,rd_kafka_topic_partition_list_t * partitions)3200 rd_kafka_cgrp_incremental_unassign (rd_kafka_cgrp_t *rkcg,
3201 rd_kafka_topic_partition_list_t
3202 *partitions) {
3203 rd_kafka_error_t *error;
3204
3205 error = rd_kafka_assignment_subtract(rkcg->rkcg_rk, partitions);
3206 if (error)
3207 return error;
3208
3209 if (rkcg->rkcg_join_state ==
3210 RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL) {
3211 rd_kafka_assignment_resume(rkcg->rkcg_rk,
3212 "incremental unassign called");
3213 rd_kafka_cgrp_set_join_state(
3214 rkcg,
3215 RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE);
3216 }
3217
3218 rd_kafka_cgrp_assignment_clear_lost(rkcg,
3219 "incremental_unassign() called");
3220
3221 return NULL;
3222 }
3223
3224
3225 /**
3226 * @brief Call when all incremental unassign operations are done to transition
3227 * to the next state.
3228 */
static void rd_kafka_cgrp_incr_unassign_done (rd_kafka_cgrp_t *rkcg) {

        /* If this action was underway when a terminate was initiated, it will
         * be left to complete. Now that's done, unassign all partitions */
        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
                             "Group \"%s\" is terminating, initiating full "
                             "unassign", rkcg->rkcg_group_id->str);
                rd_kafka_cgrp_unassign(rkcg);
                return;
        }

        if (rkcg->rkcg_rebalance_incr_assignment) {

                /* This incremental unassign was part of a normal rebalance
                 * (in which the revoke set was not empty). Immediately
                 * trigger the assign that follows this revoke. The protocol
                 * dictates this should occur even if the new assignment
                 * set is empty.
                 *
                 * Also, since this rebalance had some revoked partitions,
                 * a re-join should occur following the assign.
                 */

                rd_kafka_rebalance_op_incr(
                        rkcg,
                        RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                        rkcg->rkcg_rebalance_incr_assignment,
                        rd_true/*rejoin following assign*/,
                        "cooperative assign after revoke");

                /* The pending incremental assignment has now been passed
                 * on to the rebalance op: release our reference. */
                rd_kafka_topic_partition_list_destroy(
                        rkcg->rkcg_rebalance_incr_assignment);
                rkcg->rkcg_rebalance_incr_assignment = NULL;

                /* Note: rkcg_rebalance_rejoin is actioned / reset in
                 * rd_kafka_cgrp_incremental_assign call */

        } else if (rkcg->rkcg_rebalance_rejoin) {
                rkcg->rkcg_rebalance_rejoin = rd_false;

                /* There are some cases (lost partitions), where a rejoin
                 * should occur immediately following the unassign (this
                 * is not the case under normal conditions), in which case
                 * the rejoin flag will be set. */

                /* Skip the join backoff */
                rd_interval_reset(&rkcg->rkcg_join_intvl);

                rd_kafka_cgrp_rejoin(rkcg, "Incremental unassignment done");

        } else if (!rd_kafka_trigger_waiting_subscribe_maybe(rkcg)) {
                /* After this incremental unassignment we're now back in
                 * a steady state. */
                rd_kafka_cgrp_set_join_state(rkcg,
                                             RD_KAFKA_CGRP_JOIN_STATE_STEADY);

        }
}
3288
3289
3290 /**
3291 * @brief Call when all absolute (non-incremental) unassign operations are done
3292 * to transition to the next state.
3293 */
static void rd_kafka_cgrp_unassign_done (rd_kafka_cgrp_t *rkcg) {
        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
                     "Group \"%s\": unassign done in state %s "
                     "(join-state %s)",
                     rkcg->rkcg_group_id->str,
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

        /* Leave group, if desired. */
        rd_kafka_cgrp_leave_maybe(rkcg);

        if (rkcg->rkcg_join_state ==
            RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE) {
                /* All partitions are unassigned: rejoin the group,
                 * skipping the join backoff. */
                rd_interval_reset(&rkcg->rkcg_join_intvl);

                rd_kafka_cgrp_rejoin(rkcg, "Unassignment done");
        }
}
3316
3317
3318
3319 /**
3320 * @brief Called from assignment code when all in progress
3321 * assignment/unassignment operations are done, allowing the cgrp to
3322 * transition to other states if needed.
3323 *
3324 * @remark This may be called spontaneously without any need for a state
3325 * change in the rkcg.
3326 */
void rd_kafka_cgrp_assignment_done (rd_kafka_cgrp_t *rkcg) {
        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNDONE",
                     "Group \"%s\": "
                     "assignment operations done in join-state %s "
                     "(rebalance rejoin=%s)",
                     rkcg->rkcg_group_id->str,
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                     RD_STR_ToF(rkcg->rkcg_rebalance_rejoin));

        /* Dispatch on the current join state to decide what follows
         * the completed assignment/unassignment operations. */
        switch (rkcg->rkcg_join_state)
        {
        case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE:
                rd_kafka_cgrp_unassign_done(rkcg);
                break;

        case RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE:
                rd_kafka_cgrp_incr_unassign_done(rkcg);
                break;

        case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
                /* If an updated/next subscription is available, schedule it. */
                if (rd_kafka_trigger_waiting_subscribe_maybe(rkcg))
                        break;

                if (rkcg->rkcg_rebalance_rejoin) {
                        rkcg->rkcg_rebalance_rejoin = rd_false;

                        /* Skip the join backoff */
                        rd_interval_reset(&rkcg->rkcg_join_intvl);

                        rd_kafka_cgrp_rejoin(
                                rkcg,
                                "rejoining group to redistribute "
                                "previously owned partitions to other "
                                "group members");
                        break;
                }

                /* FALLTHRU */

        case RD_KAFKA_CGRP_JOIN_STATE_INIT:
                /* Check if cgrp is trying to terminate, which is safe to do
                 * in these two states. Otherwise we'll need to wait for
                 * the current state to decommission. */
                rd_kafka_cgrp_try_terminate(rkcg);
                break;

        default:
                break;
        }
}
3378
3379
3380
3381 /**
3382 * @brief Remove existing assignment.
3383 */
3384 static rd_kafka_error_t *
rd_kafka_cgrp_unassign(rd_kafka_cgrp_t * rkcg)3385 rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg) {
3386
3387 rd_kafka_assignment_clear(rkcg->rkcg_rk);
3388
3389 if (rkcg->rkcg_join_state ==
3390 RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL) {
3391 rd_kafka_assignment_resume(rkcg->rkcg_rk, "unassign called");
3392 rd_kafka_cgrp_set_join_state(
3393 rkcg,
3394 RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE);
3395 }
3396
3397 rd_kafka_cgrp_assignment_clear_lost(rkcg, "unassign() called");
3398
3399 return NULL;
3400 }
3401
3402
3403 /**
3404 * @brief Set new atomic partition assignment
3405 * May update \p assignment but will not hold on to it.
3406 *
3407 * @returns NULL on success or an error if a fatal error has been raised.
3408 */
3409 static rd_kafka_error_t *
rd_kafka_cgrp_assign(rd_kafka_cgrp_t * rkcg,rd_kafka_topic_partition_list_t * assignment)3410 rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg,
3411 rd_kafka_topic_partition_list_t *assignment) {
3412 rd_kafka_error_t *error;
3413
3414 rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "ASSIGN",
3415 "Group \"%s\": new assignment of %d partition(s) "
3416 "in join-state %s",
3417 rkcg->rkcg_group_id->str,
3418 assignment ? assignment->cnt : 0,
3419 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
3420
3421 /* Clear existing assignment, if any, and serve its removals. */
3422 if (rd_kafka_assignment_clear(rkcg->rkcg_rk))
3423 rd_kafka_assignment_serve(rkcg->rkcg_rk);
3424
3425 error = rd_kafka_assignment_add(rkcg->rkcg_rk, assignment);
3426 if (error)
3427 return error;
3428
3429 rd_kafka_cgrp_assignment_clear_lost(rkcg, "assign() called");
3430
3431 if (rkcg->rkcg_join_state ==
3432 RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL) {
3433 rd_kafka_assignment_resume(rkcg->rkcg_rk, "assign called");
3434 rd_kafka_cgrp_set_join_state(
3435 rkcg,
3436 RD_KAFKA_CGRP_JOIN_STATE_STEADY);
3437
3438 if (rkcg->rkcg_subscription) {
3439 /* If using subscribe(), start a timer to enforce
3440 * `max.poll.interval.ms`.
3441 * Instead of restarting the timer on each ...poll()
3442 * call, which would be costly (once per message),
3443 * set up an intervalled timer that checks a timestamp
3444 * (that is updated on ..poll()).
3445 * The timer interval is 2 hz. */
3446 rd_kafka_timer_start(
3447 &rkcg->rkcg_rk->rk_timers,
3448 &rkcg->rkcg_max_poll_interval_tmr,
3449 500 * 1000ll /* 500ms */,
3450 rd_kafka_cgrp_max_poll_interval_check_tmr_cb,
3451 rkcg);
3452 }
3453 }
3454
3455 return NULL;
3456 }
3457
3458
3459
3460 /**
3461 * @brief Construct a typed map from list \p rktparlist with key corresponding
3462 * to each element in the list and value NULL.
3463 *
3464 * @remark \p rktparlist may be NULL.
3465 */
3466 static map_toppar_member_info_t *
rd_kafka_toppar_list_to_toppar_member_info_map(rd_kafka_topic_partition_list_t * rktparlist)3467 rd_kafka_toppar_list_to_toppar_member_info_map (rd_kafka_topic_partition_list_t
3468 *rktparlist) {
3469 map_toppar_member_info_t *map = rd_calloc(1, sizeof(*map));
3470 const rd_kafka_topic_partition_t *rktpar;
3471
3472 RD_MAP_INIT(
3473 map,
3474 rktparlist ? rktparlist->cnt : 0,
3475 rd_kafka_topic_partition_cmp,
3476 rd_kafka_topic_partition_hash,
3477 rd_kafka_topic_partition_destroy_free,
3478 PartitionMemberInfo_free);
3479
3480 if (!rktparlist)
3481 return map;
3482
3483 RD_KAFKA_TPLIST_FOREACH(rktpar, rktparlist)
3484 RD_MAP_SET(map,
3485 rd_kafka_topic_partition_copy(rktpar),
3486 PartitionMemberInfo_new(NULL, rd_false));
3487
3488 return map;
3489 }
3490
3491
3492 /**
3493 * @brief Construct a toppar list from map \p map with elements corresponding
3494 * to the keys of \p map.
3495 */
3496 static rd_kafka_topic_partition_list_t *
rd_kafka_toppar_member_info_map_to_list(map_toppar_member_info_t * map)3497 rd_kafka_toppar_member_info_map_to_list (map_toppar_member_info_t *map) {
3498 const rd_kafka_topic_partition_t *k;
3499 rd_kafka_topic_partition_list_t *list =
3500 rd_kafka_topic_partition_list_new((int)RD_MAP_CNT(map));
3501
3502 RD_MAP_FOREACH_KEY(k, map) {
3503 rd_kafka_topic_partition_list_add(list,
3504 k->topic,
3505 k->partition);
3506 }
3507
3508 return list;
3509 }
3510
3511
3512 /**
3513 * @brief Handle a rebalance-triggered partition assignment
3514 * (COOPERATIVE case).
3515 */
static void
rd_kafka_cgrp_handle_assignment_cooperative (rd_kafka_cgrp_t *rkcg,
                                             rd_kafka_topic_partition_list_t
                                             *assignment) {
        map_toppar_member_info_t *new_assignment_set;
        map_toppar_member_info_t *old_assignment_set;
        map_toppar_member_info_t *newly_added_set;
        map_toppar_member_info_t *revoked_set;
        rd_kafka_topic_partition_list_t *newly_added;
        rd_kafka_topic_partition_list_t *revoked;

        /* Build set representations of the new and current (group)
         * assignments so set differences can be computed. */
        new_assignment_set =
                rd_kafka_toppar_list_to_toppar_member_info_map(assignment);

        old_assignment_set =
                rd_kafka_toppar_list_to_toppar_member_info_map(
                        rkcg->rkcg_group_assignment);

        /* newly_added = new - old, revoked = old - new */
        newly_added_set =
                rd_kafka_member_partitions_subtract(
                        new_assignment_set, old_assignment_set);
        revoked_set =
                rd_kafka_member_partitions_subtract(
                        old_assignment_set, new_assignment_set);

        newly_added = rd_kafka_toppar_member_info_map_to_list(newly_added_set);
        revoked = rd_kafka_toppar_member_info_map_to_list(revoked_set);

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COOPASSIGN",
                     "Group \"%s\": incremental assignment: %d newly added, "
                     "%d revoked partitions based on assignment of %d "
                     "partitions",
                     rkcg->rkcg_group_id->str,
                     newly_added->cnt,
                     revoked->cnt,
                     assignment->cnt);

        if (revoked->cnt > 0) {
                /* Setting rkcg_incr_assignment causes a follow on incremental
                 * assign rebalance op after completion of this incremental
                 * unassign op. */

                /* Ownership of newly_added is transferred to
                 * rkcg_rebalance_incr_assignment; NULL it so it is not
                 * destroyed below. */
                rkcg->rkcg_rebalance_incr_assignment = newly_added;
                newly_added = NULL;

                rd_kafka_rebalance_op_incr(
                        rkcg,
                        RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                        revoked, rd_false/*no rejoin following
                                           unassign*/, "sync group revoke");

        } else {
                /* There are no revoked partitions - trigger the assign
                 * rebalance op, and flag that the group does not need
                 * to be re-joined */

                rd_kafka_rebalance_op_incr(rkcg,
                        RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                        newly_added,
                        rd_false/*no rejoin following assign*/,
                        "sync group assign");
        }

        /* newly_added is NULL here if its ownership was transferred above. */
        if (newly_added)
                rd_kafka_topic_partition_list_destroy(newly_added);
        rd_kafka_topic_partition_list_destroy(revoked);
        RD_MAP_DESTROY_AND_FREE(revoked_set);
        RD_MAP_DESTROY_AND_FREE(newly_added_set);
        RD_MAP_DESTROY_AND_FREE(old_assignment_set);
        RD_MAP_DESTROY_AND_FREE(new_assignment_set);
}
3587
3588
3589 /**
3590 * @brief Sets or clears the group's partition assignment for our consumer.
3591 *
3592 * Will replace the current group assignment, if any.
3593 */
static void rd_kafka_cgrp_group_assignment_set (
        rd_kafka_cgrp_t *rkcg,
        const rd_kafka_topic_partition_list_t *partitions) {

        /* Drop any previous group assignment before replacing it. */
        if (rkcg->rkcg_group_assignment)
                rd_kafka_topic_partition_list_destroy(
                        rkcg->rkcg_group_assignment);

        if (!partitions) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNMENT",
                             "Group \"%s\": clearing group assignment",
                             rkcg->rkcg_group_id->str);
                rkcg->rkcg_group_assignment = NULL;

        } else {
                /* Keep a sorted private copy of the new assignment. */
                rkcg->rkcg_group_assignment =
                        rd_kafka_topic_partition_list_copy(partitions);
                rd_kafka_topic_partition_list_sort_by_topic(
                        rkcg->rkcg_group_assignment);
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNMENT",
                             "Group \"%s\": setting group assignment to %d "
                             "partition(s)",
                             rkcg->rkcg_group_id->str,
                             rkcg->rkcg_group_assignment->cnt);
        }

        /* Update the statistics counter under lock. */
        rd_kafka_wrlock(rkcg->rkcg_rk);
        rkcg->rkcg_c.assignment_size =
                rkcg->rkcg_group_assignment ?
                rkcg->rkcg_group_assignment->cnt : 0;
        rd_kafka_wrunlock(rkcg->rkcg_rk);

        if (rkcg->rkcg_group_assignment)
                rd_kafka_topic_partition_list_log(
                        rkcg->rkcg_rk, "GRPASSIGNMENT", RD_KAFKA_DBG_CGRP,
                        rkcg->rkcg_group_assignment);
}
3630
3631
3632 /**
3633 * @brief Adds or removes \p partitions from the current group assignment.
3634 *
3635 * @param add Whether to add or remove the partitions.
3636 *
3637 * @remark The added partitions must not already be on the group assignment,
3638 * and the removed partitions must be on the group assignment.
3639 *
3640 * To be used with incremental rebalancing.
3641 *
3642 */
static void rd_kafka_cgrp_group_assignment_modify (
        rd_kafka_cgrp_t *rkcg,
        rd_bool_t add,
        const rd_kafka_topic_partition_list_t *partitions) {
        const rd_kafka_topic_partition_t *rktpar;
        int precnt;  /* Assignment size prior to modification,
                      * used for the post-condition asserts below. */
        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNMENT",
                     "Group \"%s\": %d partition(s) being %s group assignment "
                     "of %d partition(s)",
                     rkcg->rkcg_group_id->str,
                     partitions->cnt,
                     add ? "added to" : "removed from",
                     rkcg->rkcg_group_assignment ?
                     rkcg->rkcg_group_assignment->cnt : 0);

        if (partitions == rkcg->rkcg_group_assignment) {
                /* \p partitions is the actual assignment, which
                 * must mean it is all to be removed.
                 * Short-cut directly to set(NULL). */
                rd_assert(!add);
                rd_kafka_cgrp_group_assignment_set(rkcg, NULL);
                return;
        }

        if (add &&
            (!rkcg->rkcg_group_assignment ||
             rkcg->rkcg_group_assignment->cnt == 0)) {
                /* Adding to an empty assignment is a set operation. */
                rd_kafka_cgrp_group_assignment_set(rkcg, partitions);
                return;
        }

        if (!add) {
                /* Removing from an empty assignment is illegal. */
                rd_assert(rkcg->rkcg_group_assignment != NULL &&
                          rkcg->rkcg_group_assignment->cnt > 0);
        }


        precnt = rkcg->rkcg_group_assignment->cnt;
        RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
                int idx;

                idx = rd_kafka_topic_partition_list_find_idx(
                        rkcg->rkcg_group_assignment,
                        rktpar->topic,
                        rktpar->partition);

                if (add) {
                        /* Added partitions must not already be assigned. */
                        rd_assert(idx == -1);

                        rd_kafka_topic_partition_list_add_copy(
                                rkcg->rkcg_group_assignment, rktpar);

                } else {
                        /* Removed partitions must be currently assigned. */
                        rd_assert(idx != -1);

                        rd_kafka_topic_partition_list_del_by_idx(
                                rkcg->rkcg_group_assignment, idx);

                }
        }

        /* Verify the final count matches the expected delta. */
        if (add)
                rd_assert(precnt + partitions->cnt ==
                          rkcg->rkcg_group_assignment->cnt);
        else
                rd_assert(precnt - partitions->cnt ==
                          rkcg->rkcg_group_assignment->cnt);

        if (rkcg->rkcg_group_assignment->cnt == 0) {
                /* An empty assignment is represented as NULL. */
                rd_kafka_topic_partition_list_destroy(
                        rkcg->rkcg_group_assignment);
                rkcg->rkcg_group_assignment = NULL;

        } else if (add)
                /* Keep the assignment sorted (removals preserve order). */
                rd_kafka_topic_partition_list_sort_by_topic(
                        rkcg->rkcg_group_assignment);

        /* Update the statistics counter under lock. */
        rd_kafka_wrlock(rkcg->rkcg_rk);
        rkcg->rkcg_c.assignment_size = rkcg->rkcg_group_assignment ?
                rkcg->rkcg_group_assignment->cnt : 0;
        rd_kafka_wrunlock(rkcg->rkcg_rk);

        if (rkcg->rkcg_group_assignment)
                rd_kafka_topic_partition_list_log(
                        rkcg->rkcg_rk, "GRPASSIGNMENT", RD_KAFKA_DBG_CGRP,
                        rkcg->rkcg_group_assignment);
}
3732
3733
3734 /**
3735 * @brief Handle a rebalance-triggered partition assignment.
3736 *
3737 * If a rebalance_cb has been registered we enqueue an op for the app
3738 * and let the app perform the actual assign() call. Otherwise we
3739 * assign() directly from here.
3740 *
3741 * This provides the most flexibility, allowing the app to perform any
 * operation it sees fit (e.g., offset writes or reads) before actually
3743 * updating the assign():ment.
3744 */
3745 static void
rd_kafka_cgrp_handle_assignment(rd_kafka_cgrp_t * rkcg,rd_kafka_topic_partition_list_t * assignment)3746 rd_kafka_cgrp_handle_assignment (rd_kafka_cgrp_t *rkcg,
3747 rd_kafka_topic_partition_list_t *assignment) {
3748
3749 if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
3750 RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE) {
3751 rd_kafka_cgrp_handle_assignment_cooperative(rkcg,
3752 assignment);
3753 } else {
3754
3755 rd_kafka_rebalance_op(rkcg,
3756 RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
3757 assignment, "new assignment");
3758 }
3759 }
3760
3761
3762 /**
3763 * Clean up any group-leader related resources.
3764 *
3765 * Locality: cgrp thread
3766 */
static void rd_kafka_cgrp_group_leader_reset (rd_kafka_cgrp_t *rkcg,
                                              const char *reason) {
        int i;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPLEADER",
                     "Group \"%.*s\": resetting group leader info: %s",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);

        /* Nothing to clean up if no member list was allocated. */
        if (!rkcg->rkcg_group_leader.members)
                return;

        for (i = 0 ; i < rkcg->rkcg_group_leader.member_cnt ; i++)
                rd_kafka_group_member_clear(
                        &rkcg->rkcg_group_leader.members[i]);

        rkcg->rkcg_group_leader.member_cnt = 0;
        rd_free(rkcg->rkcg_group_leader.members);
        rkcg->rkcg_group_leader.members = NULL;
}
3784
3785
3786 /**
3787 * @brief React to a RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS broker response.
3788 */
static void rd_kafka_cgrp_group_is_rebalancing (rd_kafka_cgrp_t *rkcg) {

        /* EAGER protocol: the full assignment must be revoked (not lost)
         * before rejoining. */
        if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
            RD_KAFKA_REBALANCE_PROTOCOL_EAGER) {
                rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg,
                                                      rd_false/*lost*/,
                                                      rd_false/*initiating*/,
                                                      "rebalance in progress");
                return;
        }


        /* In the COOPERATIVE case, simply rejoin the group
         * - partitions are unassigned on SyncGroup response,
         * not prior to JoinGroup as with the EAGER case. */

        if (RD_KAFKA_CGRP_REBALANCING(rkcg)) {
                /* A rebalance workflow is already underway: do nothing. */
                rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP,
                             "REBALANCE", "Group \"%.*s\": skipping "
                             "COOPERATIVE rebalance in state %s "
                             "(join-state %s)%s%s%s",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                             rd_kafka_cgrp_join_state_names[
                                     rkcg->rkcg_join_state],
                             RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg)
                             ? " (awaiting assign call)" : "",
                             (rkcg->rkcg_rebalance_incr_assignment != NULL)
                             ? " (incremental assignment pending)": "",
                             rkcg->rkcg_rebalance_rejoin
                             ? " (rebalance rejoin)": "");
                return;
        }

        rd_kafka_cgrp_rejoin(rkcg, "Group is rebalancing");
}
3825
3826
3827
3828 /**
3829 * @brief Triggers the application rebalance callback if required to
3830 * revoke partitions, and transition to INIT state for (eventual)
3831 * rejoin. Does nothing if a rebalance workflow is already in
3832 * progress
3833 */
rd_kafka_cgrp_revoke_all_rejoin_maybe(rd_kafka_cgrp_t * rkcg,rd_bool_t assignment_lost,rd_bool_t initiating,const char * reason)3834 static void rd_kafka_cgrp_revoke_all_rejoin_maybe (rd_kafka_cgrp_t *rkcg,
3835 rd_bool_t assignment_lost,
3836 rd_bool_t initiating,
3837 const char *reason) {
3838 if (RD_KAFKA_CGRP_REBALANCING(rkcg)) {
3839 rd_kafka_dbg(
3840 rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP,
3841 "REBALANCE", "Group \"%.*s\": rebalance (%s) "
3842 "already in progress, skipping in state %s "
3843 "(join-state %s) with %d assigned partition(s)%s%s%s: "
3844 "%s",
3845 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
3846 rd_kafka_rebalance_protocol2str(
3847 rd_kafka_cgrp_rebalance_protocol(rkcg)),
3848 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
3849 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
3850 rkcg->rkcg_group_assignment ?
3851 rkcg->rkcg_group_assignment->cnt : 0,
3852 assignment_lost ? " (lost)" : "",
3853 rkcg->rkcg_rebalance_incr_assignment ?
3854 ", incremental assignment in progress" : "",
3855 rkcg->rkcg_rebalance_rejoin ?
3856 ", rejoin on rebalance" : "",
3857 reason);
3858 return;
3859 }
3860
3861 rd_kafka_cgrp_revoke_all_rejoin(rkcg, assignment_lost,
3862 initiating, reason);
3863 }
3864
3865
/**
 * @brief Triggers the application rebalance callback if required to
 *        revoke partitions, and transition to INIT state for (eventual)
 *        rejoin.
 *
 * @param assignment_lost The assignment was involuntarily lost (rather
 *                        than gracefully revoked).
 * @param initiating This client is initiating the rebalance (only affects
 *                   log wording).
 * @param reason Human-readable reason; also recorded in the cgrp
 *               statistics' rebalance_reason field.
 */
static void rd_kafka_cgrp_revoke_all_rejoin (rd_kafka_cgrp_t *rkcg,
                                             rd_bool_t assignment_lost,
                                             rd_bool_t initiating,
                                             const char *reason) {

        rd_kafka_rebalance_protocol_t protocol =
                rd_kafka_cgrp_rebalance_protocol(rkcg);

        rd_bool_t terminating =
                unlikely(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE);


        rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP, "REBALANCE",
                     "Group \"%.*s\" %s (%s) in state %s (join-state %s) "
                     "with %d assigned partition(s)%s: %s",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     initiating ? "initiating rebalance" : "is rebalancing",
                     rd_kafka_rebalance_protocol2str(protocol),
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                     rkcg->rkcg_group_assignment ?
                     rkcg->rkcg_group_assignment->cnt : 0,
                     assignment_lost ? " (lost)" : "",
                     reason);

        /* Record the reason for this rebalance in the statistics. */
        rd_snprintf(rkcg->rkcg_c.rebalance_reason,
                    sizeof(rkcg->rkcg_c.rebalance_reason), "%s", reason);


        if (protocol == RD_KAFKA_REBALANCE_PROTOCOL_EAGER ||
            protocol == RD_KAFKA_REBALANCE_PROTOCOL_NONE) {
                /* EAGER case (or initial subscribe) - revoke partitions which
                 * will be followed by rejoin, if required. */

                if (assignment_lost)
                        rd_kafka_cgrp_assignment_set_lost(
                                rkcg, "%s: revoking assignment and rejoining",
                                reason);

                /* Schedule application rebalance op if there is an existing
                 * assignment (albeit perhaps empty) and there is no
                 * outstanding rebalance op in progress. */
                if (rkcg->rkcg_group_assignment &&
                    !RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg)) {
                        rd_kafka_rebalance_op(
                                rkcg,
                                RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                                rkcg->rkcg_group_assignment, reason);
                } else {
                        /* Skip the join backoff */
                        rd_interval_reset(&rkcg->rkcg_join_intvl);

                        rd_kafka_cgrp_rejoin(rkcg, "%s", reason);
                }

                return;
        }


        /* COOPERATIVE case. */

        /* All partitions should never be revoked unless terminating, leaving
         * the group, or on assignment lost. Another scenario represents a
         * logic error. Fail fast in this case. */
        if (!(terminating ||
              assignment_lost ||
              (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE))) {
                rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE",
                             "Group \"%s\": unexpected instruction to revoke "
                             "current assignment and rebalance "
                             "(terminating=%d, assignment_lost=%d, "
                             "LEAVE_ON_UNASSIGN_DONE=%d)",
                             rkcg->rkcg_group_id->str,
                             terminating, assignment_lost,
                             (rkcg->rkcg_flags &
                              RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE));
                rd_dassert(!*"BUG: unexpected instruction to revoke "
                           "current assignment and rebalance");
        }

        if (rkcg->rkcg_group_assignment &&
            rkcg->rkcg_group_assignment->cnt > 0) {
                if (assignment_lost)
                        rd_kafka_cgrp_assignment_set_lost(
                                rkcg,
                                "%s: revoking incremental assignment "
                                "and rejoining", reason);

                rd_kafka_dbg(rkcg->rkcg_rk,
                             CONSUMER|RD_KAFKA_DBG_CGRP,
                             "REBALANCE", "Group \"%.*s\": revoking "
                             "all %d partition(s)%s%s",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             rkcg->rkcg_group_assignment->cnt,
                             terminating ? " (terminating)" : "",
                             assignment_lost ? " (assignment lost)" : "");

                /* Incrementally revoke the whole assignment; only rejoin
                 * afterwards if we are not terminating. */
                rd_kafka_rebalance_op_incr(
                        rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                        rkcg->rkcg_group_assignment,
                        terminating ? rd_false : rd_true /*rejoin*/,
                        reason);

                return;
        }

        if (terminating) {
                /* If terminating, then don't rejoin group. */
                rd_kafka_dbg(rkcg->rkcg_rk,
                             CONSUMER|RD_KAFKA_DBG_CGRP,
                             "REBALANCE", "Group \"%.*s\": consumer is "
                             "terminating, skipping rejoin",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
                return;
        }

        /* Nothing assigned and not terminating: rejoin right away. */
        rd_kafka_cgrp_rejoin(rkcg, "Current assignment is empty");
}
3989
3990
/**
 * @brief `max.poll.interval.ms` enforcement check timer.
 *
 * If the application has not polled within the allowed interval this
 * raises a __MAX_POLL_EXCEEDED consumer error, leaves the group (unless
 * the member is static, per KIP-345) and triggers a rebalance with the
 * assignment marked as lost.
 *
 * @locality rdkafka main thread
 * @locks none
 */
static void
rd_kafka_cgrp_max_poll_interval_check_tmr_cb (rd_kafka_timers_t *rkts,
                                              void *arg) {
        rd_kafka_cgrp_t *rkcg = arg;
        rd_kafka_t *rk = rkcg->rkcg_rk;
        int exceeded; /* ms the poll interval was exceeded by, or 0 */

        exceeded = rd_kafka_max_poll_exceeded(rk);

        if (likely(!exceeded))
                return;

        rd_kafka_log(rk, LOG_WARNING, "MAXPOLL",
                     "Application maximum poll interval (%dms) "
                     "exceeded by %dms "
                     "(adjust max.poll.interval.ms for "
                     "long-running message processing): "
                     "leaving group",
                     rk->rk_conf.max_poll_interval_ms, exceeded);

        /* Propagate the error to the application's consumer queue. */
        rd_kafka_consumer_err(rkcg->rkcg_q, RD_KAFKA_NODEID_UA,
                              RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED,
                              0, NULL, NULL, RD_KAFKA_OFFSET_INVALID,
                              "Application maximum poll interval (%dms) "
                              "exceeded by %dms",
                              rk->rk_conf.max_poll_interval_ms, exceeded);

        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED;

        /* Stop further enforcement checks; presumably re-armed when the
         * application polls again (not visible in this file chunk). */
        rd_kafka_timer_stop(rkts, &rkcg->rkcg_max_poll_interval_tmr,
                            1/*lock*/);

        /* Leave the group before calling rebalance since the standard leave
         * will be triggered first after the rebalance callback has been served.
         * But since the application is blocked still doing processing
         * that leave will be further delayed.
         *
         * KIP-345: static group members should continue to respect
         * `max.poll.interval.ms` but should not send a LeaveGroupRequest.
         */
        if (!RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg))
                rd_kafka_cgrp_leave(rkcg);

        /* Timing out or leaving the group invalidates the member id, reset it
         * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */
        rd_kafka_cgrp_set_member_id(rkcg, "");

        /* Trigger rebalance */
        rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg,
                                              rd_true/*lost*/,
                                              rd_true/*initiating*/,
                                              "max.poll.interval.ms exceeded");
}
4050
4051
4052 /**
4053 * @brief Generate consumer errors for each topic in the list.
4054 *
4055 * Also replaces the list of last reported topic errors so that repeated
4056 * errors are silenced.
4057 *
4058 * @param errored Errored topics.
4059 * @param error_prefix Error message prefix.
4060 *
4061 * @remark Assumes ownership of \p errored.
4062 */
4063 static void
rd_kafka_propagate_consumer_topic_errors(rd_kafka_cgrp_t * rkcg,rd_kafka_topic_partition_list_t * errored,const char * error_prefix)4064 rd_kafka_propagate_consumer_topic_errors (
4065 rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *errored,
4066 const char *error_prefix) {
4067 int i;
4068
4069 for (i = 0 ; i < errored->cnt ; i++) {
4070 rd_kafka_topic_partition_t *topic = &errored->elems[i];
4071 rd_kafka_topic_partition_t *prev;
4072
4073 rd_assert(topic->err);
4074
4075 /* Normalize error codes, unknown topic may be
4076 * reported by the broker, or the lack of a topic in
4077 * metadata response is figured out by the client.
4078 * Make sure the application only sees one error code
4079 * for both these cases. */
4080 if (topic->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
4081 topic->err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
4082
4083 /* Check if this topic errored previously */
4084 prev = rd_kafka_topic_partition_list_find(
4085 rkcg->rkcg_errored_topics, topic->topic,
4086 RD_KAFKA_PARTITION_UA);
4087
4088 if (prev && prev->err == topic->err)
4089 continue; /* This topic already reported same error */
4090
4091 rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_TOPIC,
4092 "TOPICERR",
4093 "%s: %s: %s",
4094 error_prefix, topic->topic,
4095 rd_kafka_err2str(topic->err));
4096
4097 /* Send consumer error to application */
4098 rd_kafka_consumer_err(rkcg->rkcg_q, RD_KAFKA_NODEID_UA,
4099 topic->err, 0,
4100 topic->topic, NULL,
4101 RD_KAFKA_OFFSET_INVALID,
4102 "%s: %s: %s",
4103 error_prefix, topic->topic,
4104 rd_kafka_err2str(topic->err));
4105 }
4106
4107 rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics);
4108 rkcg->rkcg_errored_topics = errored;
4109 }
4110
4111
4112 /**
4113 * @brief Work out the topics currently subscribed to that do not
4114 * match any pattern in \p subscription.
4115 */
4116 static rd_kafka_topic_partition_list_t *
rd_kafka_cgrp_get_unsubscribing_topics(rd_kafka_cgrp_t * rkcg,rd_kafka_topic_partition_list_t * subscription)4117 rd_kafka_cgrp_get_unsubscribing_topics (rd_kafka_cgrp_t *rkcg,
4118 rd_kafka_topic_partition_list_t
4119 *subscription) {
4120 int i;
4121 rd_kafka_topic_partition_list_t *result;
4122
4123 result = rd_kafka_topic_partition_list_new(
4124 rkcg->rkcg_subscribed_topics->rl_cnt);
4125
4126 /* TODO: Something that isn't O(N*M) */
4127 for (i=0; i<rkcg->rkcg_subscribed_topics->rl_cnt; i++) {
4128 int j;
4129 const char *topic = ((rd_kafka_topic_info_t *)
4130 rkcg->rkcg_subscribed_topics->rl_elems[i])->topic;
4131
4132 for (j=0; j<subscription->cnt; j++) {
4133 const char *pattern = subscription->elems[j].topic;
4134 if (rd_kafka_topic_match(rkcg->rkcg_rk,
4135 pattern,
4136 topic)) {
4137 break;
4138 }
4139 }
4140
4141 if (j == subscription->cnt)
4142 rd_kafka_topic_partition_list_add(
4143 result, topic,
4144 RD_KAFKA_PARTITION_UA);
4145 }
4146
4147 if (result->cnt == 0) {
4148 rd_kafka_topic_partition_list_destroy(result);
4149 return NULL;
4150 }
4151
4152 return result;
4153 }
4154
4155
4156 /**
4157 * @brief Determine the partitions to revoke, given the topics being
4158 * unassigned.
4159 */
4160 static rd_kafka_topic_partition_list_t *
rd_kafka_cgrp_calculate_subscribe_revoking_partitions(rd_kafka_cgrp_t * rkcg,const rd_kafka_topic_partition_list_t * unsubscribing)4161 rd_kafka_cgrp_calculate_subscribe_revoking_partitions(
4162 rd_kafka_cgrp_t *rkcg,
4163 const rd_kafka_topic_partition_list_t *unsubscribing) {
4164 rd_kafka_topic_partition_list_t *revoking;
4165 const rd_kafka_topic_partition_t *rktpar;
4166
4167 if (!unsubscribing)
4168 return NULL;
4169
4170 if (!rkcg->rkcg_group_assignment ||
4171 rkcg->rkcg_group_assignment->cnt == 0)
4172 return NULL;
4173
4174 revoking = rd_kafka_topic_partition_list_new(
4175 rkcg->rkcg_group_assignment->cnt);
4176
4177 /* TODO: Something that isn't O(N*M). */
4178 RD_KAFKA_TPLIST_FOREACH(rktpar, unsubscribing) {
4179 const rd_kafka_topic_partition_t *assigned;
4180
4181 RD_KAFKA_TPLIST_FOREACH(assigned, rkcg->rkcg_group_assignment) {
4182 if (!strcmp(assigned->topic, rktpar->topic)) {
4183 rd_kafka_topic_partition_list_add(
4184 revoking,
4185 assigned->topic,
4186 assigned->partition);
4187 continue;
4188 }
4189 }
4190 }
4191
4192 if (revoking->cnt == 0) {
4193 rd_kafka_topic_partition_list_destroy(revoking);
4194 revoking = NULL;
4195 }
4196
4197 return revoking;
4198 }
4199
4200
/**
 * @brief Handle a new subscription that is modifying an existing subscription
 *        in the COOPERATIVE case.
 *
 * Computes which currently subscribed topics are dropped and which
 * assigned partitions must therefore be revoked, installs the new
 * subscription, and either postpones the join (pending a metadata
 * refresh), rejoins directly, or triggers an incremental revoke.
 *
 * @remark Assumes ownership of \p rktparlist.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR (always succeeds).
 */
static rd_kafka_resp_err_t
rd_kafka_cgrp_modify_subscription (rd_kafka_cgrp_t *rkcg,
                                   rd_kafka_topic_partition_list_t
                                   *rktparlist) {
        rd_kafka_topic_partition_list_t *unsubscribing_topics;
        rd_kafka_topic_partition_list_t *revoking;
        rd_list_t *tinfos;
        rd_kafka_topic_partition_list_t *errored;
        int metadata_age;
        int old_cnt = rkcg->rkcg_subscription->cnt;

        /* Recompute the wildcard flag from the new subscription. */
        rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;

        if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
                rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;

        /* Topics in rkcg_subscribed_topics that don't match any pattern in
           the new subscription. */
        unsubscribing_topics = rd_kafka_cgrp_get_unsubscribing_topics(
                rkcg, rktparlist);

        /* Currently assigned topic partitions that are no longer desired. */
        revoking = rd_kafka_cgrp_calculate_subscribe_revoking_partitions(
                rkcg, unsubscribing_topics);

        /* Install the new subscription (ownership transferred here). */
        rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
        rkcg->rkcg_subscription = rktparlist;

        if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
                                           "modify subscription") == 1) {
                /* Metadata is stale and a refresh was requested:
                 * postpone the join until it completes. */
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER,
                             "MODSUB",
                             "Group \"%.*s\": postponing join until "
                             "up-to-date metadata is available",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));

                rd_assert(rkcg->rkcg_join_state ==
                          RD_KAFKA_CGRP_JOIN_STATE_INIT ||
                          /* Possible via rd_kafka_cgrp_modify_subscription */
                          rkcg->rkcg_join_state ==
                          RD_KAFKA_CGRP_JOIN_STATE_STEADY);

                rd_kafka_cgrp_set_join_state(
                        rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);


                /* Revoke/join will occur after metadata refresh completes */
                if (revoking)
                        rd_kafka_topic_partition_list_destroy(revoking);
                if (unsubscribing_topics)
                        rd_kafka_topic_partition_list_destroy(
                                unsubscribing_topics);

                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE",
                     "Group \"%.*s\": modifying subscription of size %d to "
                     "new subscription of size %d, removing %d topic(s), "
                     "revoking %d partition(s) (join-state %s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     old_cnt, rkcg->rkcg_subscription->cnt,
                     unsubscribing_topics ?
                     unsubscribing_topics->cnt : 0,
                     revoking ? revoking->cnt : 0,
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

        if (unsubscribing_topics)
                rd_kafka_topic_partition_list_destroy(unsubscribing_topics);

        /* Create a list of the topics in metadata that matches the new
         * subscription */
        tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
                             (void *)rd_kafka_topic_info_destroy);

        /* Unmatched topics will be added to the errored list. */
        errored = rd_kafka_topic_partition_list_new(0);

        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
                rd_kafka_metadata_topic_match(rkcg->rkcg_rk,
                                              tinfos, rkcg->rkcg_subscription,
                                              errored);
        else
                rd_kafka_metadata_topic_filter(rkcg->rkcg_rk,
                                               tinfos,
                                               rkcg->rkcg_subscription,
                                               errored);

        /* Propagate consumer errors for any non-existent or errored topics.
         * The function takes ownership of errored. */
        rd_kafka_propagate_consumer_topic_errors(
                rkcg, errored, "Subscribed topic not available");

        /* If the effective topic set changed and nothing needs revoking,
         * a plain rejoin suffices. */
        if (rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos) &&
            !revoking) {
                rd_kafka_cgrp_rejoin(rkcg, "Subscription modified");
                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        if (revoking) {
                rd_kafka_dbg(rkcg->rkcg_rk,
                             CONSUMER|RD_KAFKA_DBG_CGRP,
                             "REBALANCE", "Group \"%.*s\" revoking "
                             "%d of %d partition(s)",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             revoking->cnt,
                             rkcg->rkcg_group_assignment->cnt);

                /* Incrementally revoke the no-longer-desired partitions;
                 * a rejoin follows (rejoin=true). */
                rd_kafka_rebalance_op_incr(rkcg,
                        RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                        revoking, rd_true/*rejoin*/, "subscribe");

                rd_kafka_topic_partition_list_destroy(revoking);
        }

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
4324
4325
/**
 * Remove existing topic subscription.
 *
 * Stops `max.poll.interval.ms` enforcement, destroys the current
 * subscription and cached subscribed-topics list, resets group leader
 * state, and revokes the current assignment (unless it is already lost).
 *
 * @param leave_group If true (and the group has actually been joined),
 *                    arrange for a group leave once the unassign is done.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR (always succeeds).
 */
static rd_kafka_resp_err_t
rd_kafka_cgrp_unsubscribe (rd_kafka_cgrp_t *rkcg, rd_bool_t leave_group) {

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNSUBSCRIBE",
                     "Group \"%.*s\": unsubscribe from current %ssubscription "
                     "of size %d (leave group=%s, has joined=%s, %s, "
                     "join-state %s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rkcg->rkcg_subscription ? "" : "unset ",
                     rkcg->rkcg_subscription ? rkcg->rkcg_subscription->cnt : 0,
                     RD_STR_ToF(leave_group),
                     RD_STR_ToF(RD_KAFKA_CGRP_HAS_JOINED(rkcg)),
                     rkcg->rkcg_member_id ?
                     rkcg->rkcg_member_id->str : "n/a",
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

        /* No subscription: stop enforcing max.poll.interval.ms. */
        rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
                            &rkcg->rkcg_max_poll_interval_tmr, 1/*lock*/);

        if (rkcg->rkcg_subscription) {
                rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
                rkcg->rkcg_subscription = NULL;
        }

        /* Clear the cached list of subscribed/matched topics. */
        rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL);

        /*
         * Clean-up group leader duties, if any.
         */
        rd_kafka_cgrp_group_leader_reset(rkcg, "unsubscribe");

        /* Defer the actual group leave until the unassign has completed. */
        if (leave_group && RD_KAFKA_CGRP_HAS_JOINED(rkcg))
                rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE;

        /* FIXME: Why are we only revoking if !assignment_lost ? */
        if (!rd_kafka_cgrp_assignment_is_lost(rkcg))
                rd_kafka_cgrp_revoke_all_rejoin(rkcg,
                                                rd_false/*not lost*/,
                                                rd_true/*initiating*/,
                                                "unsubscribe");

        rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION |
                              RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
4375
4376
/**
 * Set new atomic topic subscription.
 *
 * @param rktparlist New subscription, or NULL to unsubscribe.
 *
 * @remark Ownership of \p rktparlist is taken on a NO_ERROR return
 *         (including the postponed-subscribe case); on error the caller
 *         retains ownership (see the op handler, which only clears its
 *         pointer when no error is returned).
 *
 * @returns RD_KAFKA_RESP_ERR__INVALID_ARG if no assignors are enabled,
 *          RD_KAFKA_RESP_ERR__FATAL if a fatal error has been raised,
 *          else RD_KAFKA_RESP_ERR_NO_ERROR.
 */
static rd_kafka_resp_err_t
rd_kafka_cgrp_subscribe (rd_kafka_cgrp_t *rkcg,
                         rd_kafka_topic_partition_list_t *rktparlist) {

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE",
                     "Group \"%.*s\": subscribe to new %ssubscription "
                     "of %d topics (join-state %s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rktparlist ? "" : "unset ",
                     rktparlist ? rktparlist->cnt : 0,
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

        /* Subscribing requires at least one enabled assignor. */
        if (rkcg->rkcg_rk->rk_conf.enabled_assignor_cnt == 0)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        /* If the consumer has raised a fatal error treat all subscribes as
           unsubscribe */
        if (rd_kafka_fatal_error_code(rkcg->rkcg_rk)) {
                if (rkcg->rkcg_subscription)
                        rd_kafka_cgrp_unsubscribe(rkcg,
                                                  rd_true/*leave group*/);
                return RD_KAFKA_RESP_ERR__FATAL;
        }

        /* Clear any existing postponed subscribe. */
        if (rkcg->rkcg_next_subscription)
                rd_kafka_topic_partition_list_destroy_free(
                        rkcg->rkcg_next_subscription);
        rkcg->rkcg_next_subscription = NULL;
        rkcg->rkcg_next_unsubscribe = rd_false;

        if (RD_KAFKA_CGRP_REBALANCING(rkcg)) {
                /* A rebalance is in flight: stash the new subscription
                 * (or unsubscribe) and apply it when it completes. */
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER,
                             "SUBSCRIBE", "Group \"%.*s\": postponing "
                             "subscribe until previous rebalance "
                             "completes (join-state %s)",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             rd_kafka_cgrp_join_state_names[
                                     rkcg->rkcg_join_state]);

                if (!rktparlist)
                        rkcg->rkcg_next_unsubscribe = rd_true;
                else
                        rkcg->rkcg_next_subscription = rktparlist;

                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        /* COOPERATIVE protocol: an existing subscription can be modified
         * in place without a full revoke. */
        if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
            RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE &&
            rktparlist &&
            rkcg->rkcg_subscription)
                return rd_kafka_cgrp_modify_subscription(rkcg, rktparlist);

        /* Remove existing subscription first */
        if (rkcg->rkcg_subscription)
                rd_kafka_cgrp_unsubscribe(
                        rkcg,
                        rktparlist ?
                        rd_false/* don't leave group if new subscription */ :
                        rd_true/* leave group if no new subscription */);

        if (!rktparlist)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION;

        if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
                rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;

        /* Install the new subscription (ownership transferred) and join. */
        rkcg->rkcg_subscription = rktparlist;

        rd_kafka_cgrp_join(rkcg);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
4456
4457
4458
4459
4460
4461
/**
 * Same as cgrp_terminate() but called from the cgrp/main thread upon receiving
 * the op 'rko' from cgrp_terminate().
 *
 * Marks the group for termination, unsubscribes/unassigns as appropriate
 * and attempts immediate termination if all preconditions are already met.
 * If termination is already in progress (or done) the op is failed with
 * RD_KAFKA_RESP_ERR__IN_PROGRESS.
 *
 * NOTE: Takes ownership of 'rko'
 *
 * Locality: main thread
 */
void
rd_kafka_cgrp_terminate0 (rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko) {

        rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
                     "Terminating group \"%.*s\" in state %s "
                     "with %d partition(s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                     rd_list_cnt(&rkcg->rkcg_toppars));

        if (unlikely(rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM ||
                     (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) ||
                     rkcg->rkcg_reply_rko != NULL)) {
                /* Already terminating or handling a previous terminate */
                if (rko) {
                        /* Detach the reply queue before erroring so the
                         * reply does not go through the op's replyq twice. */
                        rd_kafka_q_t *rkq = rko->rko_replyq.q;
                        rko->rko_replyq.q = NULL;
                        rd_kafka_consumer_err(rkq, RD_KAFKA_NODEID_UA,
                                              RD_KAFKA_RESP_ERR__IN_PROGRESS,
                                              rko->rko_replyq.version,
                                              NULL, NULL,
                                              RD_KAFKA_OFFSET_INVALID,
                                              "Group is %s",
                                              rkcg->rkcg_reply_rko ?
                                              "terminating":"terminated");
                        rd_kafka_q_destroy(rkq);
                        rd_kafka_op_destroy(rko);
                }
                return;
        }

        /* Mark for stopping, the actual state transition
         * is performed when all toppars have left. */
        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_TERMINATE;
        rkcg->rkcg_ts_terminate = rd_clock();
        rkcg->rkcg_reply_rko = rko;

        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION)
                rd_kafka_cgrp_unsubscribe(
                        rkcg,
                        /* Leave group if this is a controlled shutdown */
                        !rd_kafka_destroy_flags_no_consumer_close(
                                rkcg->rkcg_rk));

        /* Reset the wait-for-LeaveGroup flag if there is an outstanding
         * LeaveGroupRequest being waited on (from a prior unsubscribe), but
         * the destroy flags have NO_CONSUMER_CLOSE set, which calls
         * for immediate termination. */
        if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
                rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE;

        /* If there's an oustanding rebalance which has not yet been
         * served by the application it will be served from consumer_close().
         * If the instance is being terminated with NO_CONSUMER_CLOSE we
         * trigger unassign directly to avoid stalling on rebalance callback
         * queues that are no longer served by the application. */
        if (!RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) ||
            rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
                rd_kafka_cgrp_unassign(rkcg);

        /* Serve assignment so it can start to decommission */
        rd_kafka_assignment_serve(rkcg->rkcg_rk);

        /* Try to terminate right away if all preconditions are met. */
        rd_kafka_cgrp_try_terminate(rkcg);
}
4538
4539
4540 /**
4541 * Terminate and decommission a cgrp asynchronously.
4542 *
4543 * Locality: any thread
4544 */
rd_kafka_cgrp_terminate(rd_kafka_cgrp_t * rkcg,rd_kafka_replyq_t replyq)4545 void rd_kafka_cgrp_terminate (rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq) {
4546 rd_kafka_assert(NULL, !thrd_is_current(rkcg->rkcg_rk->rk_thread));
4547 rd_kafka_cgrp_op(rkcg, NULL, replyq, RD_KAFKA_OP_TERMINATE, 0);
4548 }
4549
4550
/**
 * @brief State for scanning a queue for timed-out OFFSET_COMMIT ops,
 *        see rd_kafka_op_offset_commit_timeout_check().
 */
struct _op_timeout_offset_commit {
        rd_ts_t now;        /**< Scan time: ops with a non-zero ts_timeout
                             *   <= now are considered expired. */
        rd_kafka_t *rk;     /**< Client instance. */
        rd_list_t expired;  /**< Expired ops, dequeued here so the caller can
                             *   fail them outside the queue lock. */
};
4556
4557 /**
4558 * q_filter callback for expiring OFFSET_COMMIT timeouts.
4559 */
rd_kafka_op_offset_commit_timeout_check(rd_kafka_q_t * rkq,rd_kafka_op_t * rko,void * opaque)4560 static int rd_kafka_op_offset_commit_timeout_check (rd_kafka_q_t *rkq,
4561 rd_kafka_op_t *rko,
4562 void *opaque) {
4563 struct _op_timeout_offset_commit *state =
4564 (struct _op_timeout_offset_commit*)opaque;
4565
4566 if (likely(rko->rko_type != RD_KAFKA_OP_OFFSET_COMMIT ||
4567 rko->rko_u.offset_commit.ts_timeout == 0 ||
4568 rko->rko_u.offset_commit.ts_timeout > state->now)) {
4569 return 0;
4570 }
4571
4572 rd_kafka_q_deq0(rkq, rko);
4573
4574 /* Add to temporary list to avoid recursive
4575 * locking of rkcg_wait_coord_q. */
4576 rd_list_add(&state->expired, rko);
4577 return 1;
4578 }
4579
4580
4581 /**
4582 * Scan for various timeouts.
4583 */
rd_kafka_cgrp_timeout_scan(rd_kafka_cgrp_t * rkcg,rd_ts_t now)4584 static void rd_kafka_cgrp_timeout_scan (rd_kafka_cgrp_t *rkcg, rd_ts_t now) {
4585 struct _op_timeout_offset_commit ofc_state;
4586 int i, cnt = 0;
4587 rd_kafka_op_t *rko;
4588
4589 ofc_state.now = now;
4590 ofc_state.rk = rkcg->rkcg_rk;
4591 rd_list_init(&ofc_state.expired, 0, NULL);
4592
4593 cnt += rd_kafka_q_apply(rkcg->rkcg_wait_coord_q,
4594 rd_kafka_op_offset_commit_timeout_check,
4595 &ofc_state);
4596
4597 RD_LIST_FOREACH(rko, &ofc_state.expired, i)
4598 rd_kafka_cgrp_op_handle_OffsetCommit(
4599 rkcg->rkcg_rk, NULL,
4600 RD_KAFKA_RESP_ERR__WAIT_COORD,
4601 NULL, NULL, rko);
4602
4603 rd_list_destroy(&ofc_state.expired);
4604
4605 if (cnt > 0)
4606 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTIMEOUT",
4607 "Group \"%.*s\": timed out %d op(s), %d remain",
4608 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), cnt,
4609 rd_kafka_q_len(rkcg->rkcg_wait_coord_q));
4610
4611
4612 }
4613
4614
/**
 * @brief Handle an assign op.
 *
 * Validates that the assign method matches the group's rebalance protocol
 * (incremental_(un)assign() for COOPERATIVE, assign() for EAGER), degrades
 * any assign to an unassign on fatal error or termination, performs the
 * (un)assignment and replies to the op with the outcome.
 *
 * @locality rdkafka main thread
 * @locks none
 */
static void rd_kafka_cgrp_handle_assign_op (rd_kafka_cgrp_t *rkcg,
                                            rd_kafka_op_t *rko) {
        rd_kafka_error_t *error = NULL;

        /* COOPERATIVE protocol only permits incremental changes. */
        if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
            RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE &&
            !(rko->rko_u.assign.method == RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN ||
              rko->rko_u.assign.method == RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN))
                error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE,
                                           "Changes to the current assignment "
                                           "must be made using "
                                           "incremental_assign() or "
                                           "incremental_unassign() "
                                           "when rebalance protocol type is "
                                           "COOPERATIVE");

        /* EAGER protocol only permits atomic assigns. */
        else if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
                 RD_KAFKA_REBALANCE_PROTOCOL_EAGER &&
                 !(rko->rko_u.assign.method == RD_KAFKA_ASSIGN_METHOD_ASSIGN))
                error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE,
                                           "Changes to the current assignment "
                                           "must be made using "
                                           "assign() when rebalance "
                                           "protocol type is EAGER");

        else if (rd_kafka_fatal_error_code(rkcg->rkcg_rk) ||
                 rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) {
                /* Treat all assignments as unassign when a fatal error is
                 * raised or the cgrp is terminating. */

                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER,
                             "ASSIGN", "Group \"%s\": Consumer %s: "
                             "treating assign as unassign",
                             rkcg->rkcg_group_id->str,
                             rd_kafka_fatal_error_code(rkcg->rkcg_rk) ?
                             "has raised a fatal error" : "is terminating");

                /* Drop the requested partitions and rewrite the op as an
                 * atomic unassign (method ASSIGN with NULL partitions). */
                if (rko->rko_u.assign.partitions) {
                        rd_kafka_topic_partition_list_destroy(
                                rko->rko_u.assign.partitions);
                        rko->rko_u.assign.partitions = NULL;
                }
                rko->rko_u.assign.method = RD_KAFKA_ASSIGN_METHOD_ASSIGN;
        }

        if (!error) {
                switch (rko->rko_u.assign.method)
                {
                case RD_KAFKA_ASSIGN_METHOD_ASSIGN:
                        /* New atomic assignment (partitions != NULL),
                         * or unassignment (partitions == NULL) */
                        if (rko->rko_u.assign.partitions)
                                error = rd_kafka_cgrp_assign(
                                        rkcg,
                                        rko->rko_u.assign.partitions);
                        else
                                error = rd_kafka_cgrp_unassign(rkcg);
                        break;
                case RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN:
                        error = rd_kafka_cgrp_incremental_assign(
                                rkcg,
                                rko->rko_u.assign.partitions);
                        break;
                case RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN:
                        error = rd_kafka_cgrp_incremental_unassign(
                                rkcg,
                                rko->rko_u.assign.partitions);
                        break;
                default:
                        RD_NOTREACHED();
                        break;
                }

                /* If call succeeded serve the assignment */
                if (!error)
                        rd_kafka_assignment_serve(rkcg->rkcg_rk);


        }

        if (error) {
                /* Log error since caller might not check
                 * *assign() return value. */
                rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "ASSIGN",
                             "Group \"%s\": application *assign() call "
                             "failed: %s",
                             rkcg->rkcg_group_id->str,
                             rd_kafka_error_string(error));
        }

        /* Reply to the op; a NULL error indicates success. */
        rd_kafka_op_error_reply(rko, error);
}
4712
4713
4714 /**
/**
 * @brief Handle cgrp queue op.
 *
 * Dispatches one op from the cgrp's op queue. Handlers that reply to
 * or pass ownership of \p rko onwards set the local \c rko to NULL so
 * the common cleanup at the bottom does not destroy it.
 *
 * @param rk      Client instance the cgrp belongs to.
 * @param rkq     Queue the op was served from (unused here).
 * @param rko     Op to handle; always consumed (destroyed or handed off).
 * @param cb_type Callback type (unused here).
 * @param opaque  The cgrp (rd_kafka_cgrp_t *).
 *
 * @returns RD_KAFKA_OP_RES_HANDLED in all cases.
 *
 * @locality rdkafka main thread
 * @locks none
 */
static rd_kafka_op_res_t
rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq,
                        rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type,
                        void *opaque) {
        rd_kafka_cgrp_t *rkcg = opaque;
        rd_kafka_toppar_t *rktp;
        rd_kafka_resp_err_t err;
        /* RECV_BUF ops are high-frequency: don't debug-log them. */
        const int silent_op = rko->rko_type == RD_KAFKA_OP_RECV_BUF;

        rktp = rko->rko_rktp;

        if (rktp && !silent_op)
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPOP",
                             "Group \"%.*s\" received op %s in state %s "
                             "(join-state %s) for %.*s [%"PRId32"]",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             rd_kafka_op2str(rko->rko_type),
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                             rd_kafka_cgrp_join_state_names[rkcg->
                                                            rkcg_join_state],
                             RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                             rktp->rktp_partition);
        else if (!silent_op)
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPOP",
                             "Group \"%.*s\" received op %s in state %s "
                             "(join-state %s)",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             rd_kafka_op2str(rko->rko_type),
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                             rd_kafka_cgrp_join_state_names[rkcg->
                                                            rkcg_join_state]);

        switch ((int)rko->rko_type)
        {
        case RD_KAFKA_OP_NAME:
                /* Return the currently assigned member id. */
                if (rkcg->rkcg_member_id)
                        rko->rko_u.name.str =
                                RD_KAFKAP_STR_DUP(rkcg->rkcg_member_id);
                rd_kafka_op_reply(rko, 0);
                rko = NULL;
                break;

        case RD_KAFKA_OP_CG_METADATA:
                /* Return the current consumer group metadata.
                 * NULL if no member id has been assigned yet. */
                rko->rko_u.cg_metadata = rkcg->rkcg_member_id
                        ? rd_kafka_consumer_group_metadata_new_with_genid(
                                rkcg->rkcg_rk->rk_conf.group_id_str,
                                rkcg->rkcg_generation_id,
                                rkcg->rkcg_member_id->str,
                                rkcg->rkcg_rk->rk_conf.group_instance_id)
                        : NULL;
                rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
                rko = NULL;
                break;

        case RD_KAFKA_OP_OFFSET_FETCH:
                /* Can't fetch offsets without an up coordinator:
                 * fail the op with __WAIT_COORD so the caller retries. */
                if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
                    (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)) {
                        rd_kafka_op_handle_OffsetFetch(
                                rkcg->rkcg_rk, NULL,
                                RD_KAFKA_RESP_ERR__WAIT_COORD,
                                NULL, NULL, rko);
                        rko = NULL; /* rko freed by handler */
                        break;
                }

                rd_kafka_OffsetFetchRequest(
                        rkcg->rkcg_coord,
                        rko->rko_u.offset_fetch.partitions,
                        rko->rko_u.offset_fetch.require_stable,
                        RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
                        rd_kafka_op_handle_OffsetFetch, rko);
                rko = NULL; /* rko now owned by request */
                break;

        case RD_KAFKA_OP_PARTITION_JOIN:
                rd_kafka_cgrp_partition_add(rkcg, rktp);

                /* If terminating tell the partition to leave */
                if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
                        rd_kafka_toppar_op_fetch_stop(
                                rktp, RD_KAFKA_NO_REPLYQ);
                break;

        case RD_KAFKA_OP_PARTITION_LEAVE:
                rd_kafka_cgrp_partition_del(rkcg, rktp);
                break;

        case RD_KAFKA_OP_OFFSET_COMMIT:
                /* Trigger offsets commit. */
                rd_kafka_cgrp_offsets_commit(rkcg, rko,
                                             /* only set offsets
                                              * if no partitions were
                                              * specified. */
                                             rko->rko_u.offset_commit.
                                             partitions ?
                                             0 : 1 /* set_offsets*/,
                                             rko->rko_u.offset_commit.reason);
                rko = NULL; /* rko now owned by request */
                break;

        case RD_KAFKA_OP_COORD_QUERY:
                rd_kafka_cgrp_coord_query(rkcg,
                                          rko->rko_err ?
                                          rd_kafka_err2str(rko->
                                                           rko_err):
                                          "from op");
                break;

        case RD_KAFKA_OP_SUBSCRIBE:
                rd_kafka_app_polled(rk);

                /* New atomic subscription (may be NULL) */
                err = rd_kafka_cgrp_subscribe(
                        rkcg, rko->rko_u.subscribe.topics);

                if (!err) /* now owned by rkcg */
                        rko->rko_u.subscribe.topics = NULL;

                rd_kafka_op_reply(rko, err);
                rko = NULL;
                break;

        case RD_KAFKA_OP_ASSIGN:
                rd_kafka_cgrp_handle_assign_op(rkcg, rko);
                rko = NULL;
                break;

        case RD_KAFKA_OP_GET_SUBSCRIPTION:
                /* Report the pending (next) subscription if one is
                 * outstanding, otherwise the current subscription. */
                if (rkcg->rkcg_next_subscription)
                        rko->rko_u.subscribe.topics =
                                rd_kafka_topic_partition_list_copy(
                                        rkcg->rkcg_next_subscription);
                else if (rkcg->rkcg_next_unsubscribe)
                        rko->rko_u.subscribe.topics = NULL;
                else if (rkcg->rkcg_subscription)
                        rko->rko_u.subscribe.topics =
                                rd_kafka_topic_partition_list_copy(
                                        rkcg->rkcg_subscription);
                rd_kafka_op_reply(rko, 0);
                rko = NULL;
                break;

        case RD_KAFKA_OP_GET_ASSIGNMENT:
                /* This is the consumer assignment, not the group assignment. */
                rko->rko_u.assign.partitions =
                        rd_kafka_topic_partition_list_copy(
                                rkcg->rkcg_rk->rk_consumer.assignment.all);

                rd_kafka_op_reply(rko, 0);
                rko = NULL;
                break;

        case RD_KAFKA_OP_GET_REBALANCE_PROTOCOL:
                rko->rko_u.rebalance_protocol.str =
                        rd_kafka_rebalance_protocol2str(
                                rd_kafka_cgrp_rebalance_protocol(rkcg));
                rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
                rko = NULL;
                break;

        case RD_KAFKA_OP_TERMINATE:
                rd_kafka_cgrp_terminate0(rkcg, rko);
                rko = NULL; /* terminate0() takes ownership */
                break;

        default:
                rd_kafka_assert(rkcg->rkcg_rk, !*"unknown type");
                break;
        }

        /* Destroy the op unless a handler above took ownership of it. */
        if (rko)
                rd_kafka_op_destroy(rko);

        return RD_KAFKA_OP_RES_HANDLED;
}
4896
4897
4898 /**
4899 * @returns true if the session timeout has expired (due to no successful
4900 * Heartbeats in session.timeout.ms) and triggers a rebalance.
4901 */
4902 static rd_bool_t
rd_kafka_cgrp_session_timeout_check(rd_kafka_cgrp_t * rkcg,rd_ts_t now)4903 rd_kafka_cgrp_session_timeout_check (rd_kafka_cgrp_t *rkcg, rd_ts_t now) {
4904 rd_ts_t delta;
4905 char buf[256];
4906
4907 if (unlikely(!rkcg->rkcg_ts_session_timeout))
4908 return rd_true; /* Session has expired */
4909
4910 delta = now - rkcg->rkcg_ts_session_timeout;
4911 if (likely(delta < 0))
4912 return rd_false;
4913
4914 delta += rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000;
4915
4916 rd_snprintf(buf, sizeof(buf),
4917 "Consumer group session timed out (in join-state %s) after "
4918 "%"PRId64" ms without a successful response from the "
4919 "group coordinator (broker %"PRId32", last error was %s)",
4920 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
4921 delta/1000, rkcg->rkcg_coord_id,
4922 rd_kafka_err2str(rkcg->rkcg_last_heartbeat_err));
4923
4924 rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR;
4925
4926 rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "SESSTMOUT",
4927 "%s: revoking assignment and rejoining group", buf);
4928
4929 /* Prevent further rebalances */
4930 rkcg->rkcg_ts_session_timeout = 0;
4931
4932 /* Timing out invalidates the member id, reset it
4933 * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */
4934 rd_kafka_cgrp_set_member_id(rkcg, "");
4935
4936 /* Revoke and rebalance */
4937 rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg,
4938 rd_true/*lost*/,
4939 rd_true/*initiating*/,
4940 buf);
4941
4942 return rd_true;
4943 }
4944
4945
4946 /**
4947 * @brief Apply the next waiting subscribe/unsubscribe, if any.
4948 */
rd_kafka_cgrp_apply_next_subscribe(rd_kafka_cgrp_t * rkcg)4949 static void rd_kafka_cgrp_apply_next_subscribe (rd_kafka_cgrp_t *rkcg) {
4950 rd_assert(rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT);
4951
4952 if (rkcg->rkcg_next_subscription) {
4953 rd_kafka_topic_partition_list_t *next_subscription =
4954 rkcg->rkcg_next_subscription;
4955 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIBE",
4956 "Group \"%s\": invoking waiting postponed "
4957 "subscribe", rkcg->rkcg_group_id->str);
4958 rkcg->rkcg_next_subscription = NULL;
4959 rd_kafka_cgrp_subscribe(rkcg, next_subscription);
4960
4961 } else if (rkcg->rkcg_next_unsubscribe) {
4962 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIBE",
4963 "Group \"%s\": invoking waiting postponed "
4964 "unsubscribe", rkcg->rkcg_group_id->str);
4965 rkcg->rkcg_next_unsubscribe = rd_false;
4966 rd_kafka_cgrp_unsubscribe(rkcg, rd_true/*Leave*/);
4967 }
4968 }
4969
/**
 * Client group's join state handling
 *
 * Serves the current join-state: kicks off a (re)join from INIT, or
 * sends interval heartbeats once a subscription-based membership is
 * established. Intermediate WAIT_* states are no-ops here.
 *
 * @locality rdkafka main thread
 */
static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg) {
        rd_ts_t now = rd_clock();

        /* A fatal error halts all group activity. */
        if (unlikely(rd_kafka_fatal_error_code(rkcg->rkcg_rk)))
                return;

        switch (rkcg->rkcg_join_state)
        {
        case RD_KAFKA_CGRP_JOIN_STATE_INIT:
                /* If there is a next subscription, apply it. */
                rd_kafka_cgrp_apply_next_subscribe(rkcg);

                /* If we have a subscription start the join process. */
                if (!rkcg->rkcg_subscription)
                        break;

                /* Rate-limit join attempts to at most one per second,
                 * but allow the first one immediately. */
                if (rd_interval_immediate(&rkcg->rkcg_join_intvl,
                                          1000*1000, now) > 0)
                        rd_kafka_cgrp_join(rkcg);
                break;

        case RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN:
        case RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA:
        case RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC:
        case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE:
                /* FIXME: I think we might have to send heartbeats in
                 * WAIT_INCR_UNASSIGN, yes-no? */
        case RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE:
                break;

        case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
        case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL:
        case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL:
                /* Member of the group: heartbeat on the configured
                 * interval, but only for subscription-based membership. */
                if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION &&
                    rd_interval(&rkcg->rkcg_heartbeat_intvl,
                                rkcg->rkcg_rk->rk_conf.
                                group_heartbeat_intvl_ms * 1000, now) > 0)
                        rd_kafka_cgrp_heartbeat(rkcg);
                break;
        }

}
/**
 * Client group handling.
 * Called from main thread to serve the operational aspects of a cgrp.
 *
 * Drives the coordinator-connection state machine (rkcg_state):
 * INIT -> QUERY_COORD -> WAIT_COORD -> WAIT_BROKER ->
 * WAIT_BROKER_TRANSPORT -> UP, also handling termination, session
 * timeout checks and periodic coordinator re-queries.
 *
 * @locality rdkafka main thread
 */
void rd_kafka_cgrp_serve (rd_kafka_cgrp_t *rkcg) {
        rd_kafka_broker_t *rkb = rkcg->rkcg_coord;
        int rkb_state = RD_KAFKA_BROKER_STATE_INIT;
        rd_ts_t now;

        if (rkb) {
                /* Snapshot the coordinator's broker state under its lock. */
                rd_kafka_broker_lock(rkb);
                rkb_state = rkb->rkb_state;
                rd_kafka_broker_unlock(rkb);

                /* Go back to querying state if we lost the current coordinator
                 * connection. */
                if (rkb_state < RD_KAFKA_BROKER_STATE_UP &&
                    rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP)
                        rd_kafka_cgrp_set_state(rkcg,
                                                RD_KAFKA_CGRP_STATE_QUERY_COORD);
        }

        now = rd_clock();

        /* Check for cgrp termination */
        if (unlikely(rd_kafka_cgrp_try_terminate(rkcg))) {
                rd_kafka_cgrp_terminated(rkcg);
                return; /* cgrp terminated */
        }

        /* Bail out if we're terminating. */
        if (unlikely(rd_kafka_terminating(rkcg->rkcg_rk)))
                return;

        /* Check session timeout regardless of current coordinator
         * connection state (rkcg_state) */
        if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY)
                rd_kafka_cgrp_session_timeout_check(rkcg, now);

 retry:
        switch (rkcg->rkcg_state)
        {
        case RD_KAFKA_CGRP_STATE_TERM:
                break;

        case RD_KAFKA_CGRP_STATE_INIT:
                rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
                /* FALLTHRU */

        case RD_KAFKA_CGRP_STATE_QUERY_COORD:
                /* Query for coordinator. */
                if (rd_interval_immediate(&rkcg->rkcg_coord_query_intvl,
                                          500*1000, now) > 0)
                        rd_kafka_cgrp_coord_query(rkcg,
                                                  "intervaled in "
                                                  "state query-coord");
                break;

        case RD_KAFKA_CGRP_STATE_WAIT_COORD:
                /* Waiting for FindCoordinator response */
                break;

        case RD_KAFKA_CGRP_STATE_WAIT_BROKER:
                /* See if the group should be reassigned to another broker. */
                if (rd_kafka_cgrp_coord_update(rkcg, rkcg->rkcg_coord_id))
                        goto retry; /* Coordinator changed, retry state-machine
                                     * to speed up next transition. */

                /* Coordinator query */
                if (rd_interval(&rkcg->rkcg_coord_query_intvl,
                                1000*1000, now) > 0)
                        rd_kafka_cgrp_coord_query(rkcg,
                                                  "intervaled in "
                                                  "state wait-broker");
                break;

        case RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT:
                /* Waiting for broker transport to come up.
                 * Also make sure broker supports groups. */
                if (rkb_state < RD_KAFKA_BROKER_STATE_UP || !rkb ||
                    !rd_kafka_broker_supports(
                            rkb, RD_KAFKA_FEATURE_BROKER_GROUP_COORD)) {
                        /* Coordinator query */
                        if (rd_interval(&rkcg->rkcg_coord_query_intvl,
                                        1000*1000, now) > 0)
                                rd_kafka_cgrp_coord_query(
                                        rkcg,
                                        "intervaled in state "
                                        "wait-broker-transport");

                } else {
                        rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_UP);

                        /* Serve join state to trigger (re)join */
                        rd_kafka_cgrp_join_state_serve(rkcg);

                        /* Serve any pending partitions in the assignment */
                        rd_kafka_assignment_serve(rkcg->rkcg_rk);
                }
                break;

        case RD_KAFKA_CGRP_STATE_UP:
                /* Move any ops awaiting the coordinator to the ops queue
                 * for reprocessing. */
                rd_kafka_q_concat(rkcg->rkcg_ops, rkcg->rkcg_wait_coord_q);

                /* Relaxed coordinator queries. */
                if (rd_interval(&rkcg->rkcg_coord_query_intvl,
                                rkcg->rkcg_rk->rk_conf.
                                coord_query_intvl_ms * 1000, now) > 0)
                        rd_kafka_cgrp_coord_query(rkcg,
                                                  "intervaled in state up");

                rd_kafka_cgrp_join_state_serve(rkcg);
                break;

        }

        /* Periodically scan for timed-out ops while not UP. */
        if (unlikely(rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP &&
                     rd_interval(&rkcg->rkcg_timeout_scan_intvl,
                                 1000*1000, now) > 0))
                rd_kafka_cgrp_timeout_scan(rkcg, now);
}
5138
5139
5140
5141
5142
5143 /**
5144 * Send an op to a cgrp.
5145 *
5146 * Locality: any thread
5147 */
rd_kafka_cgrp_op(rd_kafka_cgrp_t * rkcg,rd_kafka_toppar_t * rktp,rd_kafka_replyq_t replyq,rd_kafka_op_type_t type,rd_kafka_resp_err_t err)5148 void rd_kafka_cgrp_op (rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp,
5149 rd_kafka_replyq_t replyq, rd_kafka_op_type_t type,
5150 rd_kafka_resp_err_t err) {
5151 rd_kafka_op_t *rko;
5152
5153 rko = rd_kafka_op_new(type);
5154 rko->rko_err = err;
5155 rko->rko_replyq = replyq;
5156
5157 if (rktp)
5158 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
5159
5160 rd_kafka_q_enq(rkcg->rkcg_ops, rko);
5161 }
5162
5163
5164
5165
5166
5167
5168
rd_kafka_cgrp_set_member_id(rd_kafka_cgrp_t * rkcg,const char * member_id)5169 void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id){
5170 if (rkcg->rkcg_member_id && member_id &&
5171 !rd_kafkap_str_cmp_str(rkcg->rkcg_member_id, member_id))
5172 return; /* No change */
5173
5174 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "MEMBERID",
5175 "Group \"%.*s\": updating member id \"%s\" -> \"%s\"",
5176 RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
5177 rkcg->rkcg_member_id ?
5178 rkcg->rkcg_member_id->str : "(not-set)",
5179 member_id ? member_id : "(not-set)");
5180
5181 if (rkcg->rkcg_member_id) {
5182 rd_kafkap_str_destroy(rkcg->rkcg_member_id);
5183 rkcg->rkcg_member_id = NULL;
5184 }
5185
5186 if (member_id)
5187 rkcg->rkcg_member_id = rd_kafkap_str_new(member_id, -1);
5188 }
5189
5190
5191 /**
5192 * @brief Determine owned partitions that no longer exist (partitions in
5193 * deleted or re-created topics).
5194 */
5195 static rd_kafka_topic_partition_list_t *
rd_kafka_cgrp_owned_but_not_exist_partitions(rd_kafka_cgrp_t * rkcg)5196 rd_kafka_cgrp_owned_but_not_exist_partitions (rd_kafka_cgrp_t *rkcg) {
5197 rd_kafka_topic_partition_list_t *result = NULL;
5198 const rd_kafka_topic_partition_t *curr;
5199
5200 if (!rkcg->rkcg_group_assignment)
5201 return NULL;
5202
5203 RD_KAFKA_TPLIST_FOREACH(curr, rkcg->rkcg_group_assignment) {
5204 if (rd_list_find(rkcg->rkcg_subscribed_topics,
5205 curr->topic, rd_kafka_topic_info_topic_cmp))
5206 continue;
5207
5208 if (!result)
5209 result = rd_kafka_topic_partition_list_new(
5210 rkcg->rkcg_group_assignment->cnt);
5211
5212 rd_kafka_topic_partition_list_add0(__FUNCTION__,__LINE__,
5213 result,
5214 curr->topic,
5215 curr->partition,
5216 curr->_private);
5217 }
5218
5219 return result;
5220 }
5221
5222
/**
 * @brief Check if the latest metadata affects the current subscription:
 *        - matched topic added
 *        - matched topic removed
 *        - matched topic's partition count change
 *
 * @param do_join If true, trigger a rejoin when the effective topic list
 *                changed (or when a join is waiting on this metadata).
 *
 * @locks none
 * @locality rdkafka main thread
 */
void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg,
                                          rd_bool_t do_join) {
        rd_list_t *tinfos;
        rd_kafka_topic_partition_list_t *errored;
        rd_bool_t changed;

        rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));

        /* Nothing to do without a subscription. */
        if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
                return;

        /*
         * Unmatched topics will be added to the errored list.
         */
        errored = rd_kafka_topic_partition_list_new(0);

        /*
         * Create a list of the topics in metadata that matches our subscription
         */
        tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
                             (void *)rd_kafka_topic_info_destroy);

        /* Wildcard (regex) subscriptions match against all metadata topics,
         * plain subscriptions are filtered verbatim. */
        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
                rd_kafka_metadata_topic_match(rkcg->rkcg_rk,
                                              tinfos, rkcg->rkcg_subscription,
                                              errored);
        else
                rd_kafka_metadata_topic_filter(rkcg->rkcg_rk,
                                               tinfos,
                                               rkcg->rkcg_subscription,
                                               errored);


        /*
         * Propagate consumer errors for any non-existent or errored topics.
         * The function takes ownership of errored.
         */
        rd_kafka_propagate_consumer_topic_errors(
                rkcg, errored, "Subscribed topic not available");

        /*
         * Update effective list of topics (takes ownership of \c tinfos)
         */
        changed = rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos);

        if (!do_join ||
            (!changed &&
             /* If we get the same effective list of topics as last time around,
              * but the join is waiting for this metadata query to complete,
              * then we should not return here but follow through with the
              * (re)join below. */
             rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA))
                return;

        /* List of subscribed topics changed, trigger rejoin. */
        rd_kafka_dbg(rkcg->rkcg_rk,
                     CGRP|RD_KAFKA_DBG_METADATA|RD_KAFKA_DBG_CONSUMER,
                     "REJOIN",
                     "Group \"%.*s\": "
                     "subscription updated from metadata change: "
                     "rejoining group",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));

        if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
            RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE) {

                /* Partitions from deleted topics */
                rd_kafka_topic_partition_list_t *owned_but_not_exist =
                        rd_kafka_cgrp_owned_but_not_exist_partitions(
                                rkcg);

                if (owned_but_not_exist) {
                        /* Cooperative: revoke only the partitions whose
                         * topics disappeared, marking them lost. */
                        rd_kafka_cgrp_assignment_set_lost(
                                rkcg,
                                "%d subscribed topic(s) no longer exist",
                                owned_but_not_exist->cnt);

                        rd_kafka_rebalance_op_incr(
                                rkcg,
                                RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                                owned_but_not_exist,
                                rkcg->rkcg_group_leader.members != NULL
                                /* Rejoin group following revoke's
                                 * unassign if we are leader */,
                                "topics not available");
                        rd_kafka_topic_partition_list_destroy(
                                owned_but_not_exist);

                } else {
                        /* Nothing to revoke, rejoin group if we are the
                         * leader.
                         * The KIP says to rejoin the group on metadata
                         * change only if we're the leader. But what if a
                         * non-leader is subscribed to a regex that the others
                         * aren't?
                         * Going against the KIP and rejoining here. */
                        rd_kafka_cgrp_rejoin(
                                rkcg,
                                "Metadata for subscribed topic(s) has "
                                "changed");

                }

        } else {
                /* EAGER */
                rd_kafka_cgrp_revoke_rejoin(rkcg,
                                            "Metadata for subscribed topic(s) "
                                            "has changed");
        }

        /* We shouldn't get stuck in this state. */
        rd_dassert(rkcg->rkcg_join_state !=
                   RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);
}
5346
5347
rd_kafka_cgrp_handle_SyncGroup(rd_kafka_cgrp_t * rkcg,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,const rd_kafkap_bytes_t * member_state)5348 void rd_kafka_cgrp_handle_SyncGroup (rd_kafka_cgrp_t *rkcg,
5349 rd_kafka_broker_t *rkb,
5350 rd_kafka_resp_err_t err,
5351 const rd_kafkap_bytes_t *member_state) {
5352 rd_kafka_buf_t *rkbuf = NULL;
5353 rd_kafka_topic_partition_list_t *assignment = NULL;
5354 const int log_decode_errors = LOG_ERR;
5355 int16_t Version;
5356 rd_kafkap_bytes_t UserData;
5357
5358 /* Dont handle new assignments when terminating */
5359 if (!err && rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
5360 err = RD_KAFKA_RESP_ERR__DESTROY;
5361
5362 if (err)
5363 goto err;
5364
5365 if (RD_KAFKAP_BYTES_LEN(member_state) == 0) {
5366 /* Empty assignment. */
5367 assignment = rd_kafka_topic_partition_list_new(0);
5368 memset(&UserData, 0, sizeof(UserData));
5369 goto done;
5370 }
5371
5372 /* Parse assignment from MemberState */
5373 rkbuf = rd_kafka_buf_new_shadow(member_state->data,
5374 RD_KAFKAP_BYTES_LEN(member_state),
5375 NULL);
5376 /* Protocol parser needs a broker handle to log errors on. */
5377 if (rkb) {
5378 rkbuf->rkbuf_rkb = rkb;
5379 rd_kafka_broker_keep(rkb);
5380 } else
5381 rkbuf->rkbuf_rkb = rd_kafka_broker_internal(rkcg->rkcg_rk);
5382
5383 rd_kafka_buf_read_i16(rkbuf, &Version);
5384 if (!(assignment = rd_kafka_buf_read_topic_partitions(rkbuf, 0,
5385 rd_false,
5386 rd_false)))
5387 goto err_parse;
5388 rd_kafka_buf_read_bytes(rkbuf, &UserData);
5389
5390 done:
5391 rd_kafka_cgrp_update_session_timeout(rkcg, rd_true/*reset timeout*/);
5392
5393 rd_assert(rkcg->rkcg_assignor);
5394 if (rkcg->rkcg_assignor->rkas_on_assignment_cb) {
5395 char *member_id;
5396 RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id);
5397 rd_kafka_consumer_group_metadata_t *cgmd =
5398 rd_kafka_consumer_group_metadata_new_with_genid(
5399 rkcg->rkcg_rk->rk_conf.group_id_str,
5400 rkcg->rkcg_generation_id, member_id,
5401 rkcg->rkcg_rk->rk_conf.group_instance_id);
5402 rkcg->rkcg_assignor->rkas_on_assignment_cb(
5403 rkcg->rkcg_assignor,
5404 &(rkcg->rkcg_assignor_state),
5405 assignment, &UserData, cgmd);
5406 rd_kafka_consumer_group_metadata_destroy(cgmd);
5407 }
5408
5409 // FIXME: Remove when we're done debugging.
5410 rd_kafka_topic_partition_list_log(rkcg->rkcg_rk, "ASSIGNMENT",
5411 RD_KAFKA_DBG_CGRP,
5412 assignment);
5413
5414 /* Set the new assignment */
5415 rd_kafka_cgrp_handle_assignment(rkcg, assignment);
5416
5417 rd_kafka_topic_partition_list_destroy(assignment);
5418
5419 if (rkbuf)
5420 rd_kafka_buf_destroy(rkbuf);
5421
5422 return;
5423
5424 err_parse:
5425 err = rkbuf->rkbuf_err;
5426
5427 err:
5428 if (rkbuf)
5429 rd_kafka_buf_destroy(rkbuf);
5430
5431 if (assignment)
5432 rd_kafka_topic_partition_list_destroy(assignment);
5433
5434 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPSYNC",
5435 "Group \"%s\": synchronization failed: %s: rejoining",
5436 rkcg->rkcg_group_id->str, rd_kafka_err2str(err));
5437
5438 if (err == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID)
5439 rd_kafka_set_fatal_error(rkcg->rkcg_rk, err,
5440 "Fatal consumer error: %s",
5441 rd_kafka_err2str(err));
5442 else if (err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION)
5443 rkcg->rkcg_generation_id = -1;
5444 else if (err == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
5445 rd_kafka_cgrp_set_member_id(rkcg, "");
5446
5447 if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
5448 RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE &&
5449 (err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION ||
5450 err == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID))
5451 rd_kafka_cgrp_revoke_all_rejoin(
5452 rkcg,
5453 rd_true/*assignment is lost*/,
5454 rd_true/*this consumer is initiating*/,
5455 "SyncGroup error");
5456 else
5457 rd_kafka_cgrp_rejoin(rkcg, "SyncGroup error: %s",
5458 rd_kafka_err2str(err));
5459 }
5460
5461
5462
5463 rd_kafka_consumer_group_metadata_t *
rd_kafka_consumer_group_metadata_new(const char * group_id)5464 rd_kafka_consumer_group_metadata_new (const char *group_id) {
5465 rd_kafka_consumer_group_metadata_t *cgmetadata;
5466
5467 cgmetadata = rd_kafka_consumer_group_metadata_new_with_genid(group_id,
5468 -1, "",
5469 NULL);
5470
5471 return cgmetadata;
5472 }
5473
5474 rd_kafka_consumer_group_metadata_t *
rd_kafka_consumer_group_metadata_new_with_genid(const char * group_id,int32_t generation_id,const char * member_id,const char * group_instance_id)5475 rd_kafka_consumer_group_metadata_new_with_genid (const char *group_id,
5476 int32_t generation_id,
5477 const char *member_id,
5478 const char
5479 *group_instance_id) {
5480 rd_kafka_consumer_group_metadata_t *cgmetadata;
5481
5482 cgmetadata = rd_calloc(1, sizeof(*cgmetadata));
5483 cgmetadata->group_id = rd_strdup(group_id);
5484 cgmetadata->generation_id = generation_id;
5485 cgmetadata->member_id = rd_strdup(member_id);
5486 if (group_instance_id)
5487 cgmetadata->group_instance_id = rd_strdup(group_instance_id);
5488
5489 return cgmetadata;
5490 }
5491
5492 rd_kafka_consumer_group_metadata_t *
rd_kafka_consumer_group_metadata(rd_kafka_t * rk)5493 rd_kafka_consumer_group_metadata (rd_kafka_t *rk) {
5494 rd_kafka_consumer_group_metadata_t *cgmetadata;
5495 rd_kafka_op_t *rko;
5496 rd_kafka_cgrp_t *rkcg;
5497
5498 if (!(rkcg = rd_kafka_cgrp_get(rk)))
5499 return NULL;
5500
5501 rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_CG_METADATA);
5502 if (!rko)
5503 return NULL;
5504
5505 cgmetadata = rko->rko_u.cg_metadata;
5506 rko->rko_u.cg_metadata = NULL;
5507 rd_kafka_op_destroy(rko);
5508
5509 return cgmetadata;
5510 }
5511
5512 void
rd_kafka_consumer_group_metadata_destroy(rd_kafka_consumer_group_metadata_t * cgmetadata)5513 rd_kafka_consumer_group_metadata_destroy (
5514 rd_kafka_consumer_group_metadata_t *cgmetadata) {
5515 rd_free(cgmetadata->group_id);
5516 rd_free(cgmetadata->member_id);
5517 if (cgmetadata->group_instance_id)
5518 rd_free(cgmetadata->group_instance_id);
5519 rd_free(cgmetadata);
5520 }
5521
5522 rd_kafka_consumer_group_metadata_t *
rd_kafka_consumer_group_metadata_dup(const rd_kafka_consumer_group_metadata_t * cgmetadata)5523 rd_kafka_consumer_group_metadata_dup (
5524 const rd_kafka_consumer_group_metadata_t *cgmetadata) {
5525 rd_kafka_consumer_group_metadata_t *ret;
5526
5527 ret = rd_calloc(1, sizeof(*cgmetadata));
5528 ret->group_id = rd_strdup(cgmetadata->group_id);
5529 ret->generation_id = cgmetadata->generation_id;
5530 ret->member_id = rd_strdup(cgmetadata->member_id);
5531 if (cgmetadata->group_instance_id)
5532 ret->group_instance_id = rd_strdup(
5533 cgmetadata->group_instance_id);
5534
5535 return ret;
5536 }
5537
5538 /*
5539 * Consumer group metadata serialization format v2:
5540 * "CGMDv2:"<generation_id><group_id>"\0"<member_id>"\0" \
5541 * <group_instance_id_is_null>[<group_instance_id>"\0"]
5542 * Where <group_id> is the group_id string.
5543 */
5544 static const char rd_kafka_consumer_group_metadata_magic[7] = "CGMDv2:";
5545
rd_kafka_consumer_group_metadata_write(const rd_kafka_consumer_group_metadata_t * cgmd,void ** bufferp,size_t * sizep)5546 rd_kafka_error_t *rd_kafka_consumer_group_metadata_write (
5547 const rd_kafka_consumer_group_metadata_t *cgmd,
5548 void **bufferp, size_t *sizep) {
5549 char *buf;
5550 size_t size;
5551 size_t of = 0;
5552 size_t magic_len = sizeof(rd_kafka_consumer_group_metadata_magic);
5553 size_t groupid_len = strlen(cgmd->group_id) + 1;
5554 size_t generationid_len = sizeof(cgmd->generation_id);
5555 size_t member_id_len = strlen(cgmd->member_id) + 1;
5556 int8_t group_instance_id_is_null = cgmd->group_instance_id ? 0 : 1;
5557 size_t group_instance_id_is_null_len = sizeof(group_instance_id_is_null);
5558 size_t group_instance_id_len = cgmd->group_instance_id
5559 ? strlen(cgmd->group_instance_id) + 1 : 0;
5560
5561 size = magic_len + groupid_len + generationid_len + member_id_len +
5562 group_instance_id_is_null_len + group_instance_id_len;
5563
5564 buf = rd_malloc(size);
5565
5566 memcpy(buf, rd_kafka_consumer_group_metadata_magic, magic_len);
5567 of += magic_len;
5568
5569 memcpy(buf+of, &cgmd->generation_id, generationid_len);
5570 of += generationid_len;
5571
5572 memcpy(buf+of, cgmd->group_id, groupid_len);
5573 of += groupid_len;
5574
5575 memcpy(buf+of, cgmd->member_id, member_id_len);
5576 of += member_id_len;
5577
5578 memcpy(buf+of, &group_instance_id_is_null, group_instance_id_is_null_len);
5579 of += group_instance_id_is_null_len;
5580
5581 if (!group_instance_id_is_null)
5582 memcpy(buf+of, cgmd->group_instance_id, group_instance_id_len);
5583 of += group_instance_id_len;
5584
5585 rd_assert(of == size);
5586
5587 *bufferp = buf;
5588 *sizep = size;
5589
5590 return NULL;
5591 }
5592
5593
/*
 * Check that a string is printable, returning NULL if not or
 * a pointer immediately after the end of the string NUL
 * terminator if so.
 *
 * Returns NULL if no NUL terminator is found before \p end, or if any
 * character before the terminator is not printable.
 **/
static const char *str_is_printable(const char *s, const char *end) {
        const char *c;

        /* Check the bound before dereferencing: the original order
         * (*c && c != end) read one byte past the buffer when the
         * terminator was missing. */
        for (c = s ; c != end && *c ; c++)
                /* Cast via unsigned char: passing a negative char value
                 * to isprint() is undefined behavior (CERT STR37-C). */
                if (!isprint((unsigned char)*c))
                        return NULL;

        if (c == end)
                return NULL; /* No NUL terminator within bounds */

        return c + 1;
}
5606
5607
rd_kafka_consumer_group_metadata_read(rd_kafka_consumer_group_metadata_t ** cgmdp,const void * buffer,size_t size)5608 rd_kafka_error_t *rd_kafka_consumer_group_metadata_read (
5609 rd_kafka_consumer_group_metadata_t **cgmdp,
5610 const void *buffer, size_t size) {
5611 const char *buf = (const char *)buffer;
5612 const char *end = buf + size;
5613 const char *next;
5614 size_t magic_len = sizeof(rd_kafka_consumer_group_metadata_magic);
5615 int32_t generation_id;
5616 size_t generationid_len = sizeof(generation_id);
5617 const char *group_id;
5618 const char *member_id;
5619 int8_t group_instance_id_is_null;
5620 const char *group_instance_id = NULL;
5621
5622 if (size < magic_len + generationid_len + 1 + 1 + 1)
5623 return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
5624 "Input buffer is too short");
5625
5626 if (memcmp(buffer, rd_kafka_consumer_group_metadata_magic, magic_len))
5627 return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
5628 "Input buffer is not a serialized "
5629 "consumer group metadata object");
5630 memcpy(&generation_id, buf+magic_len, generationid_len);
5631
5632 group_id = buf + magic_len + generationid_len;
5633 next = str_is_printable(group_id, end);
5634 if (!next)
5635 return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
5636 "Input buffer group id is not safe");
5637
5638 member_id = next;
5639 next = str_is_printable(member_id, end);
5640 if (!next)
5641 return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
5642 "Input buffer member id is not "
5643 "safe");
5644
5645 group_instance_id_is_null = (int8_t)*(next++);
5646 if (!group_instance_id_is_null) {
5647 group_instance_id = next;
5648 next = str_is_printable(group_instance_id, end);
5649 if (!next)
5650 return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
5651 "Input buffer group "
5652 "instance id is not safe");
5653 }
5654
5655 if (next != end)
5656 return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
5657 "Input buffer bad length");
5658
5659 *cgmdp = rd_kafka_consumer_group_metadata_new_with_genid(
5660 group_id,
5661 generation_id,
5662 member_id,
5663 group_instance_id);
5664
5665 return NULL;
5666 }
5667
5668
unittest_consumer_group_metadata_iteration(const char * group_id,int32_t generation_id,const char * member_id,const char * group_instance_id)5669 static int unittest_consumer_group_metadata_iteration(const char *group_id,
5670 int32_t generation_id,
5671 const char *member_id,
5672 const char *group_instance_id) {
5673 rd_kafka_consumer_group_metadata_t *cgmd;
5674 void *buffer, *buffer2;
5675 size_t size, size2;
5676 rd_kafka_error_t *error;
5677
5678 cgmd = rd_kafka_consumer_group_metadata_new_with_genid(
5679 group_id,
5680 generation_id,
5681 member_id,
5682 group_instance_id);
5683 RD_UT_ASSERT(cgmd != NULL, "failed to create metadata");
5684
5685 error = rd_kafka_consumer_group_metadata_write(cgmd, &buffer,
5686 &size);
5687 RD_UT_ASSERT(!error, "metadata_write failed: %s",
5688 rd_kafka_error_string(error));
5689
5690 rd_kafka_consumer_group_metadata_destroy(cgmd);
5691
5692 cgmd = NULL;
5693 error = rd_kafka_consumer_group_metadata_read(&cgmd, buffer,
5694 size);
5695 RD_UT_ASSERT(!error, "metadata_read failed: %s",
5696 rd_kafka_error_string(error));
5697
5698 /* Serialize again and compare buffers */
5699 error = rd_kafka_consumer_group_metadata_write(cgmd, &buffer2,
5700 &size2);
5701 RD_UT_ASSERT(!error, "metadata_write failed: %s",
5702 rd_kafka_error_string(error));
5703
5704 RD_UT_ASSERT(size == size2 && !memcmp(buffer, buffer2, size),
5705 "metadata_read/write size or content mismatch: "
5706 "size %"PRIusz", size2 %"PRIusz,
5707 size, size2);
5708
5709 rd_kafka_consumer_group_metadata_destroy(cgmd);
5710 rd_free(buffer);
5711 rd_free(buffer2);
5712
5713 return 0;
5714 }
5715
5716
unittest_consumer_group_metadata(void)5717 static int unittest_consumer_group_metadata (void) {
5718 const char *ids[] = {
5719 "mY. random id:.",
5720 "0",
5721 "2222222222222222222222221111111111111111111111111111112222",
5722 "",
5723 "NULL",
5724 NULL,
5725 };
5726 int i, j, k, gen_id;
5727 int ret;
5728 const char *group_id;
5729 const char *member_id;
5730 const char *group_instance_id;
5731
5732 for (i = 0 ; ids[i] ; i++) {
5733 for (j = 0; ids[j] ; j++) {
5734 for (k = 0; ids[k]; k++) {
5735 for (gen_id = -1; gen_id<1; gen_id++) {
5736 group_id = ids[i];
5737 member_id = ids[j];
5738 group_instance_id = ids[k];
5739 if (strcmp(group_instance_id,
5740 "NULL") == 0)
5741 group_instance_id = NULL;
5742 ret = unittest_consumer_group_metadata_iteration(
5743 group_id,
5744 gen_id,
5745 member_id,
5746 group_instance_id);
5747 if (ret)
5748 return ret;
5749 }
5750 }
5751 }
5752 }
5753
5754 RD_UT_PASS();
5755 }
5756
5757
unittest_set_intersect(void)5758 static int unittest_set_intersect (void) {
5759 size_t par_cnt = 10;
5760 map_toppar_member_info_t *dst;
5761 rd_kafka_topic_partition_t *toppar;
5762 PartitionMemberInfo_t *v;
5763 char *id = "id";
5764 rd_kafkap_str_t id1 = RD_KAFKAP_STR_INITIALIZER;
5765 rd_kafkap_str_t id2 = RD_KAFKAP_STR_INITIALIZER;
5766 rd_kafka_group_member_t *gm1;
5767 rd_kafka_group_member_t *gm2;
5768
5769 id1.len = 2;
5770 id1.str = id;
5771 id2.len = 2;
5772 id2.str = id;
5773
5774 map_toppar_member_info_t a = RD_MAP_INITIALIZER(
5775 par_cnt,
5776 rd_kafka_topic_partition_cmp,
5777 rd_kafka_topic_partition_hash,
5778 rd_kafka_topic_partition_destroy_free,
5779 PartitionMemberInfo_free);
5780
5781 map_toppar_member_info_t b = RD_MAP_INITIALIZER(
5782 par_cnt,
5783 rd_kafka_topic_partition_cmp,
5784 rd_kafka_topic_partition_hash,
5785 rd_kafka_topic_partition_destroy_free,
5786 PartitionMemberInfo_free);
5787
5788 gm1 = rd_calloc(1, sizeof(*gm1));
5789 gm1->rkgm_member_id = &id1;
5790 gm1->rkgm_group_instance_id = &id1;
5791 gm2 = rd_calloc(1, sizeof(*gm2));
5792 gm2->rkgm_member_id = &id2;
5793 gm2->rkgm_group_instance_id = &id2;
5794
5795 RD_MAP_SET(&a,
5796 rd_kafka_topic_partition_new("t1", 4),
5797 PartitionMemberInfo_new(gm1, rd_false));
5798 RD_MAP_SET(&a,
5799 rd_kafka_topic_partition_new("t2", 4),
5800 PartitionMemberInfo_new(gm1, rd_false));
5801 RD_MAP_SET(&a,
5802 rd_kafka_topic_partition_new("t1", 7),
5803 PartitionMemberInfo_new(gm1, rd_false));
5804
5805 RD_MAP_SET(&b,
5806 rd_kafka_topic_partition_new("t2", 7),
5807 PartitionMemberInfo_new(gm1, rd_false));
5808 RD_MAP_SET(&b,
5809 rd_kafka_topic_partition_new("t1", 4),
5810 PartitionMemberInfo_new(gm2, rd_false));
5811
5812 dst = rd_kafka_member_partitions_intersect(&a, &b);
5813
5814 RD_UT_ASSERT(RD_MAP_CNT(&a) == 3,
5815 "expected a cnt to be 3 not %d", (int)RD_MAP_CNT(&a));
5816 RD_UT_ASSERT(RD_MAP_CNT(&b) == 2,
5817 "expected b cnt to be 2 not %d", (int)RD_MAP_CNT(&b));
5818 RD_UT_ASSERT(RD_MAP_CNT(dst) == 1,
5819 "expected dst cnt to be 1 not %d", (int)RD_MAP_CNT(dst));
5820
5821 toppar = rd_kafka_topic_partition_new("t1", 4);
5822 RD_UT_ASSERT((v = RD_MAP_GET(dst, toppar)), "unexpected element");
5823 RD_UT_ASSERT(v->members_match, "expected members to match");
5824 rd_kafka_topic_partition_destroy(toppar);
5825
5826 RD_MAP_DESTROY(&a);
5827 RD_MAP_DESTROY(&b);
5828 RD_MAP_DESTROY(dst);
5829 rd_free(dst);
5830
5831 rd_free(gm1);
5832 rd_free(gm2);
5833
5834 RD_UT_PASS();
5835 }
5836
5837
unittest_set_subtract(void)5838 static int unittest_set_subtract (void) {
5839 size_t par_cnt = 10;
5840 rd_kafka_topic_partition_t *toppar;
5841 map_toppar_member_info_t *dst;
5842
5843 map_toppar_member_info_t a = RD_MAP_INITIALIZER(
5844 par_cnt,
5845 rd_kafka_topic_partition_cmp,
5846 rd_kafka_topic_partition_hash,
5847 rd_kafka_topic_partition_destroy_free,
5848 PartitionMemberInfo_free);
5849
5850 map_toppar_member_info_t b = RD_MAP_INITIALIZER(
5851 par_cnt,
5852 rd_kafka_topic_partition_cmp,
5853 rd_kafka_topic_partition_hash,
5854 rd_kafka_topic_partition_destroy_free,
5855 PartitionMemberInfo_free);
5856
5857 RD_MAP_SET(&a,
5858 rd_kafka_topic_partition_new("t1", 4),
5859 PartitionMemberInfo_new(NULL, rd_false));
5860 RD_MAP_SET(&a,
5861 rd_kafka_topic_partition_new("t2", 7),
5862 PartitionMemberInfo_new(NULL, rd_false));
5863
5864 RD_MAP_SET(&b,
5865 rd_kafka_topic_partition_new("t2", 4),
5866 PartitionMemberInfo_new(NULL, rd_false));
5867 RD_MAP_SET(&b,
5868 rd_kafka_topic_partition_new("t1", 4),
5869 PartitionMemberInfo_new(NULL, rd_false));
5870 RD_MAP_SET(&b,
5871 rd_kafka_topic_partition_new("t1", 7),
5872 PartitionMemberInfo_new(NULL, rd_false));
5873
5874 dst = rd_kafka_member_partitions_subtract(&a, &b);
5875
5876 RD_UT_ASSERT(RD_MAP_CNT(&a) == 2,
5877 "expected a cnt to be 2 not %d", (int)RD_MAP_CNT(&a));
5878 RD_UT_ASSERT(RD_MAP_CNT(&b) == 3,
5879 "expected b cnt to be 3 not %d", (int)RD_MAP_CNT(&b));
5880 RD_UT_ASSERT(RD_MAP_CNT(dst) == 1,
5881 "expected dst cnt to be 1 not %d", (int)RD_MAP_CNT(dst));
5882
5883 toppar = rd_kafka_topic_partition_new("t2", 7);
5884 RD_UT_ASSERT(RD_MAP_GET(dst, toppar), "unexpected element");
5885 rd_kafka_topic_partition_destroy(toppar);
5886
5887 RD_MAP_DESTROY(&a);
5888 RD_MAP_DESTROY(&b);
5889 RD_MAP_DESTROY(dst);
5890 rd_free(dst);
5891
5892 RD_UT_PASS();
5893 }
5894
5895
unittest_map_to_list(void)5896 static int unittest_map_to_list (void) {
5897 rd_kafka_topic_partition_list_t *list;
5898
5899 map_toppar_member_info_t map = RD_MAP_INITIALIZER(
5900 10,
5901 rd_kafka_topic_partition_cmp,
5902 rd_kafka_topic_partition_hash,
5903 rd_kafka_topic_partition_destroy_free,
5904 PartitionMemberInfo_free);
5905
5906 RD_MAP_SET(&map,
5907 rd_kafka_topic_partition_new("t1", 101),
5908 PartitionMemberInfo_new(NULL, rd_false));
5909
5910 list = rd_kafka_toppar_member_info_map_to_list(&map);
5911
5912 RD_UT_ASSERT(list->cnt == 1,
5913 "expecting list size of 1 not %d.", list->cnt);
5914 RD_UT_ASSERT(list->elems[0].partition == 101,
5915 "expecting partition 101 not %d",
5916 list->elems[0].partition);
5917 RD_UT_ASSERT(!strcmp(list->elems[0].topic, "t1"),
5918 "expecting topic 't1', not %s", list->elems[0].topic);
5919
5920 rd_kafka_topic_partition_list_destroy(list);
5921 RD_MAP_DESTROY(&map);
5922
5923 RD_UT_PASS();
5924 }
5925
5926
unittest_list_to_map(void)5927 static int unittest_list_to_map (void) {
5928 rd_kafka_topic_partition_t *toppar;
5929 map_toppar_member_info_t *map;
5930 rd_kafka_topic_partition_list_t *list =
5931 rd_kafka_topic_partition_list_new(1);
5932
5933 rd_kafka_topic_partition_list_add(list, "topic1", 201);
5934 rd_kafka_topic_partition_list_add(list, "topic2", 202);
5935
5936 map = rd_kafka_toppar_list_to_toppar_member_info_map(list);
5937
5938 RD_UT_ASSERT(RD_MAP_CNT(map) == 2,
5939 "expected map cnt to be 2 not %d", (int)RD_MAP_CNT(map));
5940 toppar = rd_kafka_topic_partition_new("topic1", 201);
5941 RD_UT_ASSERT(RD_MAP_GET(map, toppar),
5942 "expected topic1 [201] to exist in map");
5943 rd_kafka_topic_partition_destroy(toppar);
5944 toppar = rd_kafka_topic_partition_new("topic2", 202);
5945 RD_UT_ASSERT(RD_MAP_GET(map, toppar),
5946 "expected topic2 [202] to exist in map");
5947 rd_kafka_topic_partition_destroy(toppar);
5948
5949 RD_MAP_DESTROY(map);
5950 rd_free(map);
5951 rd_kafka_topic_partition_list_destroy(list);
5952
5953 RD_UT_PASS();
5954 }
5955
5956
5957 /**
5958 * @brief Consumer group unit tests
5959 */
unittest_cgrp(void)5960 int unittest_cgrp (void) {
5961 int fails = 0;
5962
5963 fails += unittest_consumer_group_metadata();
5964 fails += unittest_set_intersect();
5965 fails += unittest_set_subtract();
5966 fails += unittest_map_to_list();
5967 fails += unittest_list_to_map();
5968
5969 return fails;
5970 }
5971