1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "rdkafka_int.h"
30 #include "rdkafka_broker.h"
31 #include "rdkafka_request.h"
32 #include "rdkafka_topic.h"
33 #include "rdkafka_partition.h"
34 #include "rdkafka_assignor.h"
35 #include "rdkafka_offset.h"
36 #include "rdkafka_metadata.h"
37 #include "rdkafka_cgrp.h"
38 #include "rdkafka_interceptor.h"
39 #include "rdmap.h"
40 
41 #include "rdunittest.h"
42 
43 #include <ctype.h>
44 #include <stdarg.h>
45 
46 static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts,
47                                                 void *arg);
48 static rd_kafka_error_t *
49 rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg,
50                       rd_kafka_topic_partition_list_t *assignment);
51 static rd_kafka_error_t *rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg);
52 static rd_kafka_error_t *
53 rd_kafka_cgrp_incremental_assign (rd_kafka_cgrp_t *rkcg,
54                                   rd_kafka_topic_partition_list_t
55                                   *partitions);
56 static rd_kafka_error_t *
57 rd_kafka_cgrp_incremental_unassign (rd_kafka_cgrp_t *rkcg,
58                                     rd_kafka_topic_partition_list_t
59                                     *partitions);
60 
61 static rd_kafka_op_res_t
62 rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq,
63                         rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type,
64                         void *opaque);
65 
66 static void rd_kafka_cgrp_group_leader_reset (rd_kafka_cgrp_t *rkcg,
67                                               const char *reason);
68 
69 static RD_INLINE int rd_kafka_cgrp_try_terminate (rd_kafka_cgrp_t *rkcg);
70 
71 static void rd_kafka_cgrp_revoke_all_rejoin (rd_kafka_cgrp_t *rkcg,
72                                              rd_bool_t assignment_lost,
73                                              rd_bool_t initiating,
74                                              const char *reason);
75 static void rd_kafka_cgrp_revoke_all_rejoin_maybe (rd_kafka_cgrp_t *rkcg,
76                                                    rd_bool_t
77                                                    assignment_lost,
78                                                    rd_bool_t initiating,
79                                                    const char *reason);
80 
81 static void rd_kafka_cgrp_group_is_rebalancing (rd_kafka_cgrp_t *rkcg);
82 
83 static void
84 rd_kafka_cgrp_max_poll_interval_check_tmr_cb (rd_kafka_timers_t *rkts,
85                                               void *arg);
86 static rd_kafka_resp_err_t
87 rd_kafka_cgrp_subscribe (rd_kafka_cgrp_t *rkcg,
88                          rd_kafka_topic_partition_list_t *rktparlist);
89 
90 static void rd_kafka_cgrp_group_assignment_set (
91         rd_kafka_cgrp_t *rkcg,
92         const rd_kafka_topic_partition_list_t *partitions);
93 static void rd_kafka_cgrp_group_assignment_modify (
94         rd_kafka_cgrp_t *rkcg,
95         rd_bool_t add,
96         const rd_kafka_topic_partition_list_t *partitions);
97 
98 
99 /**
100  * @returns true if the current assignment is lost.
101  */
rd_kafka_cgrp_assignment_is_lost(rd_kafka_cgrp_t * rkcg)102 rd_bool_t rd_kafka_cgrp_assignment_is_lost (rd_kafka_cgrp_t *rkcg) {
103         return rd_atomic32_get(&rkcg->rkcg_assignment_lost) != 0;
104 }
105 
106 
107 /**
108  * @brief Call when the current assignment has been lost, with a
109  *        human-readable reason.
110  */
111 static void rd_kafka_cgrp_assignment_set_lost (rd_kafka_cgrp_t *rkcg,
112                                                char *fmt, ...)
113         RD_FORMAT(printf, 2, 3);
rd_kafka_cgrp_assignment_set_lost(rd_kafka_cgrp_t * rkcg,char * fmt,...)114 static void rd_kafka_cgrp_assignment_set_lost (rd_kafka_cgrp_t *rkcg,
115                                                char *fmt, ...) {
116         va_list ap;
117         char reason[256];
118 
119         if (!rkcg->rkcg_group_assignment)
120                 return;
121 
122         va_start(ap, fmt);
123         rd_vsnprintf(reason, sizeof(reason), fmt, ap);
124         va_end(ap);
125 
126         rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP, "LOST",
127                      "Group \"%s\": "
128                      "current assignment of %d partition(s) lost: %s",
129                      rkcg->rkcg_group_id->str,
130                      rkcg->rkcg_group_assignment->cnt,
131                      reason);
132 
133         rd_atomic32_set(&rkcg->rkcg_assignment_lost, rd_true);
134 }
135 
136 
137 /**
138  * @brief Call when the current assignment is no longer considered lost, with a
139  *        human-readable reason.
140  */
rd_kafka_cgrp_assignment_clear_lost(rd_kafka_cgrp_t * rkcg,char * fmt,...)141 static void rd_kafka_cgrp_assignment_clear_lost (rd_kafka_cgrp_t *rkcg,
142                                                  char *fmt, ...) {
143         va_list ap;
144         char reason[256];
145 
146         if (!rd_atomic32_get(&rkcg->rkcg_assignment_lost))
147                 return;
148 
149         va_start(ap, fmt);
150         rd_vsnprintf(reason, sizeof(reason), fmt, ap);
151         va_end(ap);
152 
153         rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP, "LOST",
154                      "Group \"%s\": "
155                      "current assignment no longer considered lost: %s",
156                      rkcg->rkcg_group_id->str, reason);
157 
158         rd_atomic32_set(&rkcg->rkcg_assignment_lost, rd_false);
159 }
160 
161 
162 
163 /**
164  * @struct Auxillary glue type used for COOPERATIVE rebalance set operations.
165  */
166 typedef struct PartitionMemberInfo_s {
167         const rd_kafka_group_member_t *member;
168         rd_bool_t members_match;
169 } PartitionMemberInfo_t;
170 
PartitionMemberInfo_new(const rd_kafka_group_member_t * member,rd_bool_t members_match)171 static PartitionMemberInfo_t *PartitionMemberInfo_new (
172                 const rd_kafka_group_member_t *member,
173                 rd_bool_t members_match) {
174         PartitionMemberInfo_t *pmi;
175 
176         pmi = rd_calloc(1, sizeof(*pmi));
177         pmi->member = member;
178         pmi->members_match = members_match;
179 
180         return pmi;
181 }
182 
/** @brief Destructor with a void* signature suitable for map callbacks. */
static void PartitionMemberInfo_free (void *p) {
        rd_free(p);
}
187 
/** Map of topic partition -> PartitionMemberInfo_t. */
typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *,
                    PartitionMemberInfo_t *) map_toppar_member_info_t;
190 
191 
192 /**
193  * @returns true if consumer has joined the group and thus requires a leave.
194  */
195 #define RD_KAFKA_CGRP_HAS_JOINED(rkcg)                          \
196         (rkcg->rkcg_member_id != NULL &&                        \
197          RD_KAFKAP_STR_LEN((rkcg)->rkcg_member_id) > 0)
198 
199 
200 /**
201  * @returns true if cgrp is waiting for a rebalance_cb to be handled by
202  *          the application.
203  */
204 #define RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg)                    \
205         ((rkcg)->rkcg_join_state ==                             \
206          RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL ||           \
207          (rkcg)->rkcg_join_state ==                             \
208          RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL)
209 
210 /**
211  * @returns true if a rebalance is in progress.
212  *
213  * 1. In WAIT_JOIN or WAIT_METADATA state with a member-id set,
214  *    this happens on rejoin.
215  * 2. In WAIT_SYNC waiting for the group to rebalance on the broker.
216  * 3. in *_WAIT_UNASSIGN_TO_COMPLETE waiting for unassigned partitions to
217  *    stop fetching, et.al.
218  * 4. In _WAIT_*ASSIGN_CALL waiting for the application to handle the
219  *    assignment changes in its rebalance callback and then call *assign().
220  * 5. An incremental rebalancing is in progress.
221  * 6. A rebalance-induced rejoin is in progress.
222  */
223 #define RD_KAFKA_CGRP_REBALANCING(rkcg)                                 \
224         ((RD_KAFKA_CGRP_HAS_JOINED(rkcg) &&                             \
225           ((rkcg)->rkcg_join_state ==                                   \
226            RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN ||                        \
227            (rkcg)->rkcg_join_state ==                                   \
228            RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)) ||                  \
229          (rkcg)->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC || \
230          (rkcg)->rkcg_join_state ==                                     \
231          RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE ||          \
232          (rkcg)->rkcg_join_state ==                                     \
233          RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE ||     \
234          (rkcg)->rkcg_join_state ==                                     \
235          RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL ||                   \
236          (rkcg)->rkcg_join_state ==                                     \
237          RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL ||                 \
238          (rkcg)->rkcg_rebalance_incr_assignment != NULL ||              \
239          (rkcg)->rkcg_rebalance_rejoin)
240 
241 
242 
/* Human-readable cgrp state names, indexed by cgrp state.
 * NOTE: must be kept in sync with the corresponding state enum. */
const char *rd_kafka_cgrp_state_names[] = {
        "init",
        "term",
        "query-coord",
        "wait-coord",
        "wait-broker",
        "wait-broker-transport",
        "up"
};
252 
/* Human-readable cgrp join state names, indexed by join state.
 * NOTE: must be kept in sync with the corresponding join-state enum. */
const char *rd_kafka_cgrp_join_state_names[] = {
        "init",
        "wait-join",
        "wait-metadata",
        "wait-sync",
        "wait-assign-call",
        "wait-unassign-call",
        "wait-unassign-to-complete",
        "wait-incr-unassign-to-complete",
        "steady",
};
264 
265 
266 /**
267  * @brief Change the cgrp state.
268  *
269  * @returns 1 if the state was changed, else 0.
270  */
rd_kafka_cgrp_set_state(rd_kafka_cgrp_t * rkcg,int state)271 static int rd_kafka_cgrp_set_state (rd_kafka_cgrp_t *rkcg, int state) {
272         if ((int)rkcg->rkcg_state == state)
273                 return 0;
274 
275         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPSTATE",
276                      "Group \"%.*s\" changed state %s -> %s "
277                      "(join-state %s)",
278                      RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
279                      rd_kafka_cgrp_state_names[rkcg->rkcg_state],
280                      rd_kafka_cgrp_state_names[state],
281                      rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
282         rkcg->rkcg_state = state;
283         rkcg->rkcg_ts_statechange = rd_clock();
284 
285 	rd_kafka_brokers_broadcast_state_change(rkcg->rkcg_rk);
286 
287         return 1;
288 }
289 
290 
/**
 * @brief Change the cgrp join state. No-op if already in \p join_state.
 */
void rd_kafka_cgrp_set_join_state (rd_kafka_cgrp_t *rkcg, int join_state) {
        if (join_state == (int)rkcg->rkcg_join_state)
                return; /* No change */

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE",
                     "Group \"%.*s\" changed join state %s -> %s "
                     "(state %s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                     rd_kafka_cgrp_join_state_names[join_state],
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state]);

        rkcg->rkcg_join_state = join_state;
}
304 
305 
/**
 * @brief Final destruction of the cgrp handle.
 *
 * Asserts that resources that must already have been released by the
 * termination sequence (subscription, group leader members, topics,
 * toppars) are gone, then frees everything the cgrp still owns.
 */
void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg) {
        rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_subscription);
        rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_group_leader.members);
        /* Clears (frees) the member id. */
        rd_kafka_cgrp_set_member_id(rkcg, NULL);
        if (rkcg->rkcg_group_instance_id)
                 rd_kafkap_str_destroy(rkcg->rkcg_group_instance_id);

        /* Destroy owned queues. */
        rd_kafka_q_destroy_owner(rkcg->rkcg_q);
        rd_kafka_q_destroy_owner(rkcg->rkcg_ops);
        rd_kafka_q_destroy_owner(rkcg->rkcg_wait_coord_q);
        rd_kafka_assert(rkcg->rkcg_rk, TAILQ_EMPTY(&rkcg->rkcg_topics));
        rd_kafka_assert(rkcg->rkcg_rk, rd_list_empty(&rkcg->rkcg_toppars));
        rd_list_destroy(&rkcg->rkcg_toppars);
        rd_list_destroy(rkcg->rkcg_subscribed_topics);
        rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics);
        /* Give the assignor a chance to clean up its private state. */
        if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb)
                rkcg->rkcg_assignor->rkas_destroy_state_cb(
                        rkcg->rkcg_assignor_state);
        rd_free(rkcg);
}
326 
327 
328 
329 /**
330  * @brief Update the absolute session timeout following a successfull
331  *        response from the coordinator.
332  *        This timeout is used to enforce the session timeout in the
333  *        consumer itself.
334  *
335  * @param reset if true the timeout is updated even if the session has expired.
336  */
337 static RD_INLINE void
rd_kafka_cgrp_update_session_timeout(rd_kafka_cgrp_t * rkcg,rd_bool_t reset)338 rd_kafka_cgrp_update_session_timeout (rd_kafka_cgrp_t *rkcg, rd_bool_t reset) {
339         if (reset || rkcg->rkcg_ts_session_timeout != 0)
340                 rkcg->rkcg_ts_session_timeout = rd_clock() +
341                         (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms*1000);
342 }
343 
344 
345 
/**
 * @brief Create a new consumer group handle.
 *
 * @param group_id group.id, ownership stays with the caller (referenced).
 * @param client_id client.id, ownership stays with the caller (referenced).
 *
 * @returns a newly allocated cgrp handle; freed with
 *          rd_kafka_cgrp_destroy_final().
 */
rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
                                    const rd_kafkap_str_t *group_id,
                                    const rd_kafkap_str_t *client_id) {
        rd_kafka_cgrp_t *rkcg;

        rkcg = rd_calloc(1, sizeof(*rkcg));

        rkcg->rkcg_rk = rk;
        rkcg->rkcg_group_id = group_id;
        rkcg->rkcg_client_id = client_id;
        rkcg->rkcg_coord_id = -1;       /* -1 = no coordinator known yet */
        rkcg->rkcg_generation_id = -1;  /* -1 = not in a generation yet */

        /* Ops queue served by rd_kafka_cgrp_op_serve(). */
        rkcg->rkcg_ops = rd_kafka_q_new(rk);
        rkcg->rkcg_ops->rkq_serve = rd_kafka_cgrp_op_serve;
        rkcg->rkcg_ops->rkq_opaque = rkcg;
        /* Queue for ops deferred until a coordinator is available;
         * served the same way as the ops queue. */
        rkcg->rkcg_wait_coord_q = rd_kafka_q_new(rk);
        rkcg->rkcg_wait_coord_q->rkq_serve = rkcg->rkcg_ops->rkq_serve;
        rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque;
        rkcg->rkcg_q = rd_kafka_q_new(rk);
        rkcg->rkcg_group_instance_id =
                rd_kafkap_str_new(rk->rk_conf.group_instance_id, -1);

        TAILQ_INIT(&rkcg->rkcg_topics);
        rd_list_init(&rkcg->rkcg_toppars, 32, NULL);
        /* Empty member id means "not yet joined". */
        rd_kafka_cgrp_set_member_id(rkcg, "");
        rkcg->rkcg_subscribed_topics =
                rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
        rd_interval_init(&rkcg->rkcg_coord_query_intvl);
        rd_interval_init(&rkcg->rkcg_heartbeat_intvl);
        rd_interval_init(&rkcg->rkcg_join_intvl);
        rd_interval_init(&rkcg->rkcg_timeout_scan_intvl);
        rd_atomic32_init(&rkcg->rkcg_assignment_lost, rd_false);

        rkcg->rkcg_errored_topics = rd_kafka_topic_partition_list_new(0);

        /* Create a logical group coordinator broker to provide
         * a dedicated connection for group coordination.
         * This is needed since JoinGroup may block for up to
         * max.poll.interval.ms, effectively blocking and timing out
         * any other protocol requests (such as Metadata).
         * The address for this broker will be updated when
         * the group coordinator is assigned. */
        rkcg->rkcg_coord = rd_kafka_broker_add_logical(rk, "GroupCoordinator");

        /* Start the auto-commit timer if enabled. */
        if (rk->rk_conf.enable_auto_commit &&
            rk->rk_conf.auto_commit_interval_ms > 0)
                rd_kafka_timer_start(&rk->rk_timers,
                                     &rkcg->rkcg_offset_commit_tmr,
                                     rk->rk_conf.
                                     auto_commit_interval_ms * 1000ll,
                                     rd_kafka_cgrp_offset_commit_tmr_cb,
                                     rkcg);

        return rkcg;
}
402 
403 
404 /**
405  * @brief Set the group coordinator broker.
406  */
rd_kafka_cgrp_coord_set_broker(rd_kafka_cgrp_t * rkcg,rd_kafka_broker_t * rkb)407 static void rd_kafka_cgrp_coord_set_broker (rd_kafka_cgrp_t *rkcg,
408                                             rd_kafka_broker_t *rkb) {
409 
410         rd_assert(rkcg->rkcg_curr_coord == NULL);
411 
412         rd_assert(RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb));
413 
414         rkcg->rkcg_curr_coord = rkb;
415         rd_kafka_broker_keep(rkb);
416 
417         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORDSET",
418                      "Group \"%.*s\" coordinator set to broker %s",
419                      RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
420                      rd_kafka_broker_name(rkb));
421 
422         /* Reset query interval to trigger an immediate
423          * coord query if required */
424         if (!rd_interval_disabled(&rkcg->rkcg_coord_query_intvl))
425                 rd_interval_reset(&rkcg->rkcg_coord_query_intvl);
426 
427         rd_kafka_cgrp_set_state(rkcg,
428                                 RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);
429 
430         rd_kafka_broker_persistent_connection_add(
431                 rkcg->rkcg_coord, &rkcg->rkcg_coord->rkb_persistconn.coord);
432 
433         /* Set the logical coordinator's nodename to the
434          * proper broker's nodename, this will trigger a (re)connect
435          * to the new address. */
436         rd_kafka_broker_set_nodename(rkcg->rkcg_coord, rkb);
437 }
438 
439 
440 /**
441  * @brief Reset/clear the group coordinator broker.
442  */
rd_kafka_cgrp_coord_clear_broker(rd_kafka_cgrp_t * rkcg)443 static void rd_kafka_cgrp_coord_clear_broker (rd_kafka_cgrp_t *rkcg) {
444         rd_kafka_broker_t *rkb = rkcg->rkcg_curr_coord;
445 
446         rd_assert(rkcg->rkcg_curr_coord);
447         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORDCLEAR",
448                      "Group \"%.*s\" broker %s is no longer coordinator",
449                      RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
450                      rd_kafka_broker_name(rkb));
451 
452         rd_assert(rkcg->rkcg_coord);
453 
454         rd_kafka_broker_persistent_connection_del(
455                 rkcg->rkcg_coord,
456                 &rkcg->rkcg_coord->rkb_persistconn.coord);
457 
458         /* Clear the ephemeral broker's nodename.
459          * This will also trigger a disconnect. */
460         rd_kafka_broker_set_nodename(rkcg->rkcg_coord, NULL);
461 
462         rkcg->rkcg_curr_coord = NULL;
463         rd_kafka_broker_destroy(rkb); /* from set_coord_broker() */
464 }
465 
466 
467 /**
468  * @brief Update/set the group coordinator.
469  *
470  * Will do nothing if there's been no change.
471  *
472  * @returns 1 if the coordinator, or state, was updated, else 0.
473  */
rd_kafka_cgrp_coord_update(rd_kafka_cgrp_t * rkcg,int32_t coord_id)474 static int rd_kafka_cgrp_coord_update (rd_kafka_cgrp_t *rkcg,
475                                        int32_t coord_id) {
476 
477         /* Don't do anything while terminating */
478         if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
479                 return 0;
480 
481         /* Check if coordinator changed */
482         if (rkcg->rkcg_coord_id != coord_id) {
483                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPCOORD",
484                              "Group \"%.*s\" changing coordinator %"PRId32
485                              " -> %"PRId32,
486                              RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
487                              rkcg->rkcg_coord_id, coord_id);
488 
489                 /* Update coord id */
490                 rkcg->rkcg_coord_id = coord_id;
491 
492                 /* Clear previous broker handle, if any */
493                 if (rkcg->rkcg_curr_coord)
494                         rd_kafka_cgrp_coord_clear_broker(rkcg);
495         }
496 
497 
498         if (rkcg->rkcg_curr_coord) {
499                 /* There is already a known coordinator and a
500                  * corresponding broker handle. */
501                 if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP)
502                         return rd_kafka_cgrp_set_state(
503                                 rkcg,
504                                 RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);
505 
506         } else if (rkcg->rkcg_coord_id != -1) {
507                 rd_kafka_broker_t *rkb;
508 
509                 /* Try to find the coordinator broker handle */
510                 rd_kafka_rdlock(rkcg->rkcg_rk);
511                 rkb = rd_kafka_broker_find_by_nodeid(rkcg->rkcg_rk, coord_id);
512                 rd_kafka_rdunlock(rkcg->rkcg_rk);
513 
514                 /* It is possible, due to stale metadata, that the
515                  * coordinator id points to a broker we still don't know
516                  * about. In this case the client will continue
517                  * querying metadata and querying for the coordinator
518                  * until a match is found. */
519 
520                 if (rkb) {
521                         /* Coordinator is known and broker handle exists */
522                         rd_kafka_cgrp_coord_set_broker(rkcg, rkb);
523                         rd_kafka_broker_destroy(rkb); /*from find_by_nodeid()*/
524 
525                         return 1;
526                 } else {
527                         /* Coordinator is known but no corresponding
528                          * broker handle. */
529                         return rd_kafka_cgrp_set_state(
530                                 rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER);
531 
532                 }
533 
534         } else {
535                 /* Coordinator still not known, re-query */
536                 if (rkcg->rkcg_state >= RD_KAFKA_CGRP_STATE_WAIT_COORD)
537                         return rd_kafka_cgrp_set_state(
538                                 rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
539         }
540 
541         return 0; /* no change */
542 }
543 
544 
545 
546 
547 /**
548  * Handle FindCoordinator response
549  */
rd_kafka_cgrp_handle_FindCoordinator(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)550 static void rd_kafka_cgrp_handle_FindCoordinator (rd_kafka_t *rk,
551                                                   rd_kafka_broker_t *rkb,
552                                                   rd_kafka_resp_err_t err,
553                                                   rd_kafka_buf_t *rkbuf,
554                                                   rd_kafka_buf_t *request,
555                                                   void *opaque) {
556         const int log_decode_errors = LOG_ERR;
557         int16_t ErrorCode = 0;
558         int32_t CoordId;
559         rd_kafkap_str_t CoordHost = RD_ZERO_INIT;
560         int32_t CoordPort;
561         rd_kafka_cgrp_t *rkcg = opaque;
562         struct rd_kafka_metadata_broker mdb = RD_ZERO_INIT;
563         char *errstr = NULL;
564         int actions;
565 
566         if (likely(!(ErrorCode = err))) {
567                 if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1)
568                         rd_kafka_buf_read_throttle_time(rkbuf);
569 
570                 rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
571 
572                 if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) {
573                         rd_kafkap_str_t ErrorMsg;
574 
575                         rd_kafka_buf_read_str(rkbuf, &ErrorMsg);
576 
577                         if (!RD_KAFKAP_STR_IS_NULL(&ErrorMsg))
578                                 RD_KAFKAP_STR_DUPA(&errstr, &ErrorMsg);
579                 }
580 
581                 rd_kafka_buf_read_i32(rkbuf, &CoordId);
582                 rd_kafka_buf_read_str(rkbuf, &CoordHost);
583                 rd_kafka_buf_read_i32(rkbuf, &CoordPort);
584         }
585 
586         if (ErrorCode)
587                 goto err;
588 
589 
590         mdb.id = CoordId;
591 	RD_KAFKAP_STR_DUPA(&mdb.host, &CoordHost);
592 	mdb.port = CoordPort;
593 
594         rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
595                    "Group \"%.*s\" coordinator is %s:%i id %"PRId32,
596                    RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
597                    mdb.host, mdb.port, mdb.id);
598         rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto, &mdb, NULL);
599 
600         rd_kafka_cgrp_coord_update(rkcg, CoordId);
601         rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
602         return;
603 
604 err_parse: /* Parse error */
605         ErrorCode = rkbuf->rkbuf_err;
606         /* FALLTHRU */
607 
608 err:
609         if (!errstr)
610                 errstr = (char *)rd_kafka_err2str(ErrorCode);
611 
612         rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
613                    "Group \"%.*s\" FindCoordinator response error: %s: %s",
614                    RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
615                    rd_kafka_err2name(ErrorCode), errstr);
616 
617         if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
618                 return;
619 
620         actions = rd_kafka_err_action(
621                 rkb, ErrorCode, request,
622 
623                 RD_KAFKA_ERR_ACTION_RETRY|RD_KAFKA_ERR_ACTION_REFRESH,
624                 RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
625 
626                 RD_KAFKA_ERR_ACTION_RETRY,
627                 RD_KAFKA_RESP_ERR__TRANSPORT,
628 
629                 RD_KAFKA_ERR_ACTION_RETRY,
630                 RD_KAFKA_RESP_ERR__TIMED_OUT,
631 
632                 RD_KAFKA_ERR_ACTION_RETRY,
633                 RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
634 
635                 RD_KAFKA_ERR_ACTION_END);
636 
637 
638 
639         if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
640                 rd_kafka_cgrp_coord_update(rkcg, -1);
641         } else {
642                 if (!(actions & RD_KAFKA_ERR_ACTION_RETRY) &&
643                     rkcg->rkcg_last_err != ErrorCode) {
644                         /* Propagate non-retriable errors to the application */
645                         rd_kafka_consumer_err(
646                                 rkcg->rkcg_q, rd_kafka_broker_id(rkb),
647                                 ErrorCode, 0, NULL, NULL,
648                                 RD_KAFKA_OFFSET_INVALID,
649                                 "FindCoordinator response error: %s", errstr);
650 
651                         /* Suppress repeated errors */
652                         rkcg->rkcg_last_err = ErrorCode;
653                 }
654 
655                 /* Retries are performed by the timer-intervalled
656                  * coord queries, continue querying */
657                 rd_kafka_cgrp_set_state(
658                         rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
659         }
660 
661         rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
662 }
663 
664 
665 /**
666  * Query for coordinator.
667  * Ask any broker in state UP
668  *
669  * Locality: main thread
670  */
rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t * rkcg,const char * reason)671 void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg,
672 				const char *reason) {
673 	rd_kafka_broker_t *rkb;
674         rd_kafka_resp_err_t err;
675 
676         rkb = rd_kafka_broker_any_usable(rkcg->rkcg_rk,
677                                          RD_POLL_NOWAIT,
678                                          RD_DO_LOCK,
679                                          RD_KAFKA_FEATURE_BROKER_GROUP_COORD,
680                                          "coordinator query");
681 
682 	if (!rkb) {
683 		/* Reset the interval because there were no brokers. When a
684 		 * broker becomes available, we want to query it immediately. */
685 		rd_interval_reset(&rkcg->rkcg_coord_query_intvl);
686 		rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPQUERY",
687 			     "Group \"%.*s\": "
688 			     "no broker available for coordinator query: %s",
689 			     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
690 		return;
691 	}
692 
693         rd_rkb_dbg(rkb, CGRP, "CGRPQUERY",
694                    "Group \"%.*s\": querying for coordinator: %s",
695                    RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
696 
697         err = rd_kafka_FindCoordinatorRequest(
698                 rkb, RD_KAFKA_COORD_GROUP, rkcg->rkcg_group_id->str,
699                 RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
700                 rd_kafka_cgrp_handle_FindCoordinator, rkcg);
701 
702         if (err) {
703                 rd_rkb_dbg(rkb, CGRP, "CGRPQUERY",
704                            "Group \"%.*s\": "
705                            "unable to send coordinator query: %s",
706                            RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
707                            rd_kafka_err2str(err));
708                 rd_kafka_broker_destroy(rkb);
709                 return;
710         }
711 
712         if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD)
713                 rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD);
714 
715 	rd_kafka_broker_destroy(rkb);
716 
717         /* Back off the next intervalled query since we just sent one. */
718         rd_interval_reset_to_now(&rkcg->rkcg_coord_query_intvl, 0);
719 }
720 
721 /**
722  * @brief Mark the current coordinator as dead.
723  *
724  * @locality main thread
725  */
rd_kafka_cgrp_coord_dead(rd_kafka_cgrp_t * rkcg,rd_kafka_resp_err_t err,const char * reason)726 void rd_kafka_cgrp_coord_dead (rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err,
727                                const char *reason) {
728         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORD",
729                      "Group \"%.*s\": "
730                      "marking the coordinator (%"PRId32") dead: %s: %s",
731                      RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
732                      rkcg->rkcg_coord_id, rd_kafka_err2str(err), reason);
733 
734 	rd_kafka_cgrp_coord_update(rkcg, -1);
735 
736 	/* Re-query for coordinator */
737 	rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
738 	rd_kafka_cgrp_coord_query(rkcg, reason);
739 }
740 
741 
742 /**
743  * @returns a new reference to the current coordinator, if available, else NULL.
744  *
745  * @locality rdkafka main thread
746  * @locks_required none
747  * @locks_acquired none
748  */
rd_kafka_cgrp_get_coord(rd_kafka_cgrp_t * rkcg)749 rd_kafka_broker_t *rd_kafka_cgrp_get_coord (rd_kafka_cgrp_t *rkcg) {
750         if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkcg->rkcg_coord)
751                 return NULL;
752 
753         rd_kafka_broker_keep(rkcg->rkcg_coord);
754 
755         return rkcg->rkcg_coord;
756 }
757 
758 
759 /**
760  * @brief cgrp handling of LeaveGroup responses
761  * @param opaque must be the cgrp handle.
762  * @locality rdkafka main thread (unless err==ERR__DESTROY)
763  */
rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)764 static void rd_kafka_cgrp_handle_LeaveGroup (rd_kafka_t *rk,
765                                              rd_kafka_broker_t *rkb,
766                                              rd_kafka_resp_err_t err,
767                                              rd_kafka_buf_t *rkbuf,
768                                              rd_kafka_buf_t *request,
769                                              void *opaque) {
770         rd_kafka_cgrp_t *rkcg = opaque;
771         const int log_decode_errors = LOG_ERR;
772         int16_t ErrorCode = 0;
773 
774         if (err) {
775                 ErrorCode = err;
776                 goto err;
777         }
778 
779         if (request->rkbuf_reqhdr.ApiVersion >= 1)
780                 rd_kafka_buf_read_throttle_time(rkbuf);
781 
782         rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
783 
784 err:
785         if (ErrorCode)
786                 rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
787                              "LeaveGroup response error in state %s: %s",
788                              rd_kafka_cgrp_state_names[rkcg->rkcg_state],
789                              rd_kafka_err2str(ErrorCode));
790         else
791                 rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
792                              "LeaveGroup response received in state %s",
793                              rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
794 
795         if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) {
796                 rd_assert(thrd_is_current(rk->rk_thread));
797                 rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE;
798                 rd_kafka_cgrp_try_terminate(rkcg);
799         }
800 
801 
802 
803         return;
804 
805  err_parse:
806         ErrorCode = rkbuf->rkbuf_err;
807         goto err;
808 }
809 
810 
rd_kafka_cgrp_leave(rd_kafka_cgrp_t * rkcg)811 static void rd_kafka_cgrp_leave (rd_kafka_cgrp_t *rkcg) {
812         char *member_id;
813 
814         RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id);
815 
816         /* Leaving the group invalidates the member id, reset it
817          * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */
818         rd_kafka_cgrp_set_member_id(rkcg, "");
819 
820         if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) {
821                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
822                              "Group \"%.*s\": leave (in state %s): "
823                              "LeaveGroupRequest already in-transit",
824                              RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
825                              rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
826                 return;
827         }
828 
829         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
830                      "Group \"%.*s\": leave (in state %s)",
831                      RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
832                      rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
833 
834         rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE;
835 
836         if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) {
837                 rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE",
838                            "Leaving group");
839                 rd_kafka_LeaveGroupRequest(rkcg->rkcg_coord,
840                                            rkcg->rkcg_group_id->str,
841                                            member_id,
842                                            RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
843                                            rd_kafka_cgrp_handle_LeaveGroup,
844                                            rkcg);
845         } else
846                 rd_kafka_cgrp_handle_LeaveGroup(rkcg->rkcg_rk,
847                                                 rkcg->rkcg_coord,
848                                                 RD_KAFKA_RESP_ERR__WAIT_COORD,
849                                                 NULL, NULL, rkcg);
850 }
851 
852 
853 /**
854  * @brief Leave group, if desired.
855  *
856  * @returns true if a LeaveGroup was issued, else false.
857  */
rd_kafka_cgrp_leave_maybe(rd_kafka_cgrp_t * rkcg)858 static rd_bool_t rd_kafka_cgrp_leave_maybe (rd_kafka_cgrp_t *rkcg) {
859 
860         /* We were not instructed to leave in the first place. */
861         if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE))
862                 return rd_false;
863 
864         rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE;
865 
866         /* Don't send Leave when termating with NO_CONSUMER_CLOSE flag */
867         if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
868                 return rd_false;
869 
870         /* KIP-345: Static group members must not send a LeaveGroupRequest
871          * on termination. */
872         if (RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg) &&
873             rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
874                 return rd_false;
875 
876         rd_kafka_cgrp_leave(rkcg);
877 
878         return rd_true;
879 }
880 
881 
882 /**
883  * @brief Enqueues a rebalance op, delegating responsibility of calling
884  *        incremental_assign / incremental_unassign to the application.
885  *        If there is no rebalance handler configured, or the action
886  *        should not be delegated to the application for some other
887  *        reason, incremental_assign / incremental_unassign will be called
888  *        automatically, immediately.
889  *
890  * @param rejoin whether or not to rejoin the group following completion
891  *        of the incremental assign / unassign.
892  *
893  * @remarks does not take ownership of \p partitions.
894  */
895 void
rd_kafka_rebalance_op_incr(rd_kafka_cgrp_t * rkcg,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * partitions,rd_bool_t rejoin,const char * reason)896 rd_kafka_rebalance_op_incr (rd_kafka_cgrp_t *rkcg,
897                             rd_kafka_resp_err_t err,
898                             rd_kafka_topic_partition_list_t *partitions,
899                             rd_bool_t rejoin,
900                             const char *reason) {
901         rd_kafka_error_t *error;
902 
903         /* Flag to rejoin after completion of the incr_assign or incr_unassign,
904            if required. */
905         rkcg->rkcg_rebalance_rejoin = rejoin;
906 
907         rd_kafka_wrlock(rkcg->rkcg_rk);
908         rkcg->rkcg_c.ts_rebalance = rd_clock();
909         rkcg->rkcg_c.rebalance_cnt++;
910         rd_kafka_wrunlock(rkcg->rkcg_rk);
911 
912         if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk) ||
913             rd_kafka_fatal_error_code(rkcg->rkcg_rk)) {
914                 /* Total unconditional unassign in these cases */
915                 rd_kafka_cgrp_unassign(rkcg);
916 
917                 /* Now serve the assignment to make updates */
918                 rd_kafka_assignment_serve(rkcg->rkcg_rk);
919                 goto done;
920         }
921 
922         rd_kafka_cgrp_set_join_state(
923                 rkcg,
924                 err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
925                 RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL :
926                 RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL);
927 
928         /* Schedule application rebalance callback/event if enabled */
929         if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE) {
930                 rd_kafka_op_t *rko;
931 
932                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
933                              "Group \"%s\": delegating incremental %s of %d "
934                              "partition(s) to application on queue %s: %s",
935                              rkcg->rkcg_group_id->str,
936                              err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
937                              "revoke" : "assign", partitions->cnt,
938                              rd_kafka_q_dest_name(rkcg->rkcg_q), reason);
939 
940                 /* Pause currently assigned partitions while waiting for
941                  * rebalance callback to get called to make sure the
942                  * application will not receive any more messages that
943                  * might block it from serving the rebalance callback
944                  * and to not process messages for partitions it
945                  * might have lost in the rebalance. */
946                 rd_kafka_assignment_pause(rkcg->rkcg_rk,
947                                           "incremental rebalance");
948 
949                 rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE);
950                 rko->rko_err = err;
951                 rko->rko_u.rebalance.partitions =
952                         rd_kafka_topic_partition_list_copy(partitions);
953 
954                 if (rd_kafka_q_enq(rkcg->rkcg_q, rko))
955                         goto done; /* Rebalance op successfully enqueued */
956 
957                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
958                              "Group \"%s\": ops queue is disabled, not "
959                              "delegating partition %s to application",
960                              rkcg->rkcg_group_id->str,
961                              err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
962                              "unassign" : "assign");
963                 /* FALLTHRU */
964         }
965 
966         /* No application rebalance callback/event handler, or it is not
967          * available, do the assign/unassign ourselves.
968          * We need to be careful here not to trigger assignment_serve()
969          * since it may call into the cgrp code again, in which case we
970          * can't really track what the outcome state will be. */
971 
972         if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
973                 error = rd_kafka_cgrp_incremental_assign(rkcg, partitions);
974         else
975                 error = rd_kafka_cgrp_incremental_unassign(rkcg, partitions);
976 
977         if (error) {
978                 rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE",
979                              "Group \"%s\": internal incremental %s "
980                              "of %d partition(s) failed: %s: "
981                              "unassigning all partitions and rejoining",
982                              rkcg->rkcg_group_id->str,
983                              err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
984                              "unassign" : "assign",
985                              partitions->cnt,
986                              rd_kafka_error_string(error));
987                 rd_kafka_error_destroy(error);
988 
989                 rd_kafka_cgrp_set_join_state(rkcg,
990                                              /* This is a clean state for
991                                               * assignment_done() to rejoin
992                                               * from. */
993                                              RD_KAFKA_CGRP_JOIN_STATE_STEADY);
994                 rd_kafka_assignment_clear(rkcg->rkcg_rk);
995         }
996 
997         /* Now serve the assignment to make updates */
998         rd_kafka_assignment_serve(rkcg->rkcg_rk);
999 
1000  done:
1001         /* Update the current group assignment based on the
1002          * added/removed partitions. */
1003         rd_kafka_cgrp_group_assignment_modify(
1004                 rkcg,
1005                 err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
1006                 partitions);
1007 }
1008 
1009 
1010 /**
1011  * @brief Enqueues a rebalance op, delegating responsibility of calling
1012  *        assign / unassign to the application. If there is no rebalance
1013  *        handler configured, or the action should not be delegated to the
1014  *        application for some other reason, assign / unassign will be
1015  *        called automatically.
1016  *
1017  * @remarks \p partitions is copied.
1018  */
1019 static void
rd_kafka_rebalance_op(rd_kafka_cgrp_t * rkcg,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * assignment,const char * reason)1020 rd_kafka_rebalance_op (rd_kafka_cgrp_t *rkcg,
1021                        rd_kafka_resp_err_t err,
1022                        rd_kafka_topic_partition_list_t *assignment,
1023                        const char *reason) {
1024         rd_kafka_error_t *error;
1025 
1026         rd_kafka_wrlock(rkcg->rkcg_rk);
1027         rkcg->rkcg_c.ts_rebalance = rd_clock();
1028         rkcg->rkcg_c.rebalance_cnt++;
1029         rd_kafka_wrunlock(rkcg->rkcg_rk);
1030 
1031         if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk) ||
1032             rd_kafka_fatal_error_code(rkcg->rkcg_rk)) {
1033                 /* Unassign */
1034                 rd_kafka_cgrp_unassign(rkcg);
1035 
1036                 /* Now serve the assignment to make updates */
1037                 rd_kafka_assignment_serve(rkcg->rkcg_rk);
1038                 goto done;
1039         }
1040 
1041         rd_assert(assignment != NULL);
1042 
1043         rd_kafka_cgrp_set_join_state(
1044                 rkcg,
1045                 err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
1046                 RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL :
1047                 RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL);
1048 
1049         /* Schedule application rebalance callback/event if enabled */
1050         if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE) {
1051                 rd_kafka_op_t *rko;
1052 
1053                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
1054                              "Group \"%s\": delegating %s of %d partition(s) "
1055                              "to application on queue %s: %s",
1056                              rkcg->rkcg_group_id->str,
1057                              err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
1058                              "revoke":"assign", assignment->cnt,
1059                              rd_kafka_q_dest_name(rkcg->rkcg_q), reason);
1060 
1061                 /* Pause currently assigned partitions while waiting for
1062                  * rebalance callback to get called to make sure the
1063                  * application will not receive any more messages that
1064                  * might block it from serving the rebalance callback
1065                  * and to not process messages for partitions it
1066                  * might have lost in the rebalance. */
1067                 rd_kafka_assignment_pause(rkcg->rkcg_rk, "rebalance");
1068 
1069                 rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE);
1070                 rko->rko_err = err;
1071                 rko->rko_u.rebalance.partitions =
1072                         rd_kafka_topic_partition_list_copy(assignment);
1073 
1074                 if (rd_kafka_q_enq(rkcg->rkcg_q, rko))
1075                         goto done; /* Rebalance op successfully enqueued */
1076 
1077                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
1078                              "Group \"%s\": ops queue is disabled, not "
1079                              "delegating partition %s to application",
1080                              rkcg->rkcg_group_id->str,
1081                              err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
1082                              "unassign" : "assign");
1083 
1084                 /* FALLTHRU */
1085         }
1086 
1087         /* No application rebalance callback/event handler, or it is not
1088          * available, do the assign/unassign ourselves.
1089          * We need to be careful here not to trigger assignment_serve()
1090          * since it may call into the cgrp code again, in which case we
1091          * can't really track what the outcome state will be. */
1092 
1093         if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
1094                 error = rd_kafka_cgrp_assign(rkcg, assignment);
1095         else
1096                 error = rd_kafka_cgrp_unassign(rkcg);
1097 
1098         if (error) {
1099                 rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE",
1100                              "Group \"%s\": internal %s "
1101                              "of %d partition(s) failed: %s: "
1102                              "unassigning all partitions and rejoining",
1103                              rkcg->rkcg_group_id->str,
1104                              err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
1105                              "unassign" : "assign",
1106                              rkcg->rkcg_group_assignment->cnt,
1107                              rd_kafka_error_string(error));
1108                 rd_kafka_error_destroy(error);
1109 
1110                 rd_kafka_cgrp_set_join_state(rkcg,
1111                                              /* This is a clean state for
1112                                               * assignment_done() to rejoin
1113                                               * from. */
1114                                              RD_KAFKA_CGRP_JOIN_STATE_STEADY);
1115                 rd_kafka_assignment_clear(rkcg->rkcg_rk);
1116         }
1117 
1118         /* Now serve the assignment to make updates */
1119         rd_kafka_assignment_serve(rkcg->rkcg_rk);
1120 
1121  done:
1122         if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
1123                 rd_kafka_cgrp_group_assignment_set(rkcg, assignment);
1124         else
1125                 rd_kafka_cgrp_group_assignment_set(rkcg, NULL);
1126 }
1127 
1128 
1129 /**
1130  * @brief Rejoin the group.
1131  *
1132  * @remark This function must not have any side-effects but setting the
1133  *         join state.
1134  */
1135 static void rd_kafka_cgrp_rejoin (rd_kafka_cgrp_t *rkcg, const char *fmt, ...)
1136         RD_FORMAT(printf, 2, 3);
1137 
rd_kafka_cgrp_rejoin(rd_kafka_cgrp_t * rkcg,const char * fmt,...)1138 static void rd_kafka_cgrp_rejoin (rd_kafka_cgrp_t *rkcg, const char *fmt, ...) {
1139         char reason[512];
1140         va_list ap;
1141         char astr[128];
1142 
1143         va_start(ap, fmt);
1144         rd_vsnprintf(reason, sizeof(reason), fmt, ap);
1145         va_end(ap);
1146 
1147         if (rkcg->rkcg_group_assignment)
1148                 rd_snprintf(astr, sizeof(astr), " with %d owned partition(s)",
1149                             rkcg->rkcg_group_assignment->cnt);
1150         else
1151                 rd_snprintf(astr, sizeof(astr), " without an assignment");
1152 
1153         if (rkcg->rkcg_subscription || rkcg->rkcg_next_subscription) {
1154                 rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP,
1155                              "REJOIN",
1156                              "Group \"%s\": %s group%s: %s",
1157                              rkcg->rkcg_group_id->str,
1158                              rkcg->rkcg_join_state ==
1159                              RD_KAFKA_CGRP_JOIN_STATE_INIT ?
1160                              "Joining" : "Rejoining",
1161                              astr, reason);
1162         } else {
1163                 rd_kafka_dbg(rkcg->rkcg_rk,CONSUMER|RD_KAFKA_DBG_CGRP,
1164                              "NOREJOIN",
1165                              "Group \"%s\": Not %s group%s: %s: "
1166                              "no subscribed topics",
1167                              rkcg->rkcg_group_id->str,
1168                              rkcg->rkcg_join_state ==
1169                              RD_KAFKA_CGRP_JOIN_STATE_INIT ?
1170                              "joining" : "rejoining",
1171                              astr, reason);
1172 
1173                 rd_kafka_cgrp_leave_maybe(rkcg);
1174         }
1175 
1176         rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT);
1177 }
1178 
1179 
1180 /**
1181  * @brief Collect all assigned or owned partitions from group members.
1182  *        The member field of each result element is set to the associated
1183  *        group member. The members_match field is set to rd_false.
1184  *
1185  * @param members Array of group members.
1186  * @param member_cnt Number of elements in members.
1187  * @param par_cnt The total number of partitions expected to be collected.
1188  * @param collect_owned If rd_true, rkgm_owned partitions will be collected,
1189  *        else rkgm_assignment partitions will be collected.
1190  */
1191 static map_toppar_member_info_t *
rd_kafka_collect_partitions(const rd_kafka_group_member_t * members,size_t member_cnt,size_t par_cnt,rd_bool_t collect_owned)1192 rd_kafka_collect_partitions (const rd_kafka_group_member_t *members,
1193                              size_t member_cnt,
1194                              size_t par_cnt,
1195                              rd_bool_t collect_owned) {
1196         size_t i;
1197         map_toppar_member_info_t *collected = rd_calloc(1, sizeof(*collected));
1198 
1199         RD_MAP_INIT(
1200                 collected,
1201                 par_cnt,
1202                 rd_kafka_topic_partition_cmp,
1203                 rd_kafka_topic_partition_hash,
1204                 rd_kafka_topic_partition_destroy_free,
1205                 PartitionMemberInfo_free);
1206 
1207         for (i = 0 ; i<member_cnt ; i++) {
1208                 size_t j;
1209                 const rd_kafka_group_member_t *rkgm = &members[i];
1210                 const rd_kafka_topic_partition_list_t *toppars = collect_owned
1211                         ? rkgm->rkgm_owned
1212                         : rkgm->rkgm_assignment;
1213 
1214                 for (j = 0; j<(size_t)toppars->cnt; j++) {
1215                         rd_kafka_topic_partition_t *rktpar =
1216                                 rd_kafka_topic_partition_copy(
1217                                         &toppars->elems[j]);
1218                         PartitionMemberInfo_t *pmi =
1219                                 PartitionMemberInfo_new(rkgm, rd_false);
1220                         RD_MAP_SET(collected, rktpar, pmi);
1221                 }
1222         }
1223 
1224         return collected;
1225 }
1226 
1227 
1228 /**
1229  * @brief Set intersection. Returns a set of all elements of \p a that
1230  *        are also elements of \p b. Additionally, compares the members
1231  *        field of matching elements from \p a and \p b and if not NULL
1232  *        and equal, sets the members_match field in the result element
1233  *        to rd_true and the member field to equal that of the elements,
1234  *        else sets the members_match field to rd_false and member field
1235  *        to NULL.
1236  */
1237 static map_toppar_member_info_t *
rd_kafka_member_partitions_intersect(map_toppar_member_info_t * a,map_toppar_member_info_t * b)1238 rd_kafka_member_partitions_intersect (
1239                 map_toppar_member_info_t *a,
1240                 map_toppar_member_info_t *b) {
1241         const rd_kafka_topic_partition_t *key;
1242         const PartitionMemberInfo_t *a_v;
1243         map_toppar_member_info_t *intersection =
1244                 rd_calloc(1, sizeof(*intersection));
1245 
1246         RD_MAP_INIT(
1247                 intersection,
1248                 RD_MIN(a ? RD_MAP_CNT(a) : 1, b ? RD_MAP_CNT(b) : 1),
1249                 rd_kafka_topic_partition_cmp,
1250                 rd_kafka_topic_partition_hash,
1251                 rd_kafka_topic_partition_destroy_free,
1252                 PartitionMemberInfo_free);
1253 
1254         if (!a || !b)
1255                 return intersection;
1256 
1257         RD_MAP_FOREACH(key, a_v, a) {
1258                 rd_bool_t members_match;
1259                 const PartitionMemberInfo_t *b_v = RD_MAP_GET(b, key);
1260 
1261                 if (b_v == NULL)
1262                         continue;
1263 
1264                 members_match =
1265                         a_v->member &&
1266                         b_v->member &&
1267                         rd_kafka_group_member_cmp(a_v->member,
1268                                                   b_v->member) == 0;
1269 
1270                 RD_MAP_SET(intersection,
1271                            rd_kafka_topic_partition_copy(key),
1272                            PartitionMemberInfo_new(
1273                                 b_v->member,
1274                                 members_match));
1275         }
1276 
1277         return intersection;
1278 }
1279 
1280 
1281 /**
1282  * @brief Set subtraction. Returns a set of all elements of \p a
1283  *        that are not elements of \p b. Sets the member field in
1284  *        elements in the returned set to equal that of the
1285  *        corresponding element in \p a
1286  */
1287 static map_toppar_member_info_t *
rd_kafka_member_partitions_subtract(map_toppar_member_info_t * a,map_toppar_member_info_t * b)1288 rd_kafka_member_partitions_subtract (
1289                 map_toppar_member_info_t *a,
1290                 map_toppar_member_info_t *b) {
1291         const rd_kafka_topic_partition_t *key;
1292         const PartitionMemberInfo_t *a_v;
1293         map_toppar_member_info_t *difference =
1294                 rd_calloc(1, sizeof(*difference));
1295 
1296         RD_MAP_INIT(
1297                 difference,
1298                 a ? RD_MAP_CNT(a) : 1,
1299                 rd_kafka_topic_partition_cmp,
1300                 rd_kafka_topic_partition_hash,
1301                 rd_kafka_topic_partition_destroy_free,
1302                 PartitionMemberInfo_free);
1303 
1304         if (!a)
1305                 return difference;
1306 
1307         RD_MAP_FOREACH(key, a_v, a) {
1308                 const PartitionMemberInfo_t *b_v = b ? RD_MAP_GET(b, key)
1309                                                      : NULL;
1310 
1311                 if (!b_v)
1312                         RD_MAP_SET(difference,
1313                                    rd_kafka_topic_partition_copy(key),
1314                                    PartitionMemberInfo_new(a_v->member,
1315                                                            rd_false));
1316         }
1317 
1318         return difference;
1319 }
1320 
1321 
1322 /**
1323  * @brief Adjust the partition assignment as provided by the assignor
1324  *        according to the COOPERATIVE protocol.
1325  */
rd_kafka_cooperative_protocol_adjust_assignment(rd_kafka_cgrp_t * rkcg,rd_kafka_group_member_t * members,int member_cnt)1326 static void rd_kafka_cooperative_protocol_adjust_assignment (
1327                                         rd_kafka_cgrp_t *rkcg,
1328                                         rd_kafka_group_member_t *members,
1329                                         int member_cnt) {
1330 
1331         /* https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafk\
1332            a+Consumer+Incremental+Rebalance+Protocol */
1333 
1334         int i;
1335         int expected_max_assignment_size;
1336         int total_assigned = 0;
1337         int not_revoking = 0;
1338         size_t par_cnt = 0;
1339         const rd_kafka_topic_partition_t *toppar;
1340         const PartitionMemberInfo_t *pmi;
1341         map_toppar_member_info_t *assigned;
1342         map_toppar_member_info_t *owned;
1343         map_toppar_member_info_t *maybe_revoking;
1344         map_toppar_member_info_t *ready_to_migrate;
1345         map_toppar_member_info_t *unknown_but_owned;
1346 
1347         for (i = 0 ; i<member_cnt ; i++)
1348                 par_cnt += members[i].rkgm_owned->cnt;
1349 
1350         assigned = rd_kafka_collect_partitions(members,
1351                                                member_cnt,
1352                                                par_cnt,
1353                                                rd_false/*assigned*/);
1354 
1355         owned = rd_kafka_collect_partitions(members,
1356                                             member_cnt,
1357                                             par_cnt,
1358                                             rd_true/*owned*/);
1359 
1360         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
1361                      "Group \"%s\": Partitions owned by members: %d, "
1362                      "partitions assigned by assignor: %d",
1363                      rkcg->rkcg_group_id->str,
1364                      (int)RD_MAP_CNT(owned), (int)RD_MAP_CNT(assigned));
1365 
1366         /* Still owned by some members */
1367         maybe_revoking =
1368                 rd_kafka_member_partitions_intersect(assigned,
1369                                                      owned);
1370 
1371         /* Not previously owned by anyone */
1372         ready_to_migrate =
1373                 rd_kafka_member_partitions_subtract(assigned,
1374                                                     owned);
1375 
1376         /* Don't exist in assigned partitions */
1377         unknown_but_owned =
1378                 rd_kafka_member_partitions_subtract(owned,
1379                                                     assigned);
1380 
1381         /* Rough guess at a size that is a bit higher than
1382          * the maximum number of partitions likely to be
1383          * assigned to any partition. */
1384         expected_max_assignment_size =
1385                 (int)(RD_MAP_CNT(assigned) / member_cnt) + 4;
1386 
1387         for (i = 0 ; i < member_cnt ; i++) {
1388                 rd_kafka_group_member_t *rkgm = &members[i];
1389                 rd_kafka_topic_partition_list_destroy(
1390                         rkgm->rkgm_assignment);
1391 
1392                 rkgm->rkgm_assignment =
1393                         rd_kafka_topic_partition_list_new(
1394                                 expected_max_assignment_size);
1395         }
1396 
1397         /* For maybe-revoking-partitions, check if the owner has
1398          * changed. If yes, exclude them from the assigned-partitions
1399          * list to the new owner. The old owner will realize it does
1400          * not own it any more, revoke it and then trigger another
1401          * rebalance for these partitions to finally be reassigned.
1402          */
1403         RD_MAP_FOREACH(toppar, pmi, maybe_revoking) {
1404                 if (!pmi->members_match)
1405                         /* Owner has changed. */
1406                         continue;
1407 
1408                 /* Owner hasn't changed. */
1409                 rd_kafka_topic_partition_list_add(
1410                         pmi->member->rkgm_assignment,
1411                         toppar->topic,
1412                         toppar->partition);
1413 
1414                 total_assigned++;
1415                 not_revoking++;
1416         }
1417 
1418         /* For ready-to-migrate-partitions, it is safe to move them
1419          * to the new member immediately since we know no one owns
1420          * it before, and hence we can encode the owner from the
1421          * newly-assigned-partitions directly.
1422          */
1423         RD_MAP_FOREACH(toppar, pmi, ready_to_migrate) {
1424                 rd_kafka_topic_partition_list_add(
1425                         pmi->member->rkgm_assignment,
1426                         toppar->topic,
1427                         toppar->partition);
1428                 total_assigned++;
1429         }
1430 
1431         /* For unknown-but-owned-partitions, it is also safe to just
1432          * give them back to whoever claimed to be their owners by
1433          * encoding them directly as well. If this is due to a topic
1434          * metadata update, then a later rebalance will be triggered
1435          * anyway.
1436          */
1437         RD_MAP_FOREACH(toppar, pmi, unknown_but_owned) {
1438                 rd_kafka_topic_partition_list_add(
1439                         pmi->member->rkgm_assignment,
1440                         toppar->topic,
1441                         toppar->partition);
1442                 total_assigned++;
1443         }
1444 
1445         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
1446                 "Group \"%s\": COOPERATIVE protocol collection sizes: "
1447                 "maybe revoking: %d, ready to migrate: %d, unknown but "
1448                 "owned: %d", rkcg->rkcg_group_id->str,
1449                 (int)RD_MAP_CNT(maybe_revoking),
1450                 (int)RD_MAP_CNT(ready_to_migrate),
1451                 (int)RD_MAP_CNT(unknown_but_owned));
1452 
1453         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
1454                 "Group \"%s\": %d partitions assigned to consumers",
1455                 rkcg->rkcg_group_id->str, total_assigned);
1456 
1457         RD_MAP_DESTROY_AND_FREE(maybe_revoking);
1458         RD_MAP_DESTROY_AND_FREE(ready_to_migrate);
1459         RD_MAP_DESTROY_AND_FREE(unknown_but_owned);
1460         RD_MAP_DESTROY_AND_FREE(assigned);
1461         RD_MAP_DESTROY_AND_FREE(owned);
1462 }
1463 
1464 
1465 
1466 /**
1467  * @brief Run group assignment.
1468  */
1469 static void
rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t * rkcg,rd_kafka_assignor_t * rkas,rd_kafka_resp_err_t err,rd_kafka_metadata_t * metadata,rd_kafka_group_member_t * members,int member_cnt)1470 rd_kafka_cgrp_assignor_run (rd_kafka_cgrp_t *rkcg,
1471                             rd_kafka_assignor_t *rkas,
1472                             rd_kafka_resp_err_t err,
1473                             rd_kafka_metadata_t *metadata,
1474                             rd_kafka_group_member_t *members,
1475                             int member_cnt) {
1476         char errstr[512];
1477 
1478         if (err) {
1479                 rd_snprintf(errstr, sizeof(errstr),
1480                             "Failed to get cluster metadata: %s",
1481                             rd_kafka_err2str(err));
1482                 goto err;
1483         }
1484 
1485         *errstr = '\0';
1486 
1487         /* Run assignor */
1488         err = rd_kafka_assignor_run(rkcg, rkas, metadata,
1489                                     members, member_cnt,
1490                                     errstr, sizeof(errstr));
1491 
1492         if (err) {
1493                 if (!*errstr)
1494                         rd_snprintf(errstr, sizeof(errstr), "%s",
1495                                     rd_kafka_err2str(err));
1496                 goto err;
1497         }
1498 
1499         rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "ASSIGNOR",
1500                      "Group \"%s\": \"%s\" assignor run for %d member(s)",
1501                      rkcg->rkcg_group_id->str,
1502                      rkas->rkas_protocol_name->str,
1503                      member_cnt);
1504 
1505         if (rkas->rkas_protocol == RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE)
1506                 rd_kafka_cooperative_protocol_adjust_assignment(rkcg,
1507                                                                 members,
1508                                                                 member_cnt);
1509 
1510         rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
1511 
1512         /* Respond to broker with assignment set or error */
1513         rd_kafka_SyncGroupRequest(rkcg->rkcg_coord,
1514                                   rkcg->rkcg_group_id,
1515                                   rkcg->rkcg_generation_id,
1516                                   rkcg->rkcg_member_id,
1517                                   rkcg->rkcg_group_instance_id,
1518                                   members, err ? 0 : member_cnt,
1519                                   RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
1520                                   rd_kafka_handle_SyncGroup, rkcg);
1521         return;
1522 
1523 err:
1524         rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "ASSIGNOR",
1525                      "Group \"%s\": failed to run assignor \"%s\" for "
1526                      "%d member(s): %s",
1527                      rkcg->rkcg_group_id->str,
1528                      rkas->rkas_protocol_name->str,
1529                      member_cnt, errstr);
1530 
1531         rd_kafka_cgrp_rejoin(rkcg, "%s assignor failed: %s",
1532                              rkas->rkas_protocol_name->str, errstr);
1533 }
1534 
1535 
1536 
1537 /**
1538  * @brief Op callback from handle_JoinGroup
1539  */
1540 static rd_kafka_op_res_t
rd_kafka_cgrp_assignor_handle_Metadata_op(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1541 rd_kafka_cgrp_assignor_handle_Metadata_op (rd_kafka_t *rk,
1542                                            rd_kafka_q_t *rkq,
1543                                            rd_kafka_op_t *rko) {
1544         rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
1545 
1546         if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
1547                 return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
1548 
1549         if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)
1550                 return RD_KAFKA_OP_RES_HANDLED; /* From outdated state */
1551 
1552         if (!rkcg->rkcg_group_leader.members) {
1553                 rd_kafka_dbg(rk, CGRP, "GRPLEADER",
1554                              "Group \"%.*s\": no longer leader: "
1555                              "not running assignor",
1556                              RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
1557                 return RD_KAFKA_OP_RES_HANDLED;
1558         }
1559 
1560         rd_kafka_cgrp_assignor_run(rkcg,
1561                                    rkcg->rkcg_assignor,
1562                                    rko->rko_err, rko->rko_u.metadata.md,
1563                                    rkcg->rkcg_group_leader.members,
1564                                    rkcg->rkcg_group_leader.member_cnt);
1565 
1566         return RD_KAFKA_OP_RES_HANDLED;
1567 }
1568 
1569 
1570 /**
1571  * Parse single JoinGroup.Members.MemberMetadata for "consumer" ProtocolType
1572  *
1573  * Protocol definition:
1574  * https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
1575  *
1576  * Returns 0 on success or -1 on error.
1577  */
1578 static int
rd_kafka_group_MemberMetadata_consumer_read(rd_kafka_broker_t * rkb,rd_kafka_group_member_t * rkgm,const rd_kafkap_bytes_t * MemberMetadata)1579 rd_kafka_group_MemberMetadata_consumer_read (
1580         rd_kafka_broker_t *rkb, rd_kafka_group_member_t *rkgm,
1581         const rd_kafkap_bytes_t *MemberMetadata) {
1582 
1583         rd_kafka_buf_t *rkbuf;
1584         int16_t Version;
1585         int32_t subscription_cnt;
1586         rd_kafkap_bytes_t UserData;
1587         const int log_decode_errors = LOG_ERR;
1588         rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG;
1589 
1590         /* Create a shadow-buffer pointing to the metadata to ease parsing. */
1591         rkbuf = rd_kafka_buf_new_shadow(MemberMetadata->data,
1592                                         RD_KAFKAP_BYTES_LEN(MemberMetadata),
1593                                         NULL);
1594 
1595         rd_kafka_buf_read_i16(rkbuf, &Version);
1596         rd_kafka_buf_read_i32(rkbuf, &subscription_cnt);
1597 
1598         if (subscription_cnt > 10000 || subscription_cnt <= 0)
1599                 goto err;
1600 
1601         rkgm->rkgm_subscription =
1602                 rd_kafka_topic_partition_list_new(subscription_cnt);
1603 
1604         while (subscription_cnt-- > 0) {
1605                 rd_kafkap_str_t Topic;
1606                 char *topic_name;
1607                 rd_kafka_buf_read_str(rkbuf, &Topic);
1608                 RD_KAFKAP_STR_DUPA(&topic_name, &Topic);
1609                 rd_kafka_topic_partition_list_add(rkgm->rkgm_subscription,
1610                                                   topic_name,
1611                                                   RD_KAFKA_PARTITION_UA);
1612         }
1613 
1614         rd_kafka_buf_read_bytes(rkbuf, &UserData);
1615         rkgm->rkgm_userdata = rd_kafkap_bytes_copy(&UserData);
1616 
1617         if (Version >= 1 &&
1618             !(rkgm->rkgm_owned = rd_kafka_buf_read_topic_partitions(
1619                         rkbuf, 0, rd_false, rd_false)))
1620                 goto err;
1621 
1622         rd_kafka_buf_destroy(rkbuf);
1623 
1624         return 0;
1625 
1626  err_parse:
1627         err = rkbuf->rkbuf_err;
1628 
1629  err:
1630         rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
1631                    "Failed to parse MemberMetadata for \"%.*s\": %s",
1632                    RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
1633                    rd_kafka_err2str(err));
1634         if (rkgm->rkgm_subscription) {
1635                 rd_kafka_topic_partition_list_destroy(rkgm->
1636                                                       rkgm_subscription);
1637                 rkgm->rkgm_subscription = NULL;
1638         }
1639 
1640         rd_kafka_buf_destroy(rkbuf);
1641         return -1;
1642 }
1643 
1644 
1645 /**
1646  * @brief The rebalance protocol currently in use. This will be
1647  *        RD_KAFKA_REBALANCE_PROTOCOL_NONE if the consumer has not
1648  *        (yet) joined a group, else it will match the rebalance
1649  *        protocol of the configured assignor(s).
1650  *
1651  * @locality main thread
1652  */
1653 rd_kafka_rebalance_protocol_t
rd_kafka_cgrp_rebalance_protocol(rd_kafka_cgrp_t * rkcg)1654 rd_kafka_cgrp_rebalance_protocol (rd_kafka_cgrp_t *rkcg) {
1655         if (!rkcg->rkcg_assignor)
1656                 return RD_KAFKA_REBALANCE_PROTOCOL_NONE;
1657         return rkcg->rkcg_assignor->rkas_protocol;
1658 }
1659 
1660 
1661 /**
1662  * @brief cgrp handler for JoinGroup responses
1663  * opaque must be the cgrp handle.
1664  *
1665  * @locality rdkafka main thread (unless ERR__DESTROY: arbitrary thread)
1666  */
rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)1667 static void rd_kafka_cgrp_handle_JoinGroup (rd_kafka_t *rk,
1668                                             rd_kafka_broker_t *rkb,
1669                                             rd_kafka_resp_err_t err,
1670                                             rd_kafka_buf_t *rkbuf,
1671                                             rd_kafka_buf_t *request,
1672                                             void *opaque) {
1673         rd_kafka_cgrp_t *rkcg = opaque;
1674         const int log_decode_errors = LOG_ERR;
1675         int16_t ErrorCode = 0;
1676         int32_t GenerationId;
1677         rd_kafkap_str_t Protocol, LeaderId;
1678         rd_kafkap_str_t MyMemberId = RD_KAFKAP_STR_INITIALIZER;
1679         int32_t member_cnt;
1680         int actions;
1681         int i_am_leader = 0;
1682         rd_kafka_assignor_t *rkas = NULL;
1683 
1684         if (err == RD_KAFKA_RESP_ERR__DESTROY ||
1685             rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
1686                 return; /* Terminating */
1687 
1688         if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN) {
1689                 rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
1690                              "JoinGroup response: discarding outdated request "
1691                              "(now in join-state %s)",
1692                              rd_kafka_cgrp_join_state_names[rkcg->
1693                                                             rkcg_join_state]);
1694                 return;
1695         }
1696 
1697         if (err) {
1698                 ErrorCode = err;
1699                 goto err;
1700         }
1701 
1702         if (request->rkbuf_reqhdr.ApiVersion >= 2)
1703                 rd_kafka_buf_read_throttle_time(rkbuf);
1704 
1705         rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
1706         rd_kafka_buf_read_i32(rkbuf, &GenerationId);
1707         rd_kafka_buf_read_str(rkbuf, &Protocol);
1708         rd_kafka_buf_read_str(rkbuf, &LeaderId);
1709         rd_kafka_buf_read_str(rkbuf, &MyMemberId);
1710         rd_kafka_buf_read_i32(rkbuf, &member_cnt);
1711 
1712         if (!ErrorCode && RD_KAFKAP_STR_IS_NULL(&Protocol)) {
1713                 /* Protocol not set, we will not be able to find
1714                  * a matching assignor so error out early. */
1715                 ErrorCode = RD_KAFKA_RESP_ERR__BAD_MSG;
1716         } else if (!ErrorCode) {
1717                 char *protocol_name;
1718                 RD_KAFKAP_STR_DUPA(&protocol_name, &Protocol);
1719                 if (!(rkas = rd_kafka_assignor_find(rkcg->rkcg_rk,
1720                                                     protocol_name)) ||
1721                     !rkas->rkas_enabled) {
1722                         rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
1723                                 "Unsupported assignment strategy \"%s\"",
1724                                 protocol_name);
1725                         if (rkcg->rkcg_assignor) {
1726                                 if (rkcg->rkcg_assignor->rkas_destroy_state_cb)
1727                                         rkcg->rkcg_assignor->rkas_destroy_state_cb(
1728                                                 rkcg->rkcg_assignor_state);
1729                                 rkcg->rkcg_assignor_state = NULL;
1730                                 rkcg->rkcg_assignor = NULL;
1731                         }
1732                         ErrorCode = RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL;
1733                 }
1734 	}
1735 
1736         rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
1737                      "JoinGroup response: GenerationId %"PRId32", "
1738                      "Protocol %.*s, LeaderId %.*s%s, my MemberId %.*s, "
1739                      "member metadata count ""%"PRId32": %s",
1740                      GenerationId,
1741                      RD_KAFKAP_STR_PR(&Protocol),
1742                      RD_KAFKAP_STR_PR(&LeaderId),
1743                      RD_KAFKAP_STR_LEN(&MyMemberId) &&
1744                      !rd_kafkap_str_cmp(&LeaderId, &MyMemberId) ? " (me)" : "",
1745                      RD_KAFKAP_STR_PR(&MyMemberId),
1746                      member_cnt,
1747                      ErrorCode ? rd_kafka_err2str(ErrorCode) : "(no error)");
1748 
1749         if (!ErrorCode) {
1750                 char *my_member_id;
1751                 RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId);
1752                 rd_kafka_cgrp_set_member_id(rkcg, my_member_id);
1753                 rkcg->rkcg_generation_id = GenerationId;
1754                 i_am_leader = !rd_kafkap_str_cmp(&LeaderId, &MyMemberId);
1755         } else {
1756                 rd_interval_backoff(&rkcg->rkcg_join_intvl, 1000*1000);
1757                 goto err;
1758         }
1759 
1760         if (rkcg->rkcg_assignor && rkcg->rkcg_assignor != rkas) {
1761                 if (rkcg->rkcg_assignor->rkas_destroy_state_cb)
1762                         rkcg->rkcg_assignor->rkas_destroy_state_cb(
1763                                 rkcg->rkcg_assignor_state);
1764                 rkcg->rkcg_assignor_state = NULL;
1765         }
1766         rkcg->rkcg_assignor = rkas;
1767 
1768         if (i_am_leader) {
1769                 rd_kafka_group_member_t *members;
1770                 int i;
1771                 int sub_cnt = 0;
1772                 rd_list_t topics;
1773                 rd_kafka_op_t *rko;
1774                 rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
1775                              "I am elected leader for group \"%s\" "
1776                              "with %"PRId32" member(s)",
1777                              rkcg->rkcg_group_id->str, member_cnt);
1778 
1779                 if (member_cnt > 100000) {
1780                         err = RD_KAFKA_RESP_ERR__BAD_MSG;
1781                         goto err;
1782                 }
1783 
1784                 rd_list_init(&topics, member_cnt, rd_free);
1785 
1786                 members = rd_calloc(member_cnt, sizeof(*members));
1787 
1788                 for (i = 0 ; i < member_cnt ; i++) {
1789                         rd_kafkap_str_t MemberId;
1790                         rd_kafkap_bytes_t MemberMetadata;
1791                         rd_kafka_group_member_t *rkgm;
1792                         rd_kafkap_str_t GroupInstanceId = RD_KAFKAP_STR_INITIALIZER;
1793 
1794                         rd_kafka_buf_read_str(rkbuf, &MemberId);
1795                         if (request->rkbuf_reqhdr.ApiVersion >= 5)
1796                                 rd_kafka_buf_read_str(rkbuf, &GroupInstanceId);
1797                         rd_kafka_buf_read_bytes(rkbuf, &MemberMetadata);
1798 
1799                         rkgm = &members[sub_cnt];
1800                         rkgm->rkgm_member_id = rd_kafkap_str_copy(&MemberId);
1801                         rkgm->rkgm_group_instance_id =
1802                                 rd_kafkap_str_copy(&GroupInstanceId);
1803                         rd_list_init(&rkgm->rkgm_eligible, 0, NULL);
1804                         rkgm->rkgm_generation = -1;
1805 
1806                         if (rd_kafka_group_MemberMetadata_consumer_read(
1807                                     rkb, rkgm, &MemberMetadata)) {
1808                                 /* Failed to parse this member's metadata,
1809                                  * ignore it. */
1810                         } else {
1811                                 sub_cnt++;
1812                                 rkgm->rkgm_assignment =
1813                                         rd_kafka_topic_partition_list_new(
1814                                                 rkgm->rkgm_subscription->cnt);
1815                                 rd_kafka_topic_partition_list_get_topic_names(
1816                                         rkgm->rkgm_subscription, &topics,
1817                                         0/*dont include regex*/);
1818                         }
1819 
1820                 }
1821 
1822                 /* FIXME: What to do if parsing failed for some/all members?
1823                  *        It is a sign of incompatibility. */
1824 
1825 
1826                 rd_kafka_cgrp_group_leader_reset(rkcg,
1827                                                  "JoinGroup response clean-up");
1828 
1829                 rd_kafka_assert(NULL, rkcg->rkcg_group_leader.members == NULL);
1830                 rkcg->rkcg_group_leader.members    = members;
1831                 rkcg->rkcg_group_leader.member_cnt = sub_cnt;
1832 
1833                 rd_kafka_cgrp_set_join_state(
1834                         rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);
1835 
1836                 /* The assignor will need metadata so fetch it asynchronously
1837                  * and run the assignor when we get a reply.
1838                  * Create a callback op that the generic metadata code
1839                  * will trigger when metadata has been parsed. */
1840                 rko = rd_kafka_op_new_cb(
1841                         rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
1842                         rd_kafka_cgrp_assignor_handle_Metadata_op);
1843                 rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, NULL);
1844 
1845                 rd_kafka_MetadataRequest(
1846                         rkb, &topics,
1847                         "partition assignor",
1848                         rd_false/*!allow_auto_create*/,
1849                         /* cgrp_update=false:
1850                          * Since the subscription list may not be identical
1851                          * across all members of the group and thus the
1852                          * Metadata response may not be identical to this
1853                          * consumer's subscription list, we want to
1854                          * avoid triggering a rejoin or error propagation
1855                          * on receiving the response since some topics
1856                          * may be missing. */
1857                         rd_false,
1858                         rko);
1859                 rd_list_destroy(&topics);
1860 
1861         } else {
1862                 rd_kafka_cgrp_set_join_state(
1863                         rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
1864 
1865                 rd_kafka_SyncGroupRequest(rkb, rkcg->rkcg_group_id,
1866                                           rkcg->rkcg_generation_id,
1867                                           rkcg->rkcg_member_id,
1868                                           rkcg->rkcg_group_instance_id,
1869                                           NULL, 0,
1870                                           RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
1871                                           rd_kafka_handle_SyncGroup, rkcg);
1872 
1873         }
1874 
1875 err:
1876         actions = rd_kafka_err_action(rkb, ErrorCode, request,
1877                                       RD_KAFKA_ERR_ACTION_IGNORE,
1878                                       RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
1879 
1880                                       RD_KAFKA_ERR_ACTION_IGNORE,
1881                                       RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED,
1882 
1883                                       RD_KAFKA_ERR_ACTION_IGNORE,
1884                                       RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
1885 
1886                                       RD_KAFKA_ERR_ACTION_PERMANENT,
1887                                       RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,
1888 
1889                                       RD_KAFKA_ERR_ACTION_END);
1890 
1891         if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
1892                 /* Re-query for coordinator */
1893                 rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
1894                                  RD_KAFKA_OP_COORD_QUERY, ErrorCode);
1895         }
1896 
1897         /* No need for retries here since the join is intervalled,
1898          * see rkcg_join_intvl */
1899 
1900         if (ErrorCode) {
1901                 if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
1902                         return; /* Termination */
1903 
1904                 if (ErrorCode == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID) {
1905                         rd_kafka_set_fatal_error(rkcg->rkcg_rk, ErrorCode,
1906                                                  "Fatal consumer error: %s",
1907                                                  rd_kafka_err2str(ErrorCode));
1908                         ErrorCode = RD_KAFKA_RESP_ERR__FATAL;
1909 
1910                 } else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
1911                         rd_kafka_consumer_err(rkcg->rkcg_q,
1912                                               rd_kafka_broker_id(rkb),
1913                                               ErrorCode, 0, NULL, NULL,
1914                                               RD_KAFKA_OFFSET_INVALID,
1915                                               "JoinGroup failed: %s",
1916                                               rd_kafka_err2str(ErrorCode));
1917 
1918                 if (ErrorCode == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
1919                         rd_kafka_cgrp_set_member_id(rkcg, "");
1920                 else if (ErrorCode == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION)
1921                         rkcg->rkcg_generation_id = -1;
1922                 else if (ErrorCode == RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED) {
1923                         /* KIP-394 requires member.id on initial join
1924                          * group request */
1925                         char *my_member_id;
1926                         RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId);
1927                         rd_kafka_cgrp_set_member_id(rkcg, my_member_id);
1928                         /* Skip the join backoff */
1929                         rd_interval_reset(&rkcg->rkcg_join_intvl);
1930                 }
1931 
1932                 if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
1933                     RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE &&
1934                     (ErrorCode == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION ||
1935                      ErrorCode == RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED))
1936                         rd_kafka_cgrp_revoke_all_rejoin(
1937                                 rkcg,
1938                                 rd_true/*assignment is lost*/,
1939                                 rd_true/*this consumer is initiating*/,
1940                                 "JoinGroup error");
1941                 else
1942                         rd_kafka_cgrp_rejoin(rkcg,
1943                                              "JoinGroup error: %s",
1944                                              rd_kafka_err2str(ErrorCode));
1945 
1946         }
1947 
1948         return;
1949 
1950  err_parse:
1951         ErrorCode = rkbuf->rkbuf_err;
1952         goto err;
1953 }
1954 
1955 
1956 /**
1957  * @brief Check subscription against requested Metadata.
1958  */
1959 static rd_kafka_op_res_t
rd_kafka_cgrp_handle_Metadata_op(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1960 rd_kafka_cgrp_handle_Metadata_op (rd_kafka_t *rk, rd_kafka_q_t *rkq,
1961                                   rd_kafka_op_t *rko) {
1962         rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
1963 
1964         if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
1965                 return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
1966 
1967         rd_kafka_cgrp_metadata_update_check(rkcg, rd_false/*dont rejoin*/);
1968 
1969         return RD_KAFKA_OP_RES_HANDLED;
1970 }
1971 
1972 
1973 /**
1974  * @brief (Async) Refresh metadata (for cgrp's needs)
1975  *
1976  * @returns 1 if metadata refresh was requested, or 0 if metadata is
1977  *          up to date, or -1 if no broker is available for metadata requests.
1978  *
1979  * @locks none
1980  * @locality rdkafka main thread
1981  */
rd_kafka_cgrp_metadata_refresh(rd_kafka_cgrp_t * rkcg,int * metadata_agep,const char * reason)1982 static int rd_kafka_cgrp_metadata_refresh (rd_kafka_cgrp_t *rkcg,
1983                                             int *metadata_agep,
1984                                             const char *reason) {
1985         rd_kafka_t *rk = rkcg->rkcg_rk;
1986         rd_kafka_op_t *rko;
1987         rd_list_t topics;
1988         rd_kafka_resp_err_t err;
1989 
1990         rd_list_init(&topics, 8, rd_free);
1991 
1992         /* Insert all non-wildcard topics in cache. */
1993         rd_kafka_metadata_cache_hint_rktparlist(rkcg->rkcg_rk,
1994                                                 rkcg->rkcg_subscription,
1995                                                 NULL, 0/*dont replace*/);
1996 
1997         if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) {
1998                 /* For wildcard subscriptions make sure the
1999                  * cached full metadata isn't too old. */
2000                 int metadata_age = -1;
2001 
2002                 if (rk->rk_ts_full_metadata)
2003                         metadata_age = (int)(rd_clock() -
2004                                              rk->rk_ts_full_metadata)/1000;
2005 
2006                 *metadata_agep = metadata_age;
2007 
2008                 if (metadata_age != -1 &&
2009                     metadata_age <= rk->rk_conf.metadata_max_age_ms) {
2010                         rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
2011                                      "CGRPMETADATA",
2012                                      "%s: metadata for wildcard subscription "
2013                                      "is up to date (%dms old)",
2014                                      reason, *metadata_agep);
2015                         rd_list_destroy(&topics);
2016                         return 0; /* Up-to-date */
2017                 }
2018 
2019         } else {
2020                 /* Check that all subscribed topics are in the cache. */
2021                 int r;
2022 
2023                 rd_kafka_topic_partition_list_get_topic_names(
2024                         rkcg->rkcg_subscription, &topics, 0/*no regexps*/);
2025 
2026                 rd_kafka_rdlock(rk);
2027                 r = rd_kafka_metadata_cache_topics_count_exists(rk, &topics,
2028                                                                 metadata_agep);
2029                 rd_kafka_rdunlock(rk);
2030 
2031                 if (r == rd_list_cnt(&topics)) {
2032                         rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
2033                                      "CGRPMETADATA",
2034                                      "%s: metadata for subscription "
2035                                      "is up to date (%dms old)", reason,
2036                                      *metadata_agep);
2037                         rd_list_destroy(&topics);
2038                         return 0; /* Up-to-date and all topics exist. */
2039                 }
2040 
2041                 rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
2042                              "CGRPMETADATA",
2043                              "%s: metadata for subscription "
2044                              "only available for %d/%d topics (%dms old)",
2045                              reason, r, rd_list_cnt(&topics), *metadata_agep);
2046         }
2047 
2048         /* Async request, result will be triggered from
2049          * rd_kafka_parse_metadata(). */
2050         rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
2051                                  rd_kafka_cgrp_handle_Metadata_op);
2052         rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, 0);
2053 
2054         err = rd_kafka_metadata_request(rkcg->rkcg_rk, NULL, &topics,
2055                                         rd_false/*!allow auto create */,
2056                                         rd_true/*cgrp_update*/,
2057                                         reason, rko);
2058         if (err) {
2059                 rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
2060                              "CGRPMETADATA",
2061                              "%s: need to refresh metadata (%dms old) "
2062                              "but no usable brokers available: %s",
2063                              reason, *metadata_agep, rd_kafka_err2str(err));
2064                 rd_kafka_op_destroy(rko);
2065         }
2066 
2067         rd_list_destroy(&topics);
2068 
2069         return err ? -1 : 1;
2070 }
2071 
2072 
2073 
/**
 * @brief Attempt to join the consumer group by sending a JoinGroupRequest
 *        to the coordinator, refreshing metadata first if needed.
 *
 * No-op unless the group is in state UP with join-state INIT.
 * Also a no-op while the application has exceeded max.poll.interval.ms
 * and has not yet called poll again.
 */
static void rd_kafka_cgrp_join (rd_kafka_cgrp_t *rkcg) {
        int metadata_age;

        /* Only join from the initial join-state of a fully-up group. */
        if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
            rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_INIT)
                return;

        /* On max.poll.interval.ms failure, do not rejoin group until the
         * application has called poll. */
        if ((rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED) &&
            rd_kafka_max_poll_exceeded(rkcg->rkcg_rk))
                return;

        rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
                     "Group \"%.*s\": join with %d subscribed topic(s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_list_cnt(rkcg->rkcg_subscribed_topics));


        /* See if we need to query metadata to continue:
         * - if subscription contains wildcards:
         *   * query all topics in cluster
         *
         * - if subscription does not contain wildcards but
         *   some topics are missing from the local metadata cache:
         *   * query subscribed topics (all cached ones)
         *
         * - otherwise:
         *   * rely on topic metadata cache
         */
        /* We need up-to-date full metadata to continue,
         * refresh metadata if necessary. */
        if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
                                           "consumer join") == 1) {
                /* Metadata refresh is in flight (async): postpone the join
                 * until the refresh result arrives. */
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "JOIN",
                             "Group \"%.*s\": "
                             "postponing join until up-to-date "
                             "metadata is available",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));

                rd_assert(rkcg->rkcg_join_state ==
                          RD_KAFKA_CGRP_JOIN_STATE_INIT ||
                          /* Possible via rd_kafka_cgrp_modify_subscription */
                          rkcg->rkcg_join_state ==
                          RD_KAFKA_CGRP_JOIN_STATE_STEADY);

                rd_kafka_cgrp_set_join_state(
                        rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);

                return; /* ^ async call */
        }

        /* Cached metadata is current: re-evaluate the effective subscribed
         * topics list from it (without triggering a rejoin). */
        if (rd_list_empty(rkcg->rkcg_subscribed_topics))
                rd_kafka_cgrp_metadata_update_check(rkcg,
                                                    rd_false/*dont join*/);

        /* Still no matching topics: wait for the next metadata refresh. */
        if (rd_list_empty(rkcg->rkcg_subscribed_topics)) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "JOIN",
                             "Group \"%.*s\": "
                             "no matching topics based on %dms old metadata: "
                             "next metadata refresh in %dms",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             metadata_age,
                             rkcg->rkcg_rk->rk_conf.
                             metadata_refresh_interval_ms - metadata_age);
                return;
        }

        rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "JOIN",
                   "Joining group \"%.*s\" with %d subscribed topic(s)",
                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                   rd_list_cnt(rkcg->rkcg_subscribed_topics));

        /* Send the JoinGroupRequest; the response is handled
         * asynchronously on the cgrp ops queue. */
        rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN);
        rd_kafka_JoinGroupRequest(rkcg->rkcg_coord, rkcg->rkcg_group_id,
                                  rkcg->rkcg_member_id,
                                  rkcg->rkcg_group_instance_id,
                                  rkcg->rkcg_rk->rk_conf.group_protocol_type,
                                  rkcg->rkcg_subscribed_topics,
                                  RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
                                  rd_kafka_cgrp_handle_JoinGroup, rkcg);
}
2158 
2159 /**
2160  * Rejoin group on update to effective subscribed topics list
2161  */
rd_kafka_cgrp_revoke_rejoin(rd_kafka_cgrp_t * rkcg,const char * reason)2162 static void rd_kafka_cgrp_revoke_rejoin (rd_kafka_cgrp_t *rkcg,
2163                                          const char *reason) {
2164         /*
2165          * Clean-up group leader duties, if any.
2166          */
2167         rd_kafka_cgrp_group_leader_reset(rkcg, "group (re)join");
2168 
2169         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "REJOIN",
2170                      "Group \"%.*s\" (re)joining in join-state %s "
2171                      "with %d assigned partition(s): %s",
2172                      RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
2173                      rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
2174                      rkcg->rkcg_group_assignment ?
2175                      rkcg->rkcg_group_assignment->cnt : 0,
2176                      reason);
2177 
2178         rd_kafka_cgrp_revoke_all_rejoin(rkcg,
2179                                         rd_false/*not lost*/,
2180                                         rd_true/*initiating*/,
2181                                         reason);
2182 }
2183 
2184 /**
2185  * @brief Update the effective list of subscribed topics.
2186  *
2187  * Set \p tinfos to NULL to clear the list.
2188  *
2189  * @param tinfos rd_list_t(rd_kafka_topic_info_t *): new effective topic list
2190  *
2191  * @returns true on change, else false.
2192  *
2193  * @remark Takes ownership of \p tinfos
2194  */
2195 static rd_bool_t
rd_kafka_cgrp_update_subscribed_topics(rd_kafka_cgrp_t * rkcg,rd_list_t * tinfos)2196 rd_kafka_cgrp_update_subscribed_topics (rd_kafka_cgrp_t *rkcg,
2197                                         rd_list_t *tinfos) {
2198         rd_kafka_topic_info_t *tinfo;
2199         int i;
2200 
2201         if (!tinfos) {
2202                 if (!rd_list_empty(rkcg->rkcg_subscribed_topics))
2203                         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
2204                                      "Group \"%.*s\": "
2205                                      "clearing subscribed topics list (%d)",
2206                                      RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
2207                                      rd_list_cnt(rkcg->rkcg_subscribed_topics));
2208                 tinfos = rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
2209 
2210         } else {
2211                 if (rd_list_cnt(tinfos) == 0)
2212                         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
2213                                      "Group \"%.*s\": "
2214                                      "no topics in metadata matched "
2215                                      "subscription",
2216                                      RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
2217         }
2218 
2219         /* Sort for comparison */
2220         rd_list_sort(tinfos, rd_kafka_topic_info_cmp);
2221 
2222         /* Compare to existing to see if anything changed. */
2223         if (!rd_list_cmp(rkcg->rkcg_subscribed_topics, tinfos,
2224                          rd_kafka_topic_info_cmp)) {
2225                 /* No change */
2226                 rd_list_destroy(tinfos);
2227                 return rd_false;
2228         }
2229 
2230         rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA, "SUBSCRIPTION",
2231                      "Group \"%.*s\": effective subscription list changed "
2232                      "from %d to %d topic(s):",
2233                      RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
2234                      rd_list_cnt(rkcg->rkcg_subscribed_topics),
2235                      rd_list_cnt(tinfos));
2236 
2237         RD_LIST_FOREACH(tinfo, tinfos, i)
2238                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA,
2239                              "SUBSCRIPTION",
2240                              " Topic %s with %d partition(s)",
2241                              tinfo->topic, tinfo->partition_cnt);
2242 
2243         rd_list_destroy(rkcg->rkcg_subscribed_topics);
2244 
2245         rkcg->rkcg_subscribed_topics = tinfos;
2246 
2247         return rd_true;
2248 }
2249 
2250 
/**
 * @brief Handle Heartbeat response.
 *
 * Clears the heartbeat-in-transit flag, parses the response, and on
 * success extends the local session timeout. On error the error code is
 * mapped to a recovery action: re-query the coordinator, trigger a
 * rebalance/rejoin, retry the request, or raise a fatal error (fenced
 * static group member).
 *
 * Takes the original request as \p request; \p opaque is unused (NULL).
 */
void rd_kafka_cgrp_handle_Heartbeat (rd_kafka_t *rk,
                                     rd_kafka_broker_t *rkb,
                                     rd_kafka_resp_err_t err,
                                     rd_kafka_buf_t *rkbuf,
                                     rd_kafka_buf_t *request,
                                     void *opaque) {
        rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
        const int log_decode_errors = LOG_ERR; /* used by buf_read macros */
        int16_t ErrorCode = 0;
        int actions = 0;

        /* Client instance is being destroyed: bail out immediately. */
        if (err == RD_KAFKA_RESP_ERR__DESTROY)
                return;

        rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT);
        rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;

        rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR;

        if (err)
                goto err;

        if (request->rkbuf_reqhdr.ApiVersion >= 1)
                rd_kafka_buf_read_throttle_time(rkbuf);

        /* NOTE: rd_kafka_buf_read_*() jumps to err_parse: on failure. */
        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
        if (ErrorCode) {
                err = ErrorCode;
                goto err;
        }

        /* Successful heartbeat: extend the local session timeout. */
        rd_kafka_cgrp_update_session_timeout(
                rkcg, rd_false/*don't update if session has expired*/);

        return;

 err_parse:
        err = rkbuf->rkbuf_err;
 err:
        rkcg->rkcg_last_heartbeat_err = err;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
                     "Group \"%s\" heartbeat error response in "
                     "state %s (join-state %s, %d partition(s) assigned): %s",
                     rkcg->rkcg_group_id->str,
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                     rkcg->rkcg_group_assignment ?
                     rkcg->rkcg_group_assignment->cnt : 0,
                     rd_kafka_err2str(err));

        /* A group (re)join started after this heartbeat was sent:
         * the response is stale, ignore it. */
        if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
                             "Heartbeat response: discarding outdated "
                             "request (now in join-state %s)",
                             rd_kafka_cgrp_join_state_names[rkcg->
                                                            rkcg_join_state]);
                return;
        }

        switch (err)
        {
        case RD_KAFKA_RESP_ERR__DESTROY:
                /* quick cleanup */
                return;

        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
        case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR__TRANSPORT:
                rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
                             "Heartbeat failed due to coordinator (%s) "
                             "no longer available: %s: "
                             "re-querying for coordinator",
                             rkcg->rkcg_curr_coord ?
                             rd_kafka_broker_name(rkcg->rkcg_curr_coord) :
                             "none",
                             rd_kafka_err2str(err));
                /* Remain in joined state and keep querying for coordinator */
                actions = RD_KAFKA_ERR_ACTION_REFRESH;
                break;

        case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS:
                /* No further action if already rebalancing */
                if (RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg))
                        return;
                rd_kafka_cgrp_group_is_rebalancing(rkcg);
                return;

        case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
                /* Broker no longer knows us: clear member-id and rejoin
                 * with the assignment considered lost. */
                rd_kafka_cgrp_set_member_id(rkcg, "");
                rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg,
                                                      rd_true/*lost*/,
                                                      rd_true/*initiating*/,
                                                      "resetting member-id");
                return;

        case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
                /* Our generation is stale: reset it and rejoin with the
                 * assignment considered lost. */
                rkcg->rkcg_generation_id = -1;
                rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg,
                                                      rd_true/*lost*/,
                                                      rd_true/*initiating*/,
                                                      "illegal generation");
                return;

        case RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID:
                /* A newer static-member instance with the same
                 * group.instance.id has fenced us: fatal. */
                rd_kafka_set_fatal_error(rkcg->rkcg_rk, err,
                                         "Fatal consumer error: %s",
                                         rd_kafka_err2str(err));
                rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg,
                                                      rd_true,/*assignment lost*/
                                                      rd_true,/*initiating*/
                                                      "consumer fenced by "
                                                      "newer instance");
                return;

        default:
                actions = rd_kafka_err_action(rkb, err, request,
                                              RD_KAFKA_ERR_ACTION_END);
                break;
        }


        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                /* Re-query for coordinator */
                rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err));
        }

        if (actions & RD_KAFKA_ERR_ACTION_RETRY &&
            rd_kafka_buf_retry(rkb, request)) {
                /* Retry: re-mark the heartbeat as in transit. */
                rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
                return;
        }
}
2388 
2389 
2390 
2391 /**
2392  * @brief Send Heartbeat
2393  */
rd_kafka_cgrp_heartbeat(rd_kafka_cgrp_t * rkcg)2394 static void rd_kafka_cgrp_heartbeat (rd_kafka_cgrp_t *rkcg) {
2395         /* Don't send heartbeats if max.poll.interval.ms was exceeded */
2396         if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED)
2397                 return;
2398 
2399         /* Skip heartbeat if we have one in transit */
2400         if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT)
2401                 return;
2402 
2403         rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
2404         rd_kafka_HeartbeatRequest(rkcg->rkcg_coord, rkcg->rkcg_group_id,
2405                                   rkcg->rkcg_generation_id,
2406                                   rkcg->rkcg_member_id,
2407                                   rkcg->rkcg_group_instance_id,
2408                                   RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
2409                                   rd_kafka_cgrp_handle_Heartbeat, NULL);
2410 }
2411 
/**
 * Cgrp is now terminated: decommission it and signal back to application.
 *
 * Performs the final teardown: clears the assignment, stops the commit
 * timer, purges/disables the ops queues, drops coordinator references,
 * and enqueues the termination reply (if any) back to the application.
 *
 * May be called multiple times; only the first call has effect.
 */
static void rd_kafka_cgrp_terminated (rd_kafka_cgrp_t *rkcg) {
        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATED)
                return; /* terminated() may be called multiple times,
                         * make sure to only terminate once. */

        rd_kafka_cgrp_group_assignment_set(rkcg, NULL);

        /* Sanity: nothing may be outstanding at this point. */
        rd_kafka_assert(NULL, !rd_kafka_assignment_in_progress(rkcg->rkcg_rk));
        rd_kafka_assert(NULL, !rkcg->rkcg_group_assignment);
        rd_kafka_assert(NULL, rkcg->rkcg_rk->rk_consumer.wait_commit_cnt == 0);
        rd_kafka_assert(NULL, rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM);

        rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
                            &rkcg->rkcg_offset_commit_tmr, 1/*lock*/);

        rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);

        /* Disable and empty ops queue since there will be no
         * (broker) thread serving it anymore after the unassign_broker
         * below.
         * This prevents hang on destroy where responses are enqueued on rkcg_ops
         * without anything serving the queue. */
        rd_kafka_q_disable(rkcg->rkcg_ops);
        rd_kafka_q_purge(rkcg->rkcg_ops);

        if (rkcg->rkcg_curr_coord)
                rd_kafka_cgrp_coord_clear_broker(rkcg);

        /* Drop our reference on the coordinator broker object. */
        if (rkcg->rkcg_coord) {
                rd_kafka_broker_destroy(rkcg->rkcg_coord);
                rkcg->rkcg_coord = NULL;
        }

        if (rkcg->rkcg_reply_rko) {
                /* Signal back to application. */
                rd_kafka_replyq_enq(&rkcg->rkcg_reply_rko->rko_replyq,
                                    rkcg->rkcg_reply_rko, 0);
                rkcg->rkcg_reply_rko = NULL;
        }

        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_TERMINATED;
}
2457 
2458 
/**
 * If a cgrp is terminating and all outstanding ops are now finished
 * then progress to final termination and return 1.
 * Else returns 0.
 *
 * Termination is gated on: no pending assign/unassign rebalance callback,
 * no managed toppars, no assignment operation in progress, no outstanding
 * commits, and no outstanding LeaveGroup request.
 */
static RD_INLINE int rd_kafka_cgrp_try_terminate (rd_kafka_cgrp_t *rkcg) {

        /* Already in final TERM state. */
        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
                return 1;

        /* Not terminating at all: nothing to do. */
        if (likely(!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)))
                return 0;

        /* Check if wait-coord queue has timed out.
         * Ops that waited longer than session.timeout.ms for a coordinator
         * are moved back to the ops queue to be failed/served there. */
        if (rd_kafka_q_len(rkcg->rkcg_wait_coord_q) > 0 &&
            rkcg->rkcg_ts_terminate +
            (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000) <
            rd_clock()) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
                             "Group \"%s\": timing out %d op(s) in "
                             "wait-for-coordinator queue",
                             rkcg->rkcg_group_id->str,
                             rd_kafka_q_len(rkcg->rkcg_wait_coord_q));
                rd_kafka_q_disable(rkcg->rkcg_wait_coord_q);
                if (rd_kafka_q_concat(rkcg->rkcg_ops,
                                      rkcg->rkcg_wait_coord_q) == -1) {
                        /* ops queue shut down, purge coord queue */
                        rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);
                }
        }

        if (!RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) &&
            rd_list_empty(&rkcg->rkcg_toppars) &&
            !rd_kafka_assignment_in_progress(rkcg->rkcg_rk) &&
            rkcg->rkcg_rk->rk_consumer.wait_commit_cnt == 0 &&
            !(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE)) {
                /* Since we might be deep down in a 'rko' handler
                 * called from cgrp_op_serve() we cant call terminated()
                 * directly since it will decommission the rkcg_ops queue
                 * that might be locked by intermediate functions.
                 * Instead set the TERM state and let the cgrp terminate
                 * at its own discretion. */
                rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_TERM);

                return 1;
        } else {
                /* Still waiting on something: log what is blocking. */
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
                             "Group \"%s\": "
                             "waiting for %s%d toppar(s), "
                             "%s"
                             "%d commit(s)%s%s%s (state %s, join-state %s) "
                             "before terminating",
                             rkcg->rkcg_group_id->str,
                             RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) ?
                             "assign call, ": "",
                             rd_list_cnt(&rkcg->rkcg_toppars),
                             rd_kafka_assignment_in_progress(rkcg->rkcg_rk) ?
                             "assignment in progress, " : "",
                             rkcg->rkcg_rk->rk_consumer.wait_commit_cnt,
                             (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE)?
                             ", wait-leave," : "",
                             rkcg->rkcg_rebalance_rejoin ?
                             ", rebalance_rejoin,": "",
                             (rkcg->rkcg_rebalance_incr_assignment != NULL)?
                             ", rebalance_incr_assignment,": "",
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                             rd_kafka_cgrp_join_state_names[
                                     rkcg->rkcg_join_state]);
                return 0;
        }
}
2530 
2531 
2532 /**
2533  * @brief Add partition to this cgrp management
2534  *
2535  * @locks none
2536  */
rd_kafka_cgrp_partition_add(rd_kafka_cgrp_t * rkcg,rd_kafka_toppar_t * rktp)2537 static void rd_kafka_cgrp_partition_add (rd_kafka_cgrp_t *rkcg,
2538                                          rd_kafka_toppar_t *rktp) {
2539         rd_kafka_dbg(rkcg->rkcg_rk, CGRP,"PARTADD",
2540                      "Group \"%s\": add %s [%"PRId32"]",
2541                      rkcg->rkcg_group_id->str,
2542                      rktp->rktp_rkt->rkt_topic->str,
2543                      rktp->rktp_partition);
2544 
2545         rd_kafka_toppar_lock(rktp);
2546         rd_assert(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP));
2547         rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_ON_CGRP;
2548         rd_kafka_toppar_unlock(rktp);
2549 
2550         rd_kafka_toppar_keep(rktp);
2551         rd_list_add(&rkcg->rkcg_toppars, rktp);
2552 }
2553 
2554 /**
2555  * @brief Remove partition from this cgrp management
2556  *
2557  * @locks none
2558  */
rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t * rkcg,rd_kafka_toppar_t * rktp)2559 static void rd_kafka_cgrp_partition_del (rd_kafka_cgrp_t *rkcg,
2560                                          rd_kafka_toppar_t *rktp) {
2561         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL",
2562                      "Group \"%s\": delete %s [%"PRId32"]",
2563                      rkcg->rkcg_group_id->str,
2564                      rktp->rktp_rkt->rkt_topic->str,
2565                      rktp->rktp_partition);
2566 
2567         rd_kafka_toppar_lock(rktp);
2568         rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP);
2569         rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP;
2570         rd_kafka_toppar_unlock(rktp);
2571 
2572         rd_list_remove(&rkcg->rkcg_toppars, rktp);
2573 
2574         rd_kafka_toppar_destroy(rktp); /* refcnt from _add above */
2575 
2576         rd_kafka_cgrp_try_terminate(rkcg);
2577 }
2578 
2579 
2580 
2581 
2582 /**
2583  * @brief Defer offset commit (rko) until coordinator is available.
2584  *
2585  * @returns 1 if the rko was deferred or 0 if the defer queue is disabled
2586  *          or rko already deferred.
2587  */
rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t * rkcg,rd_kafka_op_t * rko,const char * reason)2588 static int rd_kafka_cgrp_defer_offset_commit (rd_kafka_cgrp_t *rkcg,
2589                                               rd_kafka_op_t *rko,
2590                                               const char *reason) {
2591 
2592         /* wait_coord_q is disabled session.timeout.ms after
2593          * group close() has been initated. */
2594         if (rko->rko_u.offset_commit.ts_timeout != 0 ||
2595             !rd_kafka_q_ready(rkcg->rkcg_wait_coord_q))
2596                 return 0;
2597 
2598         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
2599                      "Group \"%s\": "
2600                      "unable to OffsetCommit in state %s: %s: "
2601                      "coordinator (%s) is unavailable: "
2602                      "retrying later",
2603                      rkcg->rkcg_group_id->str,
2604                      rd_kafka_cgrp_state_names[rkcg->rkcg_state],
2605                      reason,
2606                      rkcg->rkcg_curr_coord ?
2607                      rd_kafka_broker_name(rkcg->rkcg_curr_coord) :
2608                      "none");
2609 
2610         rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;
2611         rko->rko_u.offset_commit.ts_timeout = rd_clock() +
2612                 (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms
2613                  * 1000);
2614         rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko);
2615 
2616         return 1;
2617 }
2618 
2619 
2620 /**
2621  * @brief Update the committed offsets for the partitions in \p offsets,
2622  *
2623  * @remark \p offsets may be NULL if \p err is set
2624  * @returns the number of partitions with errors encountered
2625  */
2626 static int
rd_kafka_cgrp_update_committed_offsets(rd_kafka_cgrp_t * rkcg,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * offsets)2627 rd_kafka_cgrp_update_committed_offsets (rd_kafka_cgrp_t *rkcg,
2628                                         rd_kafka_resp_err_t err,
2629                                         rd_kafka_topic_partition_list_t
2630                                         *offsets) {
2631         int i;
2632         int errcnt = 0;
2633 
2634         /* Update toppars' committed offset or global error */
2635         for (i = 0 ; offsets && i < offsets->cnt ; i++) {
2636                 rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
2637                 rd_kafka_toppar_t *rktp;
2638 
2639                 /* Ignore logical offsets since they were never
2640                  * sent to the broker. */
2641                 if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset))
2642                         continue;
2643 
2644                 /* Propagate global error to all partitions that don't have
2645                  * explicit error set. */
2646                 if (err && !rktpar->err)
2647                         rktpar->err = err;
2648 
2649                 if (rktpar->err) {
2650                         rd_kafka_dbg(rkcg->rkcg_rk, TOPIC,
2651                                      "OFFSET",
2652                                      "OffsetCommit failed for "
2653                                      "%s [%"PRId32"] at offset "
2654                                      "%"PRId64" in join-state %s: %s",
2655                                      rktpar->topic, rktpar->partition,
2656                                      rktpar->offset,
2657                                      rd_kafka_cgrp_join_state_names[
2658                                              rkcg->rkcg_join_state],
2659                                      rd_kafka_err2str(rktpar->err));
2660 
2661                         errcnt++;
2662                         continue;
2663                 }
2664 
2665                 rktp = rd_kafka_topic_partition_get_toppar(rkcg->rkcg_rk,
2666                                                            rktpar, rd_false);
2667                 if (!rktp)
2668                         continue;
2669 
2670                 rd_kafka_toppar_lock(rktp);
2671                 rktp->rktp_committed_offset = rktpar->offset;
2672                 rd_kafka_toppar_unlock(rktp);
2673 
2674                 rd_kafka_toppar_destroy(rktp); /* from get_toppar() */
2675         }
2676 
2677         return errcnt;
2678 }
2679 
2680 
/**
 * @brief Propagate OffsetCommit results.
 *
 * The result is propagated through up to two channels:
 *  1. the configured offset_commit_cb (when no op-specific callback
 *     is set), via the main (rk_rep) queue;
 *  2. the requester's reply queue, if the original op has one.
 * If neither channel served the result and the commit failed, a warning
 * is logged instead so the failure is not silently dropped (#1043).
 *
 * @param rko_orig The original rko that triggered the commit, this is used
 *                 to propagate the result.
 * @param err Is the aggregated request-level error, or ERR_NO_ERROR.
 * @param errcnt Are the number of partitions in \p offsets that failed
 *               offset commit.
 */
static void
rd_kafka_cgrp_propagate_commit_result (
        rd_kafka_cgrp_t *rkcg,
        rd_kafka_op_t *rko_orig,
        rd_kafka_resp_err_t err,
        int errcnt,
        rd_kafka_topic_partition_list_t *offsets) {

        const rd_kafka_t *rk = rkcg->rkcg_rk;
        int offset_commit_cb_served = 0;

        /* If no special callback is set but a offset_commit_cb has
         * been set in conf then post an event for the latter. */
        if (!rko_orig->rko_u.offset_commit.cb && rk->rk_conf.offset_commit_cb) {
                rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);

                rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);

                /* The reply gets its own copy of the partition list. */
                if (offsets)
                        rko_reply->rko_u.offset_commit.partitions =
                                rd_kafka_topic_partition_list_copy(offsets);

                rko_reply->rko_u.offset_commit.cb =
                        rk->rk_conf.offset_commit_cb;
                rko_reply->rko_u.offset_commit.opaque = rk->rk_conf.opaque;

                rd_kafka_q_enq(rk->rk_rep, rko_reply);
                offset_commit_cb_served++;
        }


        /* Enqueue reply to requester's queue, if any. */
        if (rko_orig->rko_replyq.q) {
                rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);

                rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);

                /* Copy offset & partitions & callbacks to reply op */
                rko_reply->rko_u.offset_commit = rko_orig->rko_u.offset_commit;
                if (offsets)
                        rko_reply->rko_u.offset_commit.partitions =
                                rd_kafka_topic_partition_list_copy(offsets);
                /* The struct copy above shares the reason string with
                 * rko_orig: duplicate it so each op owns its own copy. */
                if (rko_reply->rko_u.offset_commit.reason)
                        rko_reply->rko_u.offset_commit.reason =
                                rd_strdup(rko_reply->rko_u.
                                          offset_commit.reason);

                rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko_reply, 0);
                offset_commit_cb_served++;
        }

        if (!offset_commit_cb_served &&
            offsets &&
            (errcnt > 0 ||
             (err != RD_KAFKA_RESP_ERR_NO_ERROR &&
              err != RD_KAFKA_RESP_ERR__NO_OFFSET))) {
                /* If there is no callback or handler for this (auto)
                 * commit then log an error (#1043) */
                char tmp[512];

                rd_kafka_topic_partition_list_str(
                        offsets, tmp, sizeof(tmp),
                        /* Print per-partition errors unless there was a
                         * request-level error. */
                        RD_KAFKA_FMT_F_OFFSET |
                        (errcnt ? RD_KAFKA_FMT_F_ONLY_ERR : 0));

                rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "COMMITFAIL",
                             "Offset commit (%s) failed "
                             "for %d/%d partition(s) in join-state %s: "
                             "%s%s%s",
                             rko_orig->rko_u.offset_commit.reason,
                             errcnt ? errcnt : offsets->cnt, offsets->cnt,
                             rd_kafka_cgrp_join_state_names[rkcg->
                                                            rkcg_join_state],
                             errcnt ? rd_kafka_err2str(err) : "",
                             errcnt ? ": " : "",
                             tmp);
        }
}
2770 
2771 
2772 
2773 /**
2774  * @brief Handle OffsetCommitResponse
2775  * Takes the original 'rko' as opaque argument.
2776  * @remark \p rkb, rkbuf, and request may be NULL in a number of
2777  *         error cases (e.g., _NO_OFFSET, _WAIT_COORD)
2778  */
static void rd_kafka_cgrp_op_handle_OffsetCommit (rd_kafka_t *rk,
                                                  rd_kafka_broker_t *rkb,
                                                  rd_kafka_resp_err_t err,
                                                  rd_kafka_buf_t *rkbuf,
                                                  rd_kafka_buf_t *request,
                                                  void *opaque) {
        rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
        /* This handler owns rko_orig: it is destroyed before returning,
         * except on the retry (_IN_PROGRESS) and defer paths below where
         * ownership is retained/transferred. */
        rd_kafka_op_t *rko_orig = opaque;
        rd_kafka_topic_partition_list_t *offsets =
                rko_orig->rko_u.offset_commit.partitions; /* maybe NULL */
        int errcnt;

        RD_KAFKA_OP_TYPE_ASSERT(rko_orig, RD_KAFKA_OP_OFFSET_COMMIT);

        /* Parse the response (if any) and update per-partition errors in
         * \p offsets; may refine the request-level error code. */
        err = rd_kafka_handle_OffsetCommit(rk, rkb, err, rkbuf,
                                           request, offsets);

        /* Suppress empty commit debug logs if allowed */
        if (err != RD_KAFKA_RESP_ERR__NO_OFFSET ||
            !rko_orig->rko_u.offset_commit.silent_empty) {
                /* rkb may be NULL for locally-raised errors
                 * (see function docs above), so log accordingly. */
                if (rkb)
                        rd_rkb_dbg(rkb, CGRP, "COMMIT",
                                   "OffsetCommit for %d partition(s) in "
                                   "join-state %s: "
                                   "%s: returned: %s",
                                   offsets ? offsets->cnt : -1,
                                   rd_kafka_cgrp_join_state_names[
                                           rkcg->rkcg_join_state],
                                   rko_orig->rko_u.offset_commit.reason,
                                   rd_kafka_err2str(err));
                else
                        rd_kafka_dbg(rk, CGRP, "COMMIT",
                                     "OffsetCommit for %d partition(s) in "
                                     "join-state "
                                     "%s: %s: "
                                     "returned: %s",
                                     offsets ? offsets->cnt : -1,
                                     rd_kafka_cgrp_join_state_names[
                                             rkcg->rkcg_join_state],
                                     rko_orig->rko_u.offset_commit.reason,
                                     rd_kafka_err2str(err));
        }


        /*
         * Error handling
         */
        switch (err)
        {
        case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
                /* Revoke assignment and rebalance on unknown member */
                rd_kafka_cgrp_set_member_id(rk->rk_cgrp, "");
                rd_kafka_cgrp_revoke_all_rejoin_maybe(
                        rkcg,
                        rd_true/*assignment is lost*/,
                        rd_true/*this consumer is initiating*/,
                        "OffsetCommit error: Unknown member");
                break;

        case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
                /* Revoke assignment and rebalance on illegal generation */
                rk->rk_cgrp->rkcg_generation_id = -1;
                rd_kafka_cgrp_revoke_all_rejoin_maybe(
                        rkcg,
                        rd_true/*assignment is lost*/,
                        rd_true/*this consumer is initiating*/,
                        "OffsetCommit error: Illegal generation");
                break;

        case RD_KAFKA_RESP_ERR__IN_PROGRESS:
                /* The request layer will call this handler again when
                 * the retried request completes; keep rko_orig alive. */
                return; /* Retrying */

        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
        case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR__TRANSPORT:
                /* The coordinator is not available, defer the offset commit
                 * to when the coordinator is back up again. */

                /* Future-proofing, see timeout_scan(). */
                rd_kafka_assert(NULL, err != RD_KAFKA_RESP_ERR__WAIT_COORD);

                /* On successful deferral ownership of rko_orig is
                 * transferred to the deferred-commit list. */
                if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko_orig,
                                                      rd_kafka_err2str(err)))
                        return;
                break;

        default:
                break;
        }

        /* Call on_commit interceptors */
        if (err != RD_KAFKA_RESP_ERR__NO_OFFSET &&
            err != RD_KAFKA_RESP_ERR__DESTROY &&
            offsets && offsets->cnt > 0)
                rd_kafka_interceptors_on_commit(rk, offsets, err);

        /* Keep track of outstanding commits */
        rd_kafka_assert(NULL, rk->rk_consumer.wait_commit_cnt > 0);
        rk->rk_consumer.wait_commit_cnt--;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                rd_kafka_op_destroy(rko_orig);
                return; /* Handle is terminating, this op may be handled
                         * by the op enq()ing thread rather than the
                         * rdkafka main thread, it is not safe to
                         * continue here. */
        }

        /* Update the committed offsets for each partition's rktp. */
        errcnt = rd_kafka_cgrp_update_committed_offsets(rkcg, err, offsets);

        /* Note: the _DESTROY check below is redundant (it returned above)
         * but kept as a defensive guard for the propagation condition. */
        if (err != RD_KAFKA_RESP_ERR__DESTROY &&
            !(err == RD_KAFKA_RESP_ERR__NO_OFFSET &&
              rko_orig->rko_u.offset_commit.silent_empty)) {
                /* Propagate commit results (success or permanent error)
                 * unless we're shutting down or commit was empty. */
                rd_kafka_cgrp_propagate_commit_result(rkcg, rko_orig,
                                                      err, errcnt, offsets);
        }

        rd_kafka_op_destroy(rko_orig);

        /* If the current state was waiting for commits to finish we'll try to
         * transition to the next state. */
        if (rk->rk_consumer.wait_commit_cnt == 0)
                rd_kafka_assignment_serve(rk);


}
2908 
2909 
rd_kafka_topic_partition_has_absolute_offset(const rd_kafka_topic_partition_t * rktpar,void * opaque)2910 static size_t rd_kafka_topic_partition_has_absolute_offset (
2911         const rd_kafka_topic_partition_t *rktpar, void *opaque) {
2912         return rktpar->offset >= 0 ? 1 : 0;
2913 }
2914 
2915 
2916 /**
2917  * Commit a list of offsets.
2918  * Reuse the orignating 'rko' for the async reply.
2919  * 'rko->rko_payload' should either by NULL (to commit current assignment) or
2920  * a proper topic_partition_list_t with offsets to commit.
2921  * The offset list will be altered.
2922  *
2923  * \p rko...silent_empty: if there are no offsets to commit bail out
2924  *                        silently without posting an op on the reply queue.
2925  * \p set_offsets: set offsets in rko->rko_u.offset_commit.partitions from
2926  *                 the rktp's stored offset.
2927  *
2928  * Locality: cgrp thread
2929  */
static void rd_kafka_cgrp_offsets_commit (rd_kafka_cgrp_t *rkcg,
                                          rd_kafka_op_t *rko,
                                          rd_bool_t set_offsets,
                                          const char *reason) {
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_resp_err_t err;
        int valid_offsets = 0;
        int r;
        rd_kafka_buf_t *rkbuf;
        rd_kafka_op_t *reply;

        if (!(rko->rko_flags & RD_KAFKA_OP_F_REPROCESS)) {
                /* wait_commit_cnt has already been increased for
                 * reprocessed ops. */
                rkcg->rkcg_rk->rk_consumer.wait_commit_cnt++;
        }

        /* If offsets is NULL we shall use the current assignment
         * (not the group assignment). */
        if (!rko->rko_u.offset_commit.partitions &&
            rkcg->rkcg_rk->rk_consumer.assignment.all->cnt > 0) {
                if (rd_kafka_cgrp_assignment_is_lost(rkcg)) {
                        /* Not committing assigned offsets: assignment lost */
                        err = RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST;
                        goto err;
                }

                rko->rko_u.offset_commit.partitions =
                        rd_kafka_topic_partition_list_copy(
                                rkcg->rkcg_rk->rk_consumer.assignment.all);
        }

        offsets = rko->rko_u.offset_commit.partitions;

        if (offsets) {
                /* Set offsets to commits */
                if (set_offsets)
                        rd_kafka_topic_partition_list_set_offsets(
                        rkcg->rkcg_rk, rko->rko_u.offset_commit.partitions, 1,
                        RD_KAFKA_OFFSET_INVALID/* def */,
                        1 /* is commit */);

                /*  Check the number of valid offsets to commit:
                 *  only partitions with an absolute (>= 0) offset count. */
                valid_offsets = (int)rd_kafka_topic_partition_list_sum(
                        offsets,
                        rd_kafka_topic_partition_has_absolute_offset, NULL);
        }

        if (rd_kafka_fatal_error_code(rkcg->rkcg_rk)) {
                /* Commits are not allowed when a fatal error has been raised */
                err = RD_KAFKA_RESP_ERR__FATAL;
                goto err;
        }

        if (!valid_offsets) {
                /* No valid offsets */
                err = RD_KAFKA_RESP_ERR__NO_OFFSET;
                goto err;
        }

        if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP) {
                rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP,
                             "COMMIT",
                             "Deferring \"%s\" offset commit "
                             "for %d partition(s) in state %s: "
                             "no coordinator available",
                             reason, valid_offsets,
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state]);

                /* On successful deferral rko ownership is transferred
                 * and the commit is retried when the coordinator is up. */
                if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko, reason))
                        return;

                err = RD_KAFKA_RESP_ERR__WAIT_COORD;
                goto err;
        }


        rd_rkb_dbg(rkcg->rkcg_coord, CONSUMER|RD_KAFKA_DBG_CGRP, "COMMIT",
                   "Committing offsets for %d partition(s) with "
                   "generation-id %" PRId32 " in join-state %s: %s",
                   valid_offsets, rkcg->rkcg_generation_id,
                   rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                   reason);

        /* Send OffsetCommit; rko is passed as opaque to the response
         * handler which assumes ownership. */
        r = rd_kafka_OffsetCommitRequest(
                rkcg->rkcg_coord, rkcg, offsets,
                RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
                rd_kafka_cgrp_op_handle_OffsetCommit, rko,
                reason);

        /* Must have valid offsets to commit if we get here */
        rd_kafka_assert(NULL, r != 0);

        return;

 err:
        if (err != RD_KAFKA_RESP_ERR__NO_OFFSET)
                rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP,
                             "COMMIT",
                             "OffsetCommit internal error: %s",
                             rd_kafka_err2str(err));

        /* Propagate error through dummy buffer object that will
         * call the response handler from the main loop, avoiding
         * any recursive calls from op_handle_OffsetCommit ->
         * assignment_serve() and then back to cgrp_assigned_offsets_commit() */

        reply = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF);
        reply->rko_rk = rkcg->rkcg_rk; /* Set rk since the rkbuf will not
                                        * have a rkb to reach it. */
        reply->rko_err = err;

        rkbuf = rd_kafka_buf_new(0, 0);
        rkbuf->rkbuf_cb = rd_kafka_cgrp_op_handle_OffsetCommit;
        rkbuf->rkbuf_opaque = rko;
        reply->rko_u.xbuf.rkbuf = rkbuf;

        rd_kafka_q_enq(rkcg->rkcg_ops, reply);

}
3051 
3052 
3053 /**
3054  * @brief Commit offsets assigned partitions.
3055  *
3056  * If \p offsets is NULL all partitions in the current assignment will be used.
3057  * If \p set_offsets is true the offsets to commit will be read from the
3058  * rktp's stored offset rather than the .offset fields in \p offsets.
3059  *
3060  * rkcg_wait_commit_cnt will be increased accordingly.
3061  */
3062 void
rd_kafka_cgrp_assigned_offsets_commit(rd_kafka_cgrp_t * rkcg,const rd_kafka_topic_partition_list_t * offsets,rd_bool_t set_offsets,const char * reason)3063 rd_kafka_cgrp_assigned_offsets_commit (
3064         rd_kafka_cgrp_t *rkcg,
3065         const rd_kafka_topic_partition_list_t *offsets,
3066         rd_bool_t set_offsets,
3067         const char *reason) {
3068         rd_kafka_op_t *rko;
3069 
3070         if (rd_kafka_cgrp_assignment_is_lost(rkcg)) {
3071                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "AUTOCOMMIT",
3072                              "Group \"%s\": not committing assigned offsets: "
3073                              "assignment lost",
3074                              rkcg->rkcg_group_id->str);
3075                 return;
3076         }
3077 
3078 	rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT);
3079         rko->rko_u.offset_commit.reason = rd_strdup(reason);
3080         if (rkcg->rkcg_rk->rk_conf.enabled_events &
3081             RD_KAFKA_EVENT_OFFSET_COMMIT) {
3082                 /* Send results to application */
3083 		rd_kafka_op_set_replyq(rko, rkcg->rkcg_rk->rk_rep, 0);
3084 		rko->rko_u.offset_commit.cb =
3085 			rkcg->rkcg_rk->rk_conf.offset_commit_cb; /*maybe NULL*/
3086 		rko->rko_u.offset_commit.opaque = rkcg->rkcg_rk->rk_conf.opaque;
3087 	}
3088         /* NULL partitions means current assignment */
3089         if (offsets)
3090                 rko->rko_u.offset_commit.partitions =
3091                         rd_kafka_topic_partition_list_copy(offsets);
3092 	rko->rko_u.offset_commit.silent_empty = 1;
3093         rd_kafka_cgrp_offsets_commit(rkcg, rko, set_offsets, reason);
3094 }
3095 
3096 
3097 /**
3098  * auto.commit.interval.ms commit timer callback.
3099  *
3100  * Trigger a group offset commit.
3101  *
3102  * Locality: rdkafka main thread
3103  */
static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts,
                                                void *arg) {
        rd_kafka_cgrp_t *rkcg = arg;

        /* Only auto-commit from a steady join state: while rebalancing or
         * initializing the rkcg_generation_id is most likely in flux. */
        if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_STEADY)
                return;

        rd_kafka_cgrp_assigned_offsets_commit(
                rkcg, NULL,
                rd_true/*set offsets*/,
                "cgrp auto commit timer");
}
3117 
3118 
3119 /**
3120  * @brief If rkcg_next_subscription or rkcg_next_unsubscribe are
3121  *        set, trigger a state change so that they are applied from the
3122  *        main dispatcher.
3123  *
3124  * @returns rd_true if a subscribe was scheduled, else false.
3125  */
3126 static rd_bool_t
rd_kafka_trigger_waiting_subscribe_maybe(rd_kafka_cgrp_t * rkcg)3127 rd_kafka_trigger_waiting_subscribe_maybe (rd_kafka_cgrp_t *rkcg) {
3128 
3129         if (rkcg->rkcg_next_subscription || rkcg->rkcg_next_unsubscribe) {
3130                 /* Skip the join backoff */
3131                 rd_interval_reset(&rkcg->rkcg_join_intvl);
3132                 rd_kafka_cgrp_rejoin(rkcg, "Applying next subscription");
3133                 return rd_true;
3134         }
3135 
3136         return rd_false;
3137 }
3138 
3139 
3140 /**
3141  * @brief Incrementally add to an existing partition assignment
3142  *        May update \p partitions but will not hold on to it.
3143  *
3144  * @returns an error object or NULL on success.
3145  */
3146 static rd_kafka_error_t *
rd_kafka_cgrp_incremental_assign(rd_kafka_cgrp_t * rkcg,rd_kafka_topic_partition_list_t * partitions)3147 rd_kafka_cgrp_incremental_assign (rd_kafka_cgrp_t *rkcg,
3148                                   rd_kafka_topic_partition_list_t
3149                                   *partitions) {
3150         rd_kafka_error_t *error;
3151 
3152         error = rd_kafka_assignment_add(rkcg->rkcg_rk, partitions);
3153         if (error)
3154                 return error;
3155 
3156         if (rkcg->rkcg_join_state ==
3157             RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL) {
3158                 rd_kafka_assignment_resume(rkcg->rkcg_rk,
3159                                            "incremental assign called");
3160                 rd_kafka_cgrp_set_join_state(
3161                         rkcg,
3162                         RD_KAFKA_CGRP_JOIN_STATE_STEADY);
3163 
3164                 if (rkcg->rkcg_subscription) {
3165                         /* If using subscribe(), start a timer to enforce
3166                          * `max.poll.interval.ms`.
3167                          * Instead of restarting the timer on each ...poll()
3168                          * call, which would be costly (once per message),
3169                          * set up an intervalled timer that checks a timestamp
3170                          * (that is updated on ..poll()).
3171                          * The timer interval is 2 hz. */
3172                         rd_kafka_timer_start(
3173                                 &rkcg->rkcg_rk->rk_timers,
3174                                 &rkcg->rkcg_max_poll_interval_tmr,
3175                                 500 * 1000ll /* 500ms */,
3176                                 rd_kafka_cgrp_max_poll_interval_check_tmr_cb,
3177                                 rkcg);
3178                 }
3179         }
3180 
3181         rd_kafka_cgrp_assignment_clear_lost(rkcg,
3182                                             "incremental_assign() called");
3183 
3184         return NULL;
3185 }
3186 
3187 
3188 /**
3189  * @brief Incrementally remove partitions from an existing partition
3190  *        assignment. May update \p partitions but will not hold on
3191  *        to it.
3192  *
3193  * @remark This method does not unmark the current assignment as lost
3194  *         (if lost). That happens following _incr_unassign_done and
3195  *         a group-rejoin initiated.
3196  *
3197  * @returns An error object or NULL on success.
3198  */
3199 static rd_kafka_error_t *
rd_kafka_cgrp_incremental_unassign(rd_kafka_cgrp_t * rkcg,rd_kafka_topic_partition_list_t * partitions)3200 rd_kafka_cgrp_incremental_unassign (rd_kafka_cgrp_t *rkcg,
3201                                     rd_kafka_topic_partition_list_t
3202                                     *partitions) {
3203         rd_kafka_error_t *error;
3204 
3205         error = rd_kafka_assignment_subtract(rkcg->rkcg_rk, partitions);
3206         if (error)
3207                 return error;
3208 
3209         if (rkcg->rkcg_join_state ==
3210             RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL) {
3211                 rd_kafka_assignment_resume(rkcg->rkcg_rk,
3212                                            "incremental unassign called");
3213                 rd_kafka_cgrp_set_join_state(
3214                         rkcg,
3215                         RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE);
3216         }
3217 
3218         rd_kafka_cgrp_assignment_clear_lost(rkcg,
3219                                             "incremental_unassign() called");
3220 
3221         return NULL;
3222 }
3223 
3224 
3225 /**
3226  * @brief Call when all incremental unassign operations are done to transition
3227  *        to the next state.
3228  */
static void rd_kafka_cgrp_incr_unassign_done (rd_kafka_cgrp_t *rkcg) {

        /* If this action was underway when a terminate was initiated, it will
         * be left to complete. Now that's done, unassign all partitions */
        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
                             "Group \"%s\" is terminating, initiating full "
                             "unassign", rkcg->rkcg_group_id->str);
                rd_kafka_cgrp_unassign(rkcg);
                return;
        }

        if (rkcg->rkcg_rebalance_incr_assignment) {
                /* This incremental unassign was part of a normal rebalance
                 * (in which the revoke set was not empty). Immediately
                 * trigger the assign that follows this revoke. The protocol
                 * dictates this should occur even if the new assignment
                 * set is empty.
                 *
                 * Also, since this rebalance had some revoked partitions,
                 * a re-join should occur following the assign. */
                rd_kafka_rebalance_op_incr(
                        rkcg,
                        RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                        rkcg->rkcg_rebalance_incr_assignment,
                        rd_true/*rejoin following assign*/,
                        "cooperative assign after revoke");

                rd_kafka_topic_partition_list_destroy(
                        rkcg->rkcg_rebalance_incr_assignment);
                rkcg->rkcg_rebalance_incr_assignment = NULL;

                /* Note: rkcg_rebalance_rejoin is actioned / reset in
                 * rd_kafka_cgrp_incremental_assign call */
                return;
        }

        if (rkcg->rkcg_rebalance_rejoin) {
                /* There are some cases (lost partitions), where a rejoin
                 * should occur immediately following the unassign (this
                 * is not the case under normal conditions), in which case
                 * the rejoin flag will be set. */
                rkcg->rkcg_rebalance_rejoin = rd_false;

                /* Skip the join backoff */
                rd_interval_reset(&rkcg->rkcg_join_intvl);

                rd_kafka_cgrp_rejoin(rkcg, "Incremental unassignment done");
                return;
        }

        if (!rd_kafka_trigger_waiting_subscribe_maybe(rkcg)) {
                /* After this incremental unassignment we're now back in
                 * a steady state. */
                rd_kafka_cgrp_set_join_state(rkcg,
                                             RD_KAFKA_CGRP_JOIN_STATE_STEADY);
        }
}
3288 
3289 
3290 /**
3291  * @brief Call when all absolute (non-incremental) unassign operations are done
3292  *        to transition to the next state.
3293  */
static void rd_kafka_cgrp_unassign_done (rd_kafka_cgrp_t *rkcg) {
        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
                     "Group \"%s\": unassign done in state %s "
                     "(join-state %s)",
                     rkcg->rkcg_group_id->str,
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

        /* Leave group, if desired. */
        rd_kafka_cgrp_leave_maybe(rkcg);

        /* Only rejoin if we were waiting for this unassign to complete. */
        if (rkcg->rkcg_join_state !=
            RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE)
                return;

        /* All partitions are unassigned. Rejoin the group,
         * skipping the join backoff. */
        rd_interval_reset(&rkcg->rkcg_join_intvl);

        rd_kafka_cgrp_rejoin(rkcg, "Unassignment done");
}
3316 
3317 
3318 
3319 /**
3320  * @brief Called from assignment code when all in progress
3321  *        assignment/unassignment operations are done, allowing the cgrp to
3322  *        transition to other states if needed.
3323  *
3324  * @remark This may be called spontaneously without any need for a state
3325  *         change in the rkcg.
3326  */
void rd_kafka_cgrp_assignment_done (rd_kafka_cgrp_t *rkcg) {
        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNDONE",
                     "Group \"%s\": "
                     "assignment operations done in join-state %s "
                     "(rebalance rejoin=%s)",
                     rkcg->rkcg_group_id->str,
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                     RD_STR_ToF(rkcg->rkcg_rebalance_rejoin));

        switch (rkcg->rkcg_join_state)
        {
        case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE:
                /* A full (absolute) unassign finished. */
                rd_kafka_cgrp_unassign_done(rkcg);
                break;

        case RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE:
                /* An incremental (cooperative) unassign finished. */
                rd_kafka_cgrp_incr_unassign_done(rkcg);
                break;

        case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
                /* If an updated/next subscription is available, schedule it. */
                if (rd_kafka_trigger_waiting_subscribe_maybe(rkcg))
                        break;

                if (rkcg->rkcg_rebalance_rejoin) {
                        rkcg->rkcg_rebalance_rejoin = rd_false;

                        /* Skip the join backoff */
                        rd_interval_reset(&rkcg->rkcg_join_intvl);

                        rd_kafka_cgrp_rejoin(
                                rkcg,
                                "rejoining group to redistribute "
                                "previously owned partitions to other "
                                "group members");
                        break;
                }

                /* FALLTHRU: steady state with nothing pending falls
                 * through to the termination check below. */

        case RD_KAFKA_CGRP_JOIN_STATE_INIT:
                /* Check if cgrp is trying to terminate, which is safe to do
                 * in these two states. Otherwise we'll need to wait for
                 * the current state to decommission. */
                rd_kafka_cgrp_try_terminate(rkcg);
                break;

        default:
                break;
        }
}
3378 
3379 
3380 
3381 /**
3382  * @brief Remove existing assignment.
3383  */
3384 static rd_kafka_error_t *
rd_kafka_cgrp_unassign(rd_kafka_cgrp_t * rkcg)3385 rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg) {
3386 
3387         rd_kafka_assignment_clear(rkcg->rkcg_rk);
3388 
3389         if (rkcg->rkcg_join_state ==
3390             RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL) {
3391                 rd_kafka_assignment_resume(rkcg->rkcg_rk, "unassign called");
3392                 rd_kafka_cgrp_set_join_state(
3393                         rkcg,
3394                         RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE);
3395         }
3396 
3397         rd_kafka_cgrp_assignment_clear_lost(rkcg, "unassign() called");
3398 
3399         return NULL;
3400 }
3401 
3402 
3403 /**
3404  * @brief Set new atomic partition assignment
3405  *        May update \p assignment but will not hold on to it.
3406  *
3407  * @returns NULL on success or an error if a fatal error has been raised.
3408  */
3409 static rd_kafka_error_t *
rd_kafka_cgrp_assign(rd_kafka_cgrp_t * rkcg,rd_kafka_topic_partition_list_t * assignment)3410 rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg,
3411                       rd_kafka_topic_partition_list_t *assignment) {
3412         rd_kafka_error_t *error;
3413 
3414         rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "ASSIGN",
3415                      "Group \"%s\": new assignment of %d partition(s) "
3416                      "in join-state %s",
3417                      rkcg->rkcg_group_id->str,
3418                      assignment ? assignment->cnt : 0,
3419                      rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
3420 
3421         /* Clear existing assignment, if any, and serve its removals. */
3422         if (rd_kafka_assignment_clear(rkcg->rkcg_rk))
3423                 rd_kafka_assignment_serve(rkcg->rkcg_rk);
3424 
3425         error = rd_kafka_assignment_add(rkcg->rkcg_rk, assignment);
3426         if (error)
3427                 return error;
3428 
3429         rd_kafka_cgrp_assignment_clear_lost(rkcg, "assign() called");
3430 
3431         if (rkcg->rkcg_join_state ==
3432             RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL) {
3433                 rd_kafka_assignment_resume(rkcg->rkcg_rk, "assign called");
3434                 rd_kafka_cgrp_set_join_state(
3435                         rkcg,
3436                         RD_KAFKA_CGRP_JOIN_STATE_STEADY);
3437 
3438                 if (rkcg->rkcg_subscription) {
3439                         /* If using subscribe(), start a timer to enforce
3440                          * `max.poll.interval.ms`.
3441                          * Instead of restarting the timer on each ...poll()
3442                          * call, which would be costly (once per message),
3443                          * set up an intervalled timer that checks a timestamp
3444                          * (that is updated on ..poll()).
3445                          * The timer interval is 2 hz. */
3446                         rd_kafka_timer_start(
3447                                 &rkcg->rkcg_rk->rk_timers,
3448                                 &rkcg->rkcg_max_poll_interval_tmr,
3449                                 500 * 1000ll /* 500ms */,
3450                                 rd_kafka_cgrp_max_poll_interval_check_tmr_cb,
3451                                 rkcg);
3452                 }
3453         }
3454 
3455         return NULL;
3456 }
3457 
3458 
3459 
3460 /**
3461  * @brief Construct a typed map from list \p rktparlist with key corresponding
3462  *        to each element in the list and value NULL.
3463  *
3464  * @remark \p rktparlist may be NULL.
3465  */
3466 static map_toppar_member_info_t *
rd_kafka_toppar_list_to_toppar_member_info_map(rd_kafka_topic_partition_list_t * rktparlist)3467 rd_kafka_toppar_list_to_toppar_member_info_map (rd_kafka_topic_partition_list_t
3468                                                 *rktparlist) {
3469         map_toppar_member_info_t *map = rd_calloc(1, sizeof(*map));
3470         const rd_kafka_topic_partition_t *rktpar;
3471 
3472         RD_MAP_INIT(
3473                 map,
3474                 rktparlist ? rktparlist->cnt : 0,
3475                 rd_kafka_topic_partition_cmp,
3476                 rd_kafka_topic_partition_hash,
3477                 rd_kafka_topic_partition_destroy_free,
3478                 PartitionMemberInfo_free);
3479 
3480         if (!rktparlist)
3481                 return map;
3482 
3483         RD_KAFKA_TPLIST_FOREACH(rktpar, rktparlist)
3484                 RD_MAP_SET(map,
3485                            rd_kafka_topic_partition_copy(rktpar),
3486                            PartitionMemberInfo_new(NULL, rd_false));
3487 
3488         return map;
3489 }
3490 
3491 
3492 /**
3493  * @brief Construct a toppar list from map \p map with elements corresponding
3494  *        to the keys of \p map.
3495  */
3496 static rd_kafka_topic_partition_list_t *
rd_kafka_toppar_member_info_map_to_list(map_toppar_member_info_t * map)3497 rd_kafka_toppar_member_info_map_to_list (map_toppar_member_info_t *map) {
3498         const rd_kafka_topic_partition_t *k;
3499         rd_kafka_topic_partition_list_t *list =
3500                 rd_kafka_topic_partition_list_new((int)RD_MAP_CNT(map));
3501 
3502         RD_MAP_FOREACH_KEY(k, map) {
3503                 rd_kafka_topic_partition_list_add(list,
3504                                                   k->topic,
3505                                                   k->partition);
3506         }
3507 
3508         return list;
3509 }
3510 
3511 
3512 /**
3513  * @brief Handle a rebalance-triggered partition assignment
3514  *        (COOPERATIVE case).
3515  */
3516 static void
rd_kafka_cgrp_handle_assignment_cooperative(rd_kafka_cgrp_t * rkcg,rd_kafka_topic_partition_list_t * assignment)3517 rd_kafka_cgrp_handle_assignment_cooperative (rd_kafka_cgrp_t *rkcg,
3518                                              rd_kafka_topic_partition_list_t
3519                                              *assignment) {
3520         map_toppar_member_info_t *new_assignment_set;
3521         map_toppar_member_info_t *old_assignment_set;
3522         map_toppar_member_info_t *newly_added_set;
3523         map_toppar_member_info_t *revoked_set;
3524         rd_kafka_topic_partition_list_t *newly_added;
3525         rd_kafka_topic_partition_list_t *revoked;
3526 
3527         new_assignment_set =
3528                 rd_kafka_toppar_list_to_toppar_member_info_map(assignment);
3529 
3530         old_assignment_set =
3531                 rd_kafka_toppar_list_to_toppar_member_info_map(
3532                         rkcg->rkcg_group_assignment);
3533 
3534         newly_added_set =
3535                 rd_kafka_member_partitions_subtract(
3536                         new_assignment_set, old_assignment_set);
3537         revoked_set =
3538                 rd_kafka_member_partitions_subtract(
3539                         old_assignment_set, new_assignment_set);
3540 
3541         newly_added = rd_kafka_toppar_member_info_map_to_list(newly_added_set);
3542         revoked = rd_kafka_toppar_member_info_map_to_list(revoked_set);
3543 
3544         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COOPASSIGN",
3545                      "Group \"%s\": incremental assignment: %d newly added, "
3546                      "%d revoked partitions based on assignment of %d "
3547                      "partitions",
3548                      rkcg->rkcg_group_id->str,
3549                      newly_added->cnt,
3550                      revoked->cnt,
3551                      assignment->cnt);
3552 
3553         if (revoked->cnt > 0) {
3554                 /* Setting rkcg_incr_assignment causes a follow on incremental
3555                  * assign rebalance op after completion of this incremental
3556                  * unassign op. */
3557 
3558                 rkcg->rkcg_rebalance_incr_assignment = newly_added;
3559                 newly_added = NULL;
3560 
3561                 rd_kafka_rebalance_op_incr(
3562                         rkcg,
3563                         RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
3564                         revoked, rd_false/*no rejoin following
3565                         unassign*/,  "sync group revoke");
3566 
3567         } else {
3568                 /* There are no revoked partitions - trigger the assign
3569                  * rebalance op, and flag that the group does not need
3570                  * to be re-joined */
3571 
3572                 rd_kafka_rebalance_op_incr(rkcg,
3573                         RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
3574                         newly_added,
3575                         rd_false/*no rejoin following assign*/,
3576                         "sync group assign");
3577         }
3578 
3579         if (newly_added)
3580                 rd_kafka_topic_partition_list_destroy(newly_added);
3581         rd_kafka_topic_partition_list_destroy(revoked);
3582         RD_MAP_DESTROY_AND_FREE(revoked_set);
3583         RD_MAP_DESTROY_AND_FREE(newly_added_set);
3584         RD_MAP_DESTROY_AND_FREE(old_assignment_set);
3585         RD_MAP_DESTROY_AND_FREE(new_assignment_set);
3586 }
3587 
3588 
3589 /**
3590  * @brief Sets or clears the group's partition assignment for our consumer.
3591  *
3592  * Will replace the current group assignment, if any.
3593  */
rd_kafka_cgrp_group_assignment_set(rd_kafka_cgrp_t * rkcg,const rd_kafka_topic_partition_list_t * partitions)3594 static void rd_kafka_cgrp_group_assignment_set (
3595         rd_kafka_cgrp_t *rkcg,
3596         const rd_kafka_topic_partition_list_t *partitions) {
3597 
3598         if (rkcg->rkcg_group_assignment)
3599                 rd_kafka_topic_partition_list_destroy(
3600                         rkcg->rkcg_group_assignment);
3601 
3602         if (partitions) {
3603                 rkcg->rkcg_group_assignment =
3604                         rd_kafka_topic_partition_list_copy(partitions);
3605                 rd_kafka_topic_partition_list_sort_by_topic(
3606                         rkcg->rkcg_group_assignment);
3607                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNMENT",
3608                              "Group \"%s\": setting group assignment to %d "
3609                              "partition(s)",
3610                              rkcg->rkcg_group_id->str,
3611                              rkcg->rkcg_group_assignment->cnt);
3612 
3613         } else {
3614                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNMENT",
3615                              "Group \"%s\": clearing group assignment",
3616                              rkcg->rkcg_group_id->str);
3617                 rkcg->rkcg_group_assignment = NULL;
3618         }
3619 
3620         rd_kafka_wrlock(rkcg->rkcg_rk);
3621         rkcg->rkcg_c.assignment_size = rkcg->rkcg_group_assignment ?
3622                 rkcg->rkcg_group_assignment->cnt : 0;
3623         rd_kafka_wrunlock(rkcg->rkcg_rk);
3624 
3625         if (rkcg->rkcg_group_assignment)
3626                 rd_kafka_topic_partition_list_log(
3627                         rkcg->rkcg_rk, "GRPASSIGNMENT", RD_KAFKA_DBG_CGRP,
3628                         rkcg->rkcg_group_assignment);
3629 }
3630 
3631 
3632 /**
3633  * @brief Adds or removes \p partitions from the current group assignment.
3634  *
3635  * @param add Whether to add or remove the partitions.
3636  *
3637  * @remark The added partitions must not already be on the group assignment,
3638  *         and the removed partitions must be on the group assignment.
3639  *
3640  * To be used with incremental rebalancing.
3641  *
3642  */
rd_kafka_cgrp_group_assignment_modify(rd_kafka_cgrp_t * rkcg,rd_bool_t add,const rd_kafka_topic_partition_list_t * partitions)3643 static void rd_kafka_cgrp_group_assignment_modify (
3644         rd_kafka_cgrp_t *rkcg,
3645         rd_bool_t add,
3646         const rd_kafka_topic_partition_list_t *partitions) {
3647         const rd_kafka_topic_partition_t *rktpar;
3648         int precnt;
3649         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNMENT",
3650                      "Group \"%s\": %d partition(s) being %s group assignment "
3651                      "of %d partition(s)",
3652                      rkcg->rkcg_group_id->str,
3653                      partitions->cnt,
3654                      add ? "added to" : "removed from",
3655                      rkcg->rkcg_group_assignment ?
3656                      rkcg->rkcg_group_assignment->cnt : 0);
3657 
3658         if (partitions == rkcg->rkcg_group_assignment) {
3659                 /* \p partitions is the actual assignment, which
3660                  * must mean it is all to be removed.
3661                  * Short-cut directly to set(NULL). */
3662                 rd_assert(!add);
3663                 rd_kafka_cgrp_group_assignment_set(rkcg, NULL);
3664                 return;
3665         }
3666 
3667         if (add &&
3668             (!rkcg->rkcg_group_assignment ||
3669              rkcg->rkcg_group_assignment->cnt == 0)) {
3670                 /* Adding to an empty assignment is a set operation. */
3671                 rd_kafka_cgrp_group_assignment_set(rkcg, partitions);
3672                 return;
3673         }
3674 
3675         if (!add) {
3676                 /* Removing from an empty assignment is illegal. */
3677                 rd_assert(rkcg->rkcg_group_assignment != NULL &&
3678                           rkcg->rkcg_group_assignment->cnt > 0);
3679         }
3680 
3681 
3682         precnt = rkcg->rkcg_group_assignment->cnt;
3683         RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
3684                 int idx;
3685 
3686                 idx = rd_kafka_topic_partition_list_find_idx(
3687                         rkcg->rkcg_group_assignment,
3688                         rktpar->topic,
3689                         rktpar->partition);
3690 
3691                 if (add) {
3692                         rd_assert(idx == -1);
3693 
3694                         rd_kafka_topic_partition_list_add_copy(
3695                                 rkcg->rkcg_group_assignment, rktpar);
3696 
3697                 } else {
3698                         rd_assert(idx != -1);
3699 
3700                         rd_kafka_topic_partition_list_del_by_idx(
3701                                 rkcg->rkcg_group_assignment, idx);
3702 
3703                 }
3704         }
3705 
3706         if (add)
3707                 rd_assert(precnt + partitions->cnt ==
3708                           rkcg->rkcg_group_assignment->cnt);
3709         else
3710                 rd_assert(precnt - partitions->cnt ==
3711                           rkcg->rkcg_group_assignment->cnt);
3712 
3713         if (rkcg->rkcg_group_assignment->cnt == 0) {
3714                 rd_kafka_topic_partition_list_destroy(
3715                         rkcg->rkcg_group_assignment);
3716                 rkcg->rkcg_group_assignment = NULL;
3717 
3718         } else if (add)
3719                 rd_kafka_topic_partition_list_sort_by_topic(
3720                         rkcg->rkcg_group_assignment);
3721 
3722         rd_kafka_wrlock(rkcg->rkcg_rk);
3723         rkcg->rkcg_c.assignment_size = rkcg->rkcg_group_assignment ?
3724                 rkcg->rkcg_group_assignment->cnt : 0;
3725         rd_kafka_wrunlock(rkcg->rkcg_rk);
3726 
3727         if (rkcg->rkcg_group_assignment)
3728                 rd_kafka_topic_partition_list_log(
3729                         rkcg->rkcg_rk, "GRPASSIGNMENT", RD_KAFKA_DBG_CGRP,
3730                         rkcg->rkcg_group_assignment);
3731 }
3732 
3733 
3734 /**
3735  * @brief Handle a rebalance-triggered partition assignment.
3736  *
3737  *        If a rebalance_cb has been registered we enqueue an op for the app
3738  *        and let the app perform the actual assign() call. Otherwise we
3739  *        assign() directly from here.
3740  *
3741  *        This provides the most flexibility, allowing the app to perform any
3742  *        operation it seem fit (e.g., offset writes or reads) before actually
3743  *        updating the assign():ment.
3744  */
3745 static void
rd_kafka_cgrp_handle_assignment(rd_kafka_cgrp_t * rkcg,rd_kafka_topic_partition_list_t * assignment)3746 rd_kafka_cgrp_handle_assignment (rd_kafka_cgrp_t *rkcg,
3747                                  rd_kafka_topic_partition_list_t *assignment) {
3748 
3749         if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
3750             RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE) {
3751                 rd_kafka_cgrp_handle_assignment_cooperative(rkcg,
3752                                                             assignment);
3753         } else {
3754 
3755                 rd_kafka_rebalance_op(rkcg,
3756                                       RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
3757                                       assignment, "new assignment");
3758         }
3759 }
3760 
3761 
3762 /**
3763  * Clean up any group-leader related resources.
3764  *
3765  * Locality: cgrp thread
3766  */
rd_kafka_cgrp_group_leader_reset(rd_kafka_cgrp_t * rkcg,const char * reason)3767 static void rd_kafka_cgrp_group_leader_reset (rd_kafka_cgrp_t *rkcg,
3768                                               const char *reason) {
3769         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPLEADER",
3770                      "Group \"%.*s\": resetting group leader info: %s",
3771                      RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
3772 
3773         if (rkcg->rkcg_group_leader.members) {
3774                 int i;
3775 
3776                 for (i = 0 ; i < rkcg->rkcg_group_leader.member_cnt ; i++)
3777                         rd_kafka_group_member_clear(&rkcg->rkcg_group_leader.
3778                                                     members[i]);
3779                 rkcg->rkcg_group_leader.member_cnt = 0;
3780                 rd_free(rkcg->rkcg_group_leader.members);
3781                 rkcg->rkcg_group_leader.members = NULL;
3782         }
3783 }
3784 
3785 
3786 /**
3787  * @brief React to a RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS broker response.
3788  */
rd_kafka_cgrp_group_is_rebalancing(rd_kafka_cgrp_t * rkcg)3789 static void rd_kafka_cgrp_group_is_rebalancing (rd_kafka_cgrp_t *rkcg) {
3790 
3791         if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
3792             RD_KAFKA_REBALANCE_PROTOCOL_EAGER) {
3793                 rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg,
3794                                                       rd_false/*lost*/,
3795                                                       rd_false/*initiating*/,
3796                                                       "rebalance in progress");
3797                 return;
3798         }
3799 
3800 
3801         /* In the COOPERATIVE case, simply rejoin the group
3802          * - partitions are unassigned on SyncGroup response,
3803          * not prior to JoinGroup as with the EAGER case. */
3804 
3805         if (RD_KAFKA_CGRP_REBALANCING(rkcg)) {
3806                 rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP,
3807                              "REBALANCE", "Group \"%.*s\": skipping "
3808                              "COOPERATIVE rebalance in state %s "
3809                              "(join-state %s)%s%s%s",
3810                              RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
3811                              rd_kafka_cgrp_state_names[rkcg->rkcg_state],
3812                              rd_kafka_cgrp_join_state_names[
3813                              rkcg->rkcg_join_state],
3814                              RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg)
3815                              ? " (awaiting assign call)" : "",
3816                              (rkcg->rkcg_rebalance_incr_assignment != NULL)
3817                              ? " (incremental assignment pending)": "",
3818                              rkcg->rkcg_rebalance_rejoin
3819                              ? " (rebalance rejoin)": "");
3820                 return;
3821         }
3822 
3823         rd_kafka_cgrp_rejoin(rkcg, "Group is rebalancing");
3824 }
3825 
3826 
3827 
3828 /**
3829  * @brief Triggers the application rebalance callback if required to
3830  *        revoke partitions, and transition to INIT state for (eventual)
3831  *        rejoin. Does nothing if a rebalance workflow is already in
3832  *        progress
3833  */
rd_kafka_cgrp_revoke_all_rejoin_maybe(rd_kafka_cgrp_t * rkcg,rd_bool_t assignment_lost,rd_bool_t initiating,const char * reason)3834 static void rd_kafka_cgrp_revoke_all_rejoin_maybe (rd_kafka_cgrp_t *rkcg,
3835                                                    rd_bool_t assignment_lost,
3836                                                    rd_bool_t initiating,
3837                                                    const char *reason) {
3838         if (RD_KAFKA_CGRP_REBALANCING(rkcg)) {
3839                 rd_kafka_dbg(
3840                         rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP,
3841                         "REBALANCE", "Group \"%.*s\": rebalance (%s) "
3842                         "already in progress, skipping in state %s "
3843                         "(join-state %s) with %d assigned partition(s)%s%s%s: "
3844                         "%s",
3845                         RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
3846                         rd_kafka_rebalance_protocol2str(
3847                                 rd_kafka_cgrp_rebalance_protocol(rkcg)),
3848                         rd_kafka_cgrp_state_names[rkcg->rkcg_state],
3849                         rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
3850                         rkcg->rkcg_group_assignment ?
3851                         rkcg->rkcg_group_assignment->cnt : 0,
3852                         assignment_lost ? " (lost)" : "",
3853                         rkcg->rkcg_rebalance_incr_assignment ?
3854                         ", incremental assignment in progress" : "",
3855                         rkcg->rkcg_rebalance_rejoin ?
3856                         ", rejoin on rebalance" : "",
3857                         reason);
3858                 return;
3859         }
3860 
3861         rd_kafka_cgrp_revoke_all_rejoin(rkcg, assignment_lost,
3862                                         initiating, reason);
3863 }
3864 
3865 
3866 /**
3867  * @brief Triggers the application rebalance callback if required to
3868  *        revoke partitions, and transition to INIT state for (eventual)
3869  *        rejoin.
3870  */
rd_kafka_cgrp_revoke_all_rejoin(rd_kafka_cgrp_t * rkcg,rd_bool_t assignment_lost,rd_bool_t initiating,const char * reason)3871 static void rd_kafka_cgrp_revoke_all_rejoin (rd_kafka_cgrp_t *rkcg,
3872                                              rd_bool_t assignment_lost,
3873                                              rd_bool_t initiating,
3874                                              const char *reason) {
3875 
3876         rd_kafka_rebalance_protocol_t protocol =
3877                 rd_kafka_cgrp_rebalance_protocol(rkcg);
3878 
3879         rd_bool_t terminating =
3880                 unlikely(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE);
3881 
3882 
3883         rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_CGRP, "REBALANCE",
3884                      "Group \"%.*s\" %s (%s) in state %s (join-state %s) "
3885                      "with %d assigned partition(s)%s: %s",
3886                      RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
3887                      initiating ? "initiating rebalance" : "is rebalancing",
3888                      rd_kafka_rebalance_protocol2str(protocol),
3889                      rd_kafka_cgrp_state_names[rkcg->rkcg_state],
3890                      rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
3891                      rkcg->rkcg_group_assignment ?
3892                      rkcg->rkcg_group_assignment->cnt : 0,
3893                      assignment_lost ? " (lost)" : "",
3894                      reason);
3895 
3896         rd_snprintf(rkcg->rkcg_c.rebalance_reason,
3897                     sizeof(rkcg->rkcg_c.rebalance_reason), "%s", reason);
3898 
3899 
3900         if (protocol == RD_KAFKA_REBALANCE_PROTOCOL_EAGER ||
3901             protocol == RD_KAFKA_REBALANCE_PROTOCOL_NONE) {
3902                 /* EAGER case (or initial subscribe) - revoke partitions which
3903                  * will be followed by rejoin, if required. */
3904 
3905                 if (assignment_lost)
3906                         rd_kafka_cgrp_assignment_set_lost(
3907                                 rkcg, "%s: revoking assignment and rejoining",
3908                                 reason);
3909 
3910                 /* Schedule application rebalance op if there is an existing
3911                  * assignment (albeit perhaps empty) and there is no
3912                  * outstanding rebalance op in progress. */
3913                 if (rkcg->rkcg_group_assignment &&
3914                     !RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg)) {
3915                         rd_kafka_rebalance_op(
3916                                 rkcg,
3917                                 RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
3918                                 rkcg->rkcg_group_assignment, reason);
3919                 } else {
3920                         /* Skip the join backoff */
3921                         rd_interval_reset(&rkcg->rkcg_join_intvl);
3922 
3923                         rd_kafka_cgrp_rejoin(rkcg, "%s", reason);
3924                 }
3925 
3926                 return;
3927         }
3928 
3929 
3930         /* COOPERATIVE case. */
3931 
3932         /* All partitions should never be revoked unless terminating, leaving
3933          * the group, or on assignment lost. Another scenario represents a
3934          * logic error. Fail fast in this case. */
3935         if (!(terminating ||
3936               assignment_lost ||
3937               (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE))) {
3938                 rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE",
3939                              "Group \"%s\": unexpected instruction to revoke "
3940                              "current assignment and rebalance "
3941                              "(terminating=%d, assignment_lost=%d, "
3942                              "LEAVE_ON_UNASSIGN_DONE=%d)",
3943                              rkcg->rkcg_group_id->str,
3944                              terminating, assignment_lost,
3945                              (rkcg->rkcg_flags &
3946                               RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE));
3947                 rd_dassert(!*"BUG: unexpected instruction to revoke "
3948                            "current assignment and rebalance");
3949         }
3950 
3951         if (rkcg->rkcg_group_assignment &&
3952             rkcg->rkcg_group_assignment->cnt > 0) {
3953                 if (assignment_lost)
3954                         rd_kafka_cgrp_assignment_set_lost(
3955                                 rkcg,
3956                                 "%s: revoking incremental assignment "
3957                                 "and rejoining", reason);
3958 
3959                 rd_kafka_dbg(rkcg->rkcg_rk,
3960                         CONSUMER|RD_KAFKA_DBG_CGRP,
3961                         "REBALANCE", "Group \"%.*s\": revoking "
3962                         "all %d partition(s)%s%s",
3963                         RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
3964                              rkcg->rkcg_group_assignment->cnt,
3965                         terminating ? " (terminating)" : "",
3966                         assignment_lost ? " (assignment lost)" : "");
3967 
3968                 rd_kafka_rebalance_op_incr(
3969                         rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
3970                         rkcg->rkcg_group_assignment,
3971                         terminating ? rd_false : rd_true /*rejoin*/,
3972                         reason);
3973 
3974                 return;
3975         }
3976 
3977         if (terminating) {
3978                 /* If terminating, then don't rejoin group. */
3979                 rd_kafka_dbg(rkcg->rkcg_rk,
3980                         CONSUMER|RD_KAFKA_DBG_CGRP,
3981                         "REBALANCE", "Group \"%.*s\": consumer is "
3982                         "terminating, skipping rejoin",
3983                         RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
3984                 return;
3985         }
3986 
3987         rd_kafka_cgrp_rejoin(rkcg, "Current assignment is empty");
3988 }
3989 
3990 
3991 /**
3992  * @brief `max.poll.interval.ms` enforcement check timer.
3993  *
3994  * @locality rdkafka main thread
3995  * @locks none
3996  */
3997 static void
rd_kafka_cgrp_max_poll_interval_check_tmr_cb(rd_kafka_timers_t * rkts,void * arg)3998 rd_kafka_cgrp_max_poll_interval_check_tmr_cb (rd_kafka_timers_t *rkts,
3999                                               void *arg) {
4000         rd_kafka_cgrp_t *rkcg = arg;
4001         rd_kafka_t *rk = rkcg->rkcg_rk;
4002         int exceeded;
4003 
4004         exceeded = rd_kafka_max_poll_exceeded(rk);
4005 
4006         if (likely(!exceeded))
4007                 return;
4008 
4009         rd_kafka_log(rk, LOG_WARNING, "MAXPOLL",
4010                      "Application maximum poll interval (%dms) "
4011                      "exceeded by %dms "
4012                      "(adjust max.poll.interval.ms for "
4013                      "long-running message processing): "
4014                      "leaving group",
4015                      rk->rk_conf.max_poll_interval_ms, exceeded);
4016 
4017         rd_kafka_consumer_err(rkcg->rkcg_q, RD_KAFKA_NODEID_UA,
4018                               RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED,
4019                               0, NULL, NULL, RD_KAFKA_OFFSET_INVALID,
4020                               "Application maximum poll interval (%dms) "
4021                               "exceeded by %dms",
4022                               rk->rk_conf.max_poll_interval_ms, exceeded);
4023 
4024         rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED;
4025 
4026         rd_kafka_timer_stop(rkts, &rkcg->rkcg_max_poll_interval_tmr,
4027                             1/*lock*/);
4028 
4029         /* Leave the group before calling rebalance since the standard leave
4030          * will be triggered first after the rebalance callback has been served.
4031          * But since the application is blocked still doing processing
4032          * that leave will be further delayed.
4033          *
4034          * KIP-345: static group members should continue to respect
4035          * `max.poll.interval.ms` but should not send a LeaveGroupRequest.
4036          */
4037         if (!RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg))
4038                 rd_kafka_cgrp_leave(rkcg);
4039 
4040         /* Timing out or leaving the group invalidates the member id, reset it
4041          * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */
4042         rd_kafka_cgrp_set_member_id(rkcg, "");
4043 
4044         /* Trigger rebalance */
4045         rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg,
4046                                               rd_true/*lost*/,
4047                                               rd_true/*initiating*/,
4048                                               "max.poll.interval.ms exceeded");
4049 }
4050 
4051 
4052 /**
4053  * @brief Generate consumer errors for each topic in the list.
4054  *
4055  * Also replaces the list of last reported topic errors so that repeated
4056  * errors are silenced.
4057  *
4058  * @param errored Errored topics.
4059  * @param error_prefix Error message prefix.
4060  *
4061  * @remark Assumes ownership of \p errored.
4062  */
4063 static void
rd_kafka_propagate_consumer_topic_errors(rd_kafka_cgrp_t * rkcg,rd_kafka_topic_partition_list_t * errored,const char * error_prefix)4064 rd_kafka_propagate_consumer_topic_errors (
4065         rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *errored,
4066         const char *error_prefix) {
4067         int i;
4068 
4069         for (i = 0 ; i < errored->cnt ; i++) {
4070                 rd_kafka_topic_partition_t *topic = &errored->elems[i];
4071                 rd_kafka_topic_partition_t *prev;
4072 
4073                 rd_assert(topic->err);
4074 
4075                 /* Normalize error codes, unknown topic may be
4076                  * reported by the broker, or the lack of a topic in
4077                  * metadata response is figured out by the client.
4078                  * Make sure the application only sees one error code
4079                  * for both these cases. */
4080                 if (topic->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
4081                         topic->err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
4082 
4083                 /* Check if this topic errored previously */
4084                 prev = rd_kafka_topic_partition_list_find(
4085                         rkcg->rkcg_errored_topics, topic->topic,
4086                         RD_KAFKA_PARTITION_UA);
4087 
4088                 if (prev && prev->err == topic->err)
4089                         continue; /* This topic already reported same error */
4090 
4091                 rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER|RD_KAFKA_DBG_TOPIC,
4092                              "TOPICERR",
4093                              "%s: %s: %s",
4094                              error_prefix, topic->topic,
4095                              rd_kafka_err2str(topic->err));
4096 
4097                 /* Send consumer error to application */
4098                 rd_kafka_consumer_err(rkcg->rkcg_q, RD_KAFKA_NODEID_UA,
4099                                       topic->err, 0,
4100                                       topic->topic, NULL,
4101                                       RD_KAFKA_OFFSET_INVALID,
4102                                       "%s: %s: %s",
4103                                       error_prefix, topic->topic,
4104                                       rd_kafka_err2str(topic->err));
4105         }
4106 
4107         rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics);
4108         rkcg->rkcg_errored_topics = errored;
4109 }
4110 
4111 
4112 /**
4113  * @brief Work out the topics currently subscribed to that do not
4114  *        match any pattern in \p subscription.
4115  */
4116 static rd_kafka_topic_partition_list_t *
rd_kafka_cgrp_get_unsubscribing_topics(rd_kafka_cgrp_t * rkcg,rd_kafka_topic_partition_list_t * subscription)4117 rd_kafka_cgrp_get_unsubscribing_topics (rd_kafka_cgrp_t *rkcg,
4118                                         rd_kafka_topic_partition_list_t
4119                                         *subscription) {
4120         int i;
4121         rd_kafka_topic_partition_list_t *result;
4122 
4123         result = rd_kafka_topic_partition_list_new(
4124                 rkcg->rkcg_subscribed_topics->rl_cnt);
4125 
4126         /* TODO: Something that isn't O(N*M) */
4127         for (i=0; i<rkcg->rkcg_subscribed_topics->rl_cnt; i++) {
4128                 int j;
4129                 const char *topic = ((rd_kafka_topic_info_t *)
4130                         rkcg->rkcg_subscribed_topics->rl_elems[i])->topic;
4131 
4132                 for (j=0; j<subscription->cnt; j++) {
4133                         const char *pattern = subscription->elems[j].topic;
4134                         if (rd_kafka_topic_match(rkcg->rkcg_rk,
4135                                                  pattern,
4136                                                  topic)) {
4137                                 break;
4138                         }
4139                 }
4140 
4141                 if (j == subscription->cnt)
4142                         rd_kafka_topic_partition_list_add(
4143                                 result, topic,
4144                                 RD_KAFKA_PARTITION_UA);
4145         }
4146 
4147         if (result->cnt == 0) {
4148                 rd_kafka_topic_partition_list_destroy(result);
4149                 return NULL;
4150         }
4151 
4152         return result;
4153 }
4154 
4155 
4156 /**
4157  * @brief Determine the partitions to revoke, given the topics being
4158  *        unassigned.
4159  */
4160 static rd_kafka_topic_partition_list_t *
rd_kafka_cgrp_calculate_subscribe_revoking_partitions(rd_kafka_cgrp_t * rkcg,const rd_kafka_topic_partition_list_t * unsubscribing)4161 rd_kafka_cgrp_calculate_subscribe_revoking_partitions(
4162                 rd_kafka_cgrp_t *rkcg,
4163                 const rd_kafka_topic_partition_list_t *unsubscribing) {
4164         rd_kafka_topic_partition_list_t *revoking;
4165         const rd_kafka_topic_partition_t *rktpar;
4166 
4167         if (!unsubscribing)
4168                 return NULL;
4169 
4170         if (!rkcg->rkcg_group_assignment ||
4171             rkcg->rkcg_group_assignment->cnt == 0)
4172                 return NULL;
4173 
4174         revoking = rd_kafka_topic_partition_list_new(
4175                 rkcg->rkcg_group_assignment->cnt);
4176 
4177         /* TODO: Something that isn't O(N*M). */
4178         RD_KAFKA_TPLIST_FOREACH(rktpar, unsubscribing) {
4179                 const rd_kafka_topic_partition_t *assigned;
4180 
4181                 RD_KAFKA_TPLIST_FOREACH(assigned, rkcg->rkcg_group_assignment) {
4182                         if (!strcmp(assigned->topic, rktpar->topic)) {
4183                                 rd_kafka_topic_partition_list_add(
4184                                         revoking,
4185                                         assigned->topic,
4186                                         assigned->partition);
4187                                 continue;
4188                         }
4189                 }
4190         }
4191 
4192         if (revoking->cnt == 0) {
4193                 rd_kafka_topic_partition_list_destroy(revoking);
4194                 revoking = NULL;
4195         }
4196 
4197         return revoking;
4198 }
4199 
4200 
/**
 * @brief Handle a new subscription that is modifying an existing subscription
 *        in the COOPERATIVE case.
 *
 * Computes which previously subscribed topics fall out of the new
 * subscription, revokes (incrementally) any assigned partitions of those
 * topics, and rejoins the group.
 *
 * @param rktparlist The new subscription (may contain regex patterns).
 *
 * @remark Assumes ownership of \p rktparlist.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR (currently the only return value).
 */
static rd_kafka_resp_err_t
rd_kafka_cgrp_modify_subscription (rd_kafka_cgrp_t *rkcg,
                                   rd_kafka_topic_partition_list_t
                                   *rktparlist) {
        rd_kafka_topic_partition_list_t *unsubscribing_topics;
        rd_kafka_topic_partition_list_t *revoking;
        rd_list_t *tinfos;
        rd_kafka_topic_partition_list_t *errored;
        int metadata_age;
        int old_cnt = rkcg->rkcg_subscription->cnt;

        /* Recompute the wildcard flag from the new subscription. */
        rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;

        if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
                rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;

        /* Topics in rkcg_subscribed_topics that don't match any pattern in
           the new subscription. */
        unsubscribing_topics = rd_kafka_cgrp_get_unsubscribing_topics(
                                        rkcg, rktparlist);

        /* Currently assigned topic partitions that are no longer desired. */
        revoking = rd_kafka_cgrp_calculate_subscribe_revoking_partitions(
                                        rkcg, unsubscribing_topics);

        /* Replace the current subscription; ownership of rktparlist is
         * taken here. */
        rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
        rkcg->rkcg_subscription = rktparlist;

        if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
                                           "modify subscription") == 1) {
                /* Metadata is stale and a refresh was initiated
                 * (return value 1 -- presumably "refresh in progress",
                 * TODO confirm against rd_kafka_cgrp_metadata_refresh()):
                 * postpone the revoke/join until it completes. */
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER,
                             "MODSUB",
                             "Group \"%.*s\": postponing join until "
                             "up-to-date metadata is available",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));

                rd_assert(rkcg->rkcg_join_state ==
                          RD_KAFKA_CGRP_JOIN_STATE_INIT ||
                          /* Possible via rd_kafka_cgrp_modify_subscription */
                          rkcg->rkcg_join_state ==
                          RD_KAFKA_CGRP_JOIN_STATE_STEADY);

                rd_kafka_cgrp_set_join_state(
                        rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);


                /* Revoke/join will occur after metadata refresh completes */
                if (revoking)
                        rd_kafka_topic_partition_list_destroy(revoking);
                if (unsubscribing_topics)
                        rd_kafka_topic_partition_list_destroy(
                                unsubscribing_topics);

                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE",
                     "Group \"%.*s\": modifying subscription of size %d to "
                     "new subscription of size %d, removing %d topic(s), "
                     "revoking %d partition(s) (join-state %s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     old_cnt, rkcg->rkcg_subscription->cnt,
                     unsubscribing_topics ?
                     unsubscribing_topics->cnt : 0,
                     revoking ? revoking->cnt : 0,
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

        if (unsubscribing_topics)
                rd_kafka_topic_partition_list_destroy(unsubscribing_topics);

        /* Create a list of the topics in metadata that matches the new
         * subscription */
        tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
                             (void *)rd_kafka_topic_info_destroy);

        /* Unmatched topics will be added to the errored list. */
        errored = rd_kafka_topic_partition_list_new(0);

        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
                rd_kafka_metadata_topic_match(rkcg->rkcg_rk,
                                              tinfos, rkcg->rkcg_subscription,
                                              errored);
        else
                rd_kafka_metadata_topic_filter(rkcg->rkcg_rk,
                                               tinfos,
                                               rkcg->rkcg_subscription,
                                               errored);

        /* Propagate consumer errors for any non-existent or errored topics.
         * The function takes ownership of errored. */
        rd_kafka_propagate_consumer_topic_errors(
                rkcg, errored, "Subscribed topic not available");

        /* NOTE(review): tinfos ownership appears to pass to
         * rd_kafka_cgrp_update_subscribed_topics() -- confirm; it is not
         * destroyed in this function. */
        if (rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos) &&
            !revoking) {
                /* Subscribed topic set changed but nothing to revoke:
                 * rejoin directly. */
                rd_kafka_cgrp_rejoin(rkcg, "Subscription modified");
                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        if (revoking) {
                rd_kafka_dbg(rkcg->rkcg_rk,
                             CONSUMER|RD_KAFKA_DBG_CGRP,
                             "REBALANCE", "Group \"%.*s\" revoking "
                             "%d of %d partition(s)",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             revoking->cnt,
                             rkcg->rkcg_group_assignment->cnt);

                /* Incrementally revoke the no-longer-subscribed partitions;
                 * the rejoin will follow once the revoke is done. */
                rd_kafka_rebalance_op_incr(rkcg,
                        RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                        revoking, rd_true/*rejoin*/, "subscribe");

                rd_kafka_topic_partition_list_destroy(revoking);
        }

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
4324 
4325 
/**
 * Remove existing topic subscription.
 *
 * Destroys the current subscription (if any), clears the cached
 * subscribed-topic list, resets leader duties and triggers revocation
 * of the current assignment.
 *
 * @param leave_group If true (and the group has been joined) the cgrp is
 *        flagged to leave the group once all partitions have been
 *        unassigned.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR (currently the only return value).
 */
static rd_kafka_resp_err_t
rd_kafka_cgrp_unsubscribe (rd_kafka_cgrp_t *rkcg, rd_bool_t leave_group) {

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNSUBSCRIBE",
                     "Group \"%.*s\": unsubscribe from current %ssubscription "
                     "of size %d (leave group=%s, has joined=%s, %s, "
                     "join-state %s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rkcg->rkcg_subscription ? "" : "unset ",
                     rkcg->rkcg_subscription ? rkcg->rkcg_subscription->cnt : 0,
                     RD_STR_ToF(leave_group),
                     RD_STR_ToF(RD_KAFKA_CGRP_HAS_JOINED(rkcg)),
                     rkcg->rkcg_member_id ?
                     rkcg->rkcg_member_id->str : "n/a",
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

        /* Stop the max.poll.interval.ms timer. */
        rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
                            &rkcg->rkcg_max_poll_interval_tmr, 1/*lock*/);

        if (rkcg->rkcg_subscription) {
                rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
                rkcg->rkcg_subscription = NULL;
        }

        /* Clear the cached list of subscribed (matched) topics. */
        rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL);

        /*
         * Clean-up group leader duties, if any.
         */
        rd_kafka_cgrp_group_leader_reset(rkcg, "unsubscribe");

        /* Defer the actual group leave until the unassign has completed. */
        if (leave_group && RD_KAFKA_CGRP_HAS_JOINED(rkcg))
                rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE;

        /* FIXME: Why are we only revoking if !assignment_lost ? */
        if (!rd_kafka_cgrp_assignment_is_lost(rkcg))
                rd_kafka_cgrp_revoke_all_rejoin(rkcg,
                                                rd_false/*not lost*/,
                                                rd_true/*initiating*/,
                                                "unsubscribe");

        rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION |
                              RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
4375 
4376 
/**
 * Set new atomic topic subscription.
 *
 * @param rktparlist New subscription, or NULL to unsubscribe. Ownership
 *        is taken on success (RD_KAFKA_RESP_ERR_NO_ERROR); on error the
 *        caller retains ownership.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
 *          RD_KAFKA_RESP_ERR__INVALID_ARG if no assignors are enabled, or
 *          RD_KAFKA_RESP_ERR__FATAL if a fatal error has been raised.
 */
static rd_kafka_resp_err_t
rd_kafka_cgrp_subscribe (rd_kafka_cgrp_t *rkcg,
                         rd_kafka_topic_partition_list_t *rktparlist) {

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE",
                     "Group \"%.*s\": subscribe to new %ssubscription "
                     "of %d topics (join-state %s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rktparlist ? "" : "unset ",
                     rktparlist ? rktparlist->cnt : 0,
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

        /* Subscribing requires at least one enabled partition assignor. */
        if (rkcg->rkcg_rk->rk_conf.enabled_assignor_cnt == 0)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        /* If the consumer has raised a fatal error treat all subscribes as
           unsubscribe */
        if (rd_kafka_fatal_error_code(rkcg->rkcg_rk)) {
                if (rkcg->rkcg_subscription)
                        rd_kafka_cgrp_unsubscribe(rkcg,
                                                  rd_true/*leave group*/);
                return RD_KAFKA_RESP_ERR__FATAL;
        }

        /* Clear any existing postponed subscribe. */
        if (rkcg->rkcg_next_subscription)
                rd_kafka_topic_partition_list_destroy_free(
                        rkcg->rkcg_next_subscription);
        rkcg->rkcg_next_subscription = NULL;
        rkcg->rkcg_next_unsubscribe = rd_false;

        if (RD_KAFKA_CGRP_REBALANCING(rkcg)) {
                /* A rebalance is in progress: stash the new subscription
                 * (or the unsubscribe) to be applied when it completes. */
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER,
                             "SUBSCRIBE", "Group \"%.*s\": postponing "
                             "subscribe until previous rebalance "
                             "completes (join-state %s)",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             rd_kafka_cgrp_join_state_names[
                                     rkcg->rkcg_join_state]);

                if (!rktparlist)
                        rkcg->rkcg_next_unsubscribe = rd_true;
                else
                        rkcg->rkcg_next_subscription = rktparlist;

                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        /* COOPERATIVE protocol with an existing subscription: modify it
         * incrementally instead of revoke-all + resubscribe. */
        if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
            RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE &&
            rktparlist &&
            rkcg->rkcg_subscription)
                return rd_kafka_cgrp_modify_subscription(rkcg, rktparlist);

        /* Remove existing subscription first */
        if (rkcg->rkcg_subscription)
                rd_kafka_cgrp_unsubscribe(
                        rkcg,
                        rktparlist ?
                        rd_false/* don't leave group if new subscription */ :
                        rd_true/* leave group if no new subscription */);

        if (!rktparlist)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION;

        if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
                rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;

        /* Ownership of rktparlist is taken here. */
        rkcg->rkcg_subscription = rktparlist;

        rd_kafka_cgrp_join(rkcg);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
4456 
4457 
4458 
4459 
4460 
4461 
/**
 * Same as cgrp_terminate() but called from the cgrp/main thread upon receiving
 * the op 'rko' from cgrp_terminate().
 *
 * Marks the group for termination, unsubscribes/unassigns as needed, and
 * attempts immediate termination if all preconditions are already met.
 *
 * NOTE: Takes ownership of 'rko'
 *
 * Locality: main thread
 */
void
rd_kafka_cgrp_terminate0 (rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko) {

        rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
                     "Terminating group \"%.*s\" in state %s "
                     "with %d partition(s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                     rd_list_cnt(&rkcg->rkcg_toppars));

        if (unlikely(rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM ||
                     (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) ||
                     rkcg->rkcg_reply_rko != NULL)) {
                /* Already terminating or handling a previous terminate */
                if (rko) {
                        /* Fail this duplicate terminate request and
                         * dispose of its op. */
                        rd_kafka_q_t *rkq = rko->rko_replyq.q;
                        rko->rko_replyq.q = NULL;
                        rd_kafka_consumer_err(rkq, RD_KAFKA_NODEID_UA,
                                              RD_KAFKA_RESP_ERR__IN_PROGRESS,
                                              rko->rko_replyq.version,
                                              NULL, NULL,
                                              RD_KAFKA_OFFSET_INVALID,
                                              "Group is %s",
                                              rkcg->rkcg_reply_rko ?
                                              "terminating":"terminated");
                        rd_kafka_q_destroy(rkq);
                        rd_kafka_op_destroy(rko);
                }
                return;
        }

        /* Mark for stopping, the actual state transition
         * is performed when all toppars have left. */
        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_TERMINATE;
        rkcg->rkcg_ts_terminate = rd_clock();
        /* The reply op is retained until termination completes. */
        rkcg->rkcg_reply_rko = rko;

        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION)
                rd_kafka_cgrp_unsubscribe(
                        rkcg,
                        /* Leave group if this is a controlled shutdown */
                        !rd_kafka_destroy_flags_no_consumer_close(
                                rkcg->rkcg_rk));

        /* Reset the wait-for-LeaveGroup flag if there is an outstanding
         * LeaveGroupRequest being waited on (from a prior unsubscribe), but
         * the destroy flags have NO_CONSUMER_CLOSE set, which calls
         * for immediate termination. */
        if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
                rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE;

        /* If there's an outstanding rebalance which has not yet been
         * served by the application it will be served from consumer_close().
         * If the instance is being terminated with NO_CONSUMER_CLOSE we
         * trigger unassign directly to avoid stalling on rebalance callback
         * queues that are no longer served by the application. */
        if (!RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) ||
            rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
                rd_kafka_cgrp_unassign(rkcg);

        /* Serve assignment so it can start to decommission */
        rd_kafka_assignment_serve(rkcg->rkcg_rk);

        /* Try to terminate right away if all preconditions are met. */
        rd_kafka_cgrp_try_terminate(rkcg);
}
4538 
4539 
/**
 * Terminate and decommission a cgrp asynchronously.
 *
 * Enqueues an OP_TERMINATE op on the cgrp op queue; the actual teardown
 * is performed by rd_kafka_cgrp_terminate0() in the main thread.
 *
 * Locality: any thread (must NOT be the rdkafka main thread)
 */
void rd_kafka_cgrp_terminate (rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq) {
        /* Calling from the main thread would be pointless/deadlock-prone:
         * the op is served by that very thread. */
        rd_kafka_assert(NULL, !thrd_is_current(rkcg->rkcg_rk->rk_thread));
        rd_kafka_cgrp_op(rkcg, NULL, replyq, RD_KAFKA_OP_TERMINATE, 0);
}
4549 
4550 
/**
 * @brief State shared with rd_kafka_op_offset_commit_timeout_check()
 *        while scanning a queue for timed-out OFFSET_COMMIT ops.
 */
struct _op_timeout_offset_commit {
        rd_ts_t now;       /**< Scan time: ops with 0 < ts_timeout <= now
                            *   are considered expired. */
        rd_kafka_t *rk;    /**< Client instance. */
        rd_list_t expired; /**< Accumulated expired ops (rd_kafka_op_t *). */
};
4556 
4557 /**
4558  * q_filter callback for expiring OFFSET_COMMIT timeouts.
4559  */
rd_kafka_op_offset_commit_timeout_check(rd_kafka_q_t * rkq,rd_kafka_op_t * rko,void * opaque)4560 static int rd_kafka_op_offset_commit_timeout_check (rd_kafka_q_t *rkq,
4561                                                     rd_kafka_op_t *rko,
4562                                                     void *opaque) {
4563         struct _op_timeout_offset_commit *state =
4564                 (struct _op_timeout_offset_commit*)opaque;
4565 
4566         if (likely(rko->rko_type != RD_KAFKA_OP_OFFSET_COMMIT ||
4567                    rko->rko_u.offset_commit.ts_timeout == 0 ||
4568                    rko->rko_u.offset_commit.ts_timeout > state->now)) {
4569                 return 0;
4570         }
4571 
4572         rd_kafka_q_deq0(rkq, rko);
4573 
4574         /* Add to temporary list to avoid recursive
4575          * locking of rkcg_wait_coord_q. */
4576         rd_list_add(&state->expired, rko);
4577         return 1;
4578 }
4579 
4580 
4581 /**
4582  * Scan for various timeouts.
4583  */
rd_kafka_cgrp_timeout_scan(rd_kafka_cgrp_t * rkcg,rd_ts_t now)4584 static void rd_kafka_cgrp_timeout_scan (rd_kafka_cgrp_t *rkcg, rd_ts_t now) {
4585         struct _op_timeout_offset_commit ofc_state;
4586         int i, cnt = 0;
4587         rd_kafka_op_t *rko;
4588 
4589         ofc_state.now = now;
4590         ofc_state.rk = rkcg->rkcg_rk;
4591         rd_list_init(&ofc_state.expired, 0, NULL);
4592 
4593         cnt += rd_kafka_q_apply(rkcg->rkcg_wait_coord_q,
4594                                 rd_kafka_op_offset_commit_timeout_check,
4595                                 &ofc_state);
4596 
4597         RD_LIST_FOREACH(rko, &ofc_state.expired, i)
4598                 rd_kafka_cgrp_op_handle_OffsetCommit(
4599                         rkcg->rkcg_rk, NULL,
4600                         RD_KAFKA_RESP_ERR__WAIT_COORD,
4601                         NULL, NULL, rko);
4602 
4603         rd_list_destroy(&ofc_state.expired);
4604 
4605         if (cnt > 0)
4606                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTIMEOUT",
4607                              "Group \"%.*s\": timed out %d op(s), %d remain",
4608                              RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), cnt,
4609                              rd_kafka_q_len(rkcg->rkcg_wait_coord_q));
4610 
4611 
4612 }
4613 
4614 
/**
 * @brief Handle an assign op.
 *
 * Validates that the requested assign method matches the group's rebalance
 * protocol (incremental_* for COOPERATIVE, plain assign() for EAGER),
 * performs the (un)assignment, and replies to the op with the resulting
 * error (if any).
 *
 * @locality rdkafka main thread
 * @locks none
 */
static void rd_kafka_cgrp_handle_assign_op (rd_kafka_cgrp_t *rkcg,
                                            rd_kafka_op_t *rko) {
        rd_kafka_error_t *error = NULL;

        /* COOPERATIVE protocol only permits the incremental methods. */
        if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
            RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE &&
            !(rko->rko_u.assign.method == RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN ||
              rko->rko_u.assign.method == RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN))
                error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE,
                                           "Changes to the current assignment "
                                           "must be made using "
                                           "incremental_assign() or "
                                           "incremental_unassign() "
                                           "when rebalance protocol type is "
                                           "COOPERATIVE");

        /* EAGER protocol only permits plain assign(). */
        else if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
                 RD_KAFKA_REBALANCE_PROTOCOL_EAGER &&
                 !(rko->rko_u.assign.method == RD_KAFKA_ASSIGN_METHOD_ASSIGN))
                error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE,
                                           "Changes to the current assignment "
                                           "must be made using "
                                           "assign() when rebalance "
                                           "protocol type is EAGER");

        else if (rd_kafka_fatal_error_code(rkcg->rkcg_rk) ||
                 rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) {
                /* Treat all assignments as unassign when a fatal error is
                 * raised or the cgrp is terminating. */

                rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_CONSUMER,
                             "ASSIGN", "Group \"%s\": Consumer %s: "
                             "treating assign as unassign",
                             rkcg->rkcg_group_id->str,
                             rd_kafka_fatal_error_code(rkcg->rkcg_rk) ?
                             "has raised a fatal error" : "is terminating");

                /* Rewrite the op in place into a plain unassign
                 * (ASSIGN method with NULL partitions). */
                if (rko->rko_u.assign.partitions) {
                        rd_kafka_topic_partition_list_destroy(
                                rko->rko_u.assign.partitions);
                        rko->rko_u.assign.partitions = NULL;
                }
                rko->rko_u.assign.method = RD_KAFKA_ASSIGN_METHOD_ASSIGN;
        }

        if (!error) {
                switch (rko->rko_u.assign.method)
                {
                case RD_KAFKA_ASSIGN_METHOD_ASSIGN:
                        /* New atomic assignment (partitions != NULL),
                         * or unassignment (partitions == NULL) */
                        if (rko->rko_u.assign.partitions)
                                error = rd_kafka_cgrp_assign(
                                        rkcg,
                                        rko->rko_u.assign.partitions);
                        else
                                error = rd_kafka_cgrp_unassign(rkcg);
                        break;
                case RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN:
                        error = rd_kafka_cgrp_incremental_assign(
                                        rkcg,
                                        rko->rko_u.assign.partitions);
                        break;
                case RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN:
                        error = rd_kafka_cgrp_incremental_unassign(
                                        rkcg,
                                        rko->rko_u.assign.partitions);
                        break;
                default:
                        RD_NOTREACHED();
                        break;
                }

                /* If call succeeded serve the assignment */
                if (!error)
                        rd_kafka_assignment_serve(rkcg->rkcg_rk);


        }

        if (error) {
                /* Log error since caller might not check
                 * *assign() return value. */
                rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "ASSIGN",
                             "Group \"%s\": application *assign() call "
                             "failed: %s",
                             rkcg->rkcg_group_id->str,
                             rd_kafka_error_string(error));
        }

        /* Reply to the op; presumably takes ownership of error
         * (it is not destroyed here) -- confirm against
         * rd_kafka_op_error_reply(). */
        rd_kafka_op_error_reply(rko, error);
}
4712 
4713 
4714 /**
4715  * @brief Handle cgrp queue op.
4716  * @locality rdkafka main thread
4717  * @locks none
4718  */
4719 static rd_kafka_op_res_t
rd_kafka_cgrp_op_serve(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko,rd_kafka_q_cb_type_t cb_type,void * opaque)4720 rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq,
4721                         rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type,
4722                         void *opaque) {
4723         rd_kafka_cgrp_t *rkcg = opaque;
4724         rd_kafka_toppar_t *rktp;
4725         rd_kafka_resp_err_t err;
4726         const int silent_op = rko->rko_type == RD_KAFKA_OP_RECV_BUF;
4727 
4728         rktp = rko->rko_rktp;
4729 
4730         if (rktp && !silent_op)
4731                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPOP",
4732                              "Group \"%.*s\" received op %s in state %s "
4733                              "(join-state %s) for %.*s [%"PRId32"]",
4734                              RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
4735                              rd_kafka_op2str(rko->rko_type),
4736                              rd_kafka_cgrp_state_names[rkcg->rkcg_state],
4737                              rd_kafka_cgrp_join_state_names[rkcg->
4738                                                             rkcg_join_state],
4739                              RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4740                              rktp->rktp_partition);
4741         else if (!silent_op)
4742                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPOP",
4743                              "Group \"%.*s\" received op %s in state %s "
4744                              "(join-state %s)",
4745                              RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
4746                              rd_kafka_op2str(rko->rko_type),
4747                              rd_kafka_cgrp_state_names[rkcg->rkcg_state],
4748                              rd_kafka_cgrp_join_state_names[rkcg->
4749                                                             rkcg_join_state]);
4750 
4751         switch ((int)rko->rko_type)
4752         {
4753         case RD_KAFKA_OP_NAME:
4754                 /* Return the currently assigned member id. */
4755                 if (rkcg->rkcg_member_id)
4756                         rko->rko_u.name.str =
4757                                 RD_KAFKAP_STR_DUP(rkcg->rkcg_member_id);
4758                 rd_kafka_op_reply(rko, 0);
4759                 rko = NULL;
4760                 break;
4761 
4762         case RD_KAFKA_OP_CG_METADATA:
4763                 /* Return the current consumer group metadata. */
4764                 rko->rko_u.cg_metadata = rkcg->rkcg_member_id
4765                         ? rd_kafka_consumer_group_metadata_new_with_genid(
4766                                 rkcg->rkcg_rk->rk_conf.group_id_str,
4767                                 rkcg->rkcg_generation_id,
4768                                 rkcg->rkcg_member_id->str,
4769                                 rkcg->rkcg_rk->rk_conf.group_instance_id)
4770                         : NULL;
4771                 rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
4772                 rko = NULL;
4773                 break;
4774 
4775         case RD_KAFKA_OP_OFFSET_FETCH:
4776                 if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
4777                     (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)) {
4778                         rd_kafka_op_handle_OffsetFetch(
4779                                 rkcg->rkcg_rk, NULL,
4780                                 RD_KAFKA_RESP_ERR__WAIT_COORD,
4781                                 NULL, NULL, rko);
4782                         rko = NULL; /* rko freed by handler */
4783                         break;
4784                 }
4785 
4786                 rd_kafka_OffsetFetchRequest(
4787                         rkcg->rkcg_coord,
4788                         rko->rko_u.offset_fetch.partitions,
4789                         rko->rko_u.offset_fetch.require_stable,
4790                         RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
4791                         rd_kafka_op_handle_OffsetFetch, rko);
4792                 rko = NULL; /* rko now owned by request */
4793                 break;
4794 
4795         case RD_KAFKA_OP_PARTITION_JOIN:
4796                 rd_kafka_cgrp_partition_add(rkcg, rktp);
4797 
4798                 /* If terminating tell the partition to leave */
4799                 if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
4800                         rd_kafka_toppar_op_fetch_stop(
4801                                 rktp, RD_KAFKA_NO_REPLYQ);
4802                 break;
4803 
4804         case RD_KAFKA_OP_PARTITION_LEAVE:
4805                 rd_kafka_cgrp_partition_del(rkcg, rktp);
4806                 break;
4807 
4808         case RD_KAFKA_OP_OFFSET_COMMIT:
4809                 /* Trigger offsets commit. */
4810                 rd_kafka_cgrp_offsets_commit(rkcg, rko,
4811                                              /* only set offsets
4812                                               * if no partitions were
4813                                               * specified. */
4814                                              rko->rko_u.offset_commit.
4815                                              partitions ?
4816                                              0 : 1 /* set_offsets*/,
4817                                              rko->rko_u.offset_commit.reason);
4818                 rko = NULL; /* rko now owned by request */
4819                 break;
4820 
4821         case RD_KAFKA_OP_COORD_QUERY:
4822                 rd_kafka_cgrp_coord_query(rkcg,
4823                                           rko->rko_err ?
4824                                           rd_kafka_err2str(rko->
4825                                                            rko_err):
4826                                           "from op");
4827                 break;
4828 
4829         case RD_KAFKA_OP_SUBSCRIBE:
4830                 rd_kafka_app_polled(rk);
4831 
4832                 /* New atomic subscription (may be NULL) */
4833                 err = rd_kafka_cgrp_subscribe(
4834                         rkcg, rko->rko_u.subscribe.topics);
4835 
4836                 if (!err) /* now owned by rkcg */
4837                         rko->rko_u.subscribe.topics = NULL;
4838 
4839                 rd_kafka_op_reply(rko, err);
4840                 rko = NULL;
4841                 break;
4842 
4843         case RD_KAFKA_OP_ASSIGN:
4844                 rd_kafka_cgrp_handle_assign_op(rkcg, rko);
4845                 rko = NULL;
4846                 break;
4847 
4848         case RD_KAFKA_OP_GET_SUBSCRIPTION:
4849                 if (rkcg->rkcg_next_subscription)
4850                         rko->rko_u.subscribe.topics =
4851                                 rd_kafka_topic_partition_list_copy(
4852                                         rkcg->rkcg_next_subscription);
4853                 else if (rkcg->rkcg_next_unsubscribe)
4854                         rko->rko_u.subscribe.topics = NULL;
4855                 else if (rkcg->rkcg_subscription)
4856                         rko->rko_u.subscribe.topics =
4857                                 rd_kafka_topic_partition_list_copy(
4858                                         rkcg->rkcg_subscription);
4859                 rd_kafka_op_reply(rko, 0);
4860                 rko = NULL;
4861                 break;
4862 
4863         case RD_KAFKA_OP_GET_ASSIGNMENT:
4864                 /* This is the consumer assignment, not the group assignment. */
4865                 rko->rko_u.assign.partitions =
4866                         rd_kafka_topic_partition_list_copy(
4867                                 rkcg->rkcg_rk->rk_consumer.assignment.all);
4868 
4869                 rd_kafka_op_reply(rko, 0);
4870                 rko = NULL;
4871                 break;
4872 
4873         case RD_KAFKA_OP_GET_REBALANCE_PROTOCOL:
4874                 rko->rko_u.rebalance_protocol.str =
4875                         rd_kafka_rebalance_protocol2str(
4876                                 rd_kafka_cgrp_rebalance_protocol(rkcg));
4877                 rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
4878                 rko = NULL;
4879                 break;
4880 
4881         case RD_KAFKA_OP_TERMINATE:
4882                 rd_kafka_cgrp_terminate0(rkcg, rko);
4883                 rko = NULL; /* terminate0() takes ownership */
4884                 break;
4885 
4886         default:
4887                 rd_kafka_assert(rkcg->rkcg_rk, !*"unknown type");
4888                 break;
4889         }
4890 
4891         if (rko)
4892                 rd_kafka_op_destroy(rko);
4893 
4894         return RD_KAFKA_OP_RES_HANDLED;
4895 }
4896 
4897 
4898 /**
4899  * @returns true if the session timeout has expired (due to no successful
4900  *          Heartbeats in session.timeout.ms) and triggers a rebalance.
4901  */
4902 static rd_bool_t
rd_kafka_cgrp_session_timeout_check(rd_kafka_cgrp_t * rkcg,rd_ts_t now)4903 rd_kafka_cgrp_session_timeout_check (rd_kafka_cgrp_t *rkcg, rd_ts_t now) {
4904         rd_ts_t delta;
4905         char buf[256];
4906 
4907         if (unlikely(!rkcg->rkcg_ts_session_timeout))
4908                 return rd_true; /* Session has expired */
4909 
4910         delta = now - rkcg->rkcg_ts_session_timeout;
4911         if (likely(delta < 0))
4912                 return rd_false;
4913 
4914         delta += rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000;
4915 
4916         rd_snprintf(buf, sizeof(buf),
4917                     "Consumer group session timed out (in join-state %s) after "
4918                     "%"PRId64" ms without a successful response from the "
4919                     "group coordinator (broker %"PRId32", last error was %s)",
4920                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
4921                     delta/1000, rkcg->rkcg_coord_id,
4922                     rd_kafka_err2str(rkcg->rkcg_last_heartbeat_err));
4923 
4924         rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR;
4925 
4926         rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "SESSTMOUT",
4927                      "%s: revoking assignment and rejoining group", buf);
4928 
4929         /* Prevent further rebalances */
4930         rkcg->rkcg_ts_session_timeout = 0;
4931 
4932         /* Timing out invalidates the member id, reset it
4933          * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */
4934         rd_kafka_cgrp_set_member_id(rkcg, "");
4935 
4936         /* Revoke and rebalance */
4937         rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg,
4938                                               rd_true/*lost*/,
4939                                               rd_true/*initiating*/,
4940                                               buf);
4941 
4942         return rd_true;
4943 }
4944 
4945 
4946 /**
4947  * @brief Apply the next waiting subscribe/unsubscribe, if any.
4948  */
rd_kafka_cgrp_apply_next_subscribe(rd_kafka_cgrp_t * rkcg)4949 static void rd_kafka_cgrp_apply_next_subscribe (rd_kafka_cgrp_t *rkcg) {
4950         rd_assert(rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT);
4951 
4952         if (rkcg->rkcg_next_subscription) {
4953                 rd_kafka_topic_partition_list_t *next_subscription =
4954                         rkcg->rkcg_next_subscription;
4955                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIBE",
4956                              "Group \"%s\": invoking waiting postponed "
4957                              "subscribe", rkcg->rkcg_group_id->str);
4958                 rkcg->rkcg_next_subscription = NULL;
4959                 rd_kafka_cgrp_subscribe(rkcg, next_subscription);
4960 
4961         } else if (rkcg->rkcg_next_unsubscribe) {
4962                 rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIBE",
4963                              "Group \"%s\": invoking waiting postponed "
4964                              "unsubscribe", rkcg->rkcg_group_id->str);
4965                 rkcg->rkcg_next_unsubscribe = rd_false;
4966                 rd_kafka_cgrp_unsubscribe(rkcg, rd_true/*Leave*/);
4967         }
4968 }
4969 
4970 /**
4971  * Client group's join state handling
4972  */
rd_kafka_cgrp_join_state_serve(rd_kafka_cgrp_t * rkcg)4973 static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg) {
4974         rd_ts_t now = rd_clock();
4975 
4976         if (unlikely(rd_kafka_fatal_error_code(rkcg->rkcg_rk)))
4977                 return;
4978 
4979         switch (rkcg->rkcg_join_state)
4980         {
4981         case RD_KAFKA_CGRP_JOIN_STATE_INIT:
4982                 /* If there is a next subscription, apply it.  */
4983                 rd_kafka_cgrp_apply_next_subscribe(rkcg);
4984 
4985                 /* If we have a subscription start the join process. */
4986                 if (!rkcg->rkcg_subscription)
4987                         break;
4988 
4989                 if (rd_interval_immediate(&rkcg->rkcg_join_intvl,
4990 					  1000*1000, now) > 0)
4991                         rd_kafka_cgrp_join(rkcg);
4992                 break;
4993 
4994         case RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN:
4995         case RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA:
4996         case RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC:
4997         case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE:
4998                 /* FIXME: I think we might have to send heartbeats in
4999                  *        in WAIT_INCR_UNASSIGN, yes-no? */
5000         case RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE:
5001 		break;
5002 
5003         case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
5004         case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL:
5005         case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL:
5006                 if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION &&
5007                     rd_interval(&rkcg->rkcg_heartbeat_intvl,
5008                                 rkcg->rkcg_rk->rk_conf.
5009                                 group_heartbeat_intvl_ms * 1000, now) > 0)
5010                         rd_kafka_cgrp_heartbeat(rkcg);
5011                 break;
5012         }
5013 
5014 }
5015 /**
5016  * Client group handling.
5017  * Called from main thread to serve the operational aspects of a cgrp.
5018  */
rd_kafka_cgrp_serve(rd_kafka_cgrp_t * rkcg)5019 void rd_kafka_cgrp_serve (rd_kafka_cgrp_t *rkcg) {
5020 	rd_kafka_broker_t *rkb = rkcg->rkcg_coord;
5021 	int rkb_state = RD_KAFKA_BROKER_STATE_INIT;
5022         rd_ts_t now;
5023 
5024 	if (rkb) {
5025 		rd_kafka_broker_lock(rkb);
5026 		rkb_state = rkb->rkb_state;
5027 		rd_kafka_broker_unlock(rkb);
5028 
5029 		/* Go back to querying state if we lost the current coordinator
5030 		 * connection. */
5031 		if (rkb_state < RD_KAFKA_BROKER_STATE_UP &&
5032 		    rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP)
5033 			rd_kafka_cgrp_set_state(rkcg,
5034 						RD_KAFKA_CGRP_STATE_QUERY_COORD);
5035 	}
5036 
5037         now = rd_clock();
5038 
5039 	/* Check for cgrp termination */
5040 	if (unlikely(rd_kafka_cgrp_try_terminate(rkcg))) {
5041                 rd_kafka_cgrp_terminated(rkcg);
5042                 return; /* cgrp terminated */
5043         }
5044 
5045         /* Bail out if we're terminating. */
5046         if (unlikely(rd_kafka_terminating(rkcg->rkcg_rk)))
5047                 return;
5048 
5049         /* Check session timeout regardless of current coordinator
5050          * connection state (rkcg_state) */
5051         if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY)
5052                 rd_kafka_cgrp_session_timeout_check(rkcg, now);
5053 
5054  retry:
5055         switch (rkcg->rkcg_state)
5056         {
5057         case RD_KAFKA_CGRP_STATE_TERM:
5058                 break;
5059 
5060         case RD_KAFKA_CGRP_STATE_INIT:
5061                 rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
5062                 /* FALLTHRU */
5063 
5064         case RD_KAFKA_CGRP_STATE_QUERY_COORD:
5065                 /* Query for coordinator. */
5066                 if (rd_interval_immediate(&rkcg->rkcg_coord_query_intvl,
5067 					  500*1000, now) > 0)
5068                         rd_kafka_cgrp_coord_query(rkcg,
5069                                                   "intervaled in "
5070                                                   "state query-coord");
5071                 break;
5072 
5073         case RD_KAFKA_CGRP_STATE_WAIT_COORD:
5074                 /* Waiting for FindCoordinator response */
5075                 break;
5076 
5077         case RD_KAFKA_CGRP_STATE_WAIT_BROKER:
5078                 /* See if the group should be reassigned to another broker. */
5079                 if (rd_kafka_cgrp_coord_update(rkcg, rkcg->rkcg_coord_id))
5080                         goto retry; /* Coordinator changed, retry state-machine
5081                                      * to speed up next transition. */
5082 
5083                 /* Coordinator query */
5084                 if (rd_interval(&rkcg->rkcg_coord_query_intvl,
5085 				1000*1000, now) > 0)
5086                         rd_kafka_cgrp_coord_query(rkcg,
5087                                                   "intervaled in "
5088                                                   "state wait-broker");
5089                 break;
5090 
5091         case RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT:
5092                 /* Waiting for broker transport to come up.
5093 		 * Also make sure broker supports groups. */
5094                 if (rkb_state < RD_KAFKA_BROKER_STATE_UP || !rkb ||
5095 		    !rd_kafka_broker_supports(
5096 			    rkb, RD_KAFKA_FEATURE_BROKER_GROUP_COORD)) {
5097 			/* Coordinator query */
5098 			if (rd_interval(&rkcg->rkcg_coord_query_intvl,
5099 					1000*1000, now) > 0)
5100 				rd_kafka_cgrp_coord_query(
5101 					rkcg,
5102 					"intervaled in state "
5103 					"wait-broker-transport");
5104 
5105                 } else {
5106                         rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_UP);
5107 
5108                         /* Serve join state to trigger (re)join */
5109                         rd_kafka_cgrp_join_state_serve(rkcg);
5110 
5111                         /* Serve any pending partitions in the assignment */
5112                         rd_kafka_assignment_serve(rkcg->rkcg_rk);
5113                 }
5114                 break;
5115 
5116         case RD_KAFKA_CGRP_STATE_UP:
5117 		/* Move any ops awaiting the coordinator to the ops queue
5118 		 * for reprocessing. */
5119 		rd_kafka_q_concat(rkcg->rkcg_ops, rkcg->rkcg_wait_coord_q);
5120 
5121                 /* Relaxed coordinator queries. */
5122                 if (rd_interval(&rkcg->rkcg_coord_query_intvl,
5123                                 rkcg->rkcg_rk->rk_conf.
5124                                 coord_query_intvl_ms * 1000, now) > 0)
5125                         rd_kafka_cgrp_coord_query(rkcg,
5126                                                   "intervaled in state up");
5127 
5128                 rd_kafka_cgrp_join_state_serve(rkcg);
5129                 break;
5130 
5131         }
5132 
5133         if (unlikely(rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP &&
5134                      rd_interval(&rkcg->rkcg_timeout_scan_intvl,
5135                                  1000*1000, now) > 0))
5136                 rd_kafka_cgrp_timeout_scan(rkcg, now);
5137 }
5138 
5139 
5140 
5141 
5142 
5143 /**
5144  * Send an op to a cgrp.
5145  *
5146  * Locality: any thread
5147  */
rd_kafka_cgrp_op(rd_kafka_cgrp_t * rkcg,rd_kafka_toppar_t * rktp,rd_kafka_replyq_t replyq,rd_kafka_op_type_t type,rd_kafka_resp_err_t err)5148 void rd_kafka_cgrp_op (rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp,
5149                        rd_kafka_replyq_t replyq, rd_kafka_op_type_t type,
5150                        rd_kafka_resp_err_t err) {
5151         rd_kafka_op_t *rko;
5152 
5153         rko = rd_kafka_op_new(type);
5154         rko->rko_err = err;
5155 	rko->rko_replyq = replyq;
5156 
5157 	if (rktp)
5158                 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
5159 
5160         rd_kafka_q_enq(rkcg->rkcg_ops, rko);
5161 }
5162 
5163 
5164 
5165 
5166 
5167 
5168 
rd_kafka_cgrp_set_member_id(rd_kafka_cgrp_t * rkcg,const char * member_id)5169 void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id){
5170         if (rkcg->rkcg_member_id && member_id &&
5171             !rd_kafkap_str_cmp_str(rkcg->rkcg_member_id, member_id))
5172                 return; /* No change */
5173 
5174         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "MEMBERID",
5175                      "Group \"%.*s\": updating member id \"%s\" -> \"%s\"",
5176                      RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
5177                      rkcg->rkcg_member_id ?
5178                      rkcg->rkcg_member_id->str : "(not-set)",
5179                      member_id ? member_id : "(not-set)");
5180 
5181         if (rkcg->rkcg_member_id) {
5182                 rd_kafkap_str_destroy(rkcg->rkcg_member_id);
5183                 rkcg->rkcg_member_id = NULL;
5184         }
5185 
5186         if (member_id)
5187                 rkcg->rkcg_member_id = rd_kafkap_str_new(member_id, -1);
5188 }
5189 
5190 
5191 /**
5192  * @brief Determine owned partitions that no longer exist (partitions in
5193  *        deleted or re-created topics).
5194  */
5195 static rd_kafka_topic_partition_list_t *
rd_kafka_cgrp_owned_but_not_exist_partitions(rd_kafka_cgrp_t * rkcg)5196 rd_kafka_cgrp_owned_but_not_exist_partitions (rd_kafka_cgrp_t *rkcg) {
5197         rd_kafka_topic_partition_list_t *result = NULL;
5198         const rd_kafka_topic_partition_t *curr;
5199 
5200         if (!rkcg->rkcg_group_assignment)
5201                 return NULL;
5202 
5203         RD_KAFKA_TPLIST_FOREACH(curr, rkcg->rkcg_group_assignment) {
5204                 if (rd_list_find(rkcg->rkcg_subscribed_topics,
5205                                  curr->topic, rd_kafka_topic_info_topic_cmp))
5206                         continue;
5207 
5208                 if (!result)
5209                         result = rd_kafka_topic_partition_list_new(
5210                                 rkcg->rkcg_group_assignment->cnt);
5211 
5212                 rd_kafka_topic_partition_list_add0(__FUNCTION__,__LINE__,
5213                                                    result,
5214                                                    curr->topic,
5215                                                    curr->partition,
5216                                                    curr->_private);
5217         }
5218 
5219         return result;
5220 }
5221 
5222 
5223 /**
5224  * @brief Check if the latest metadata affects the current subscription:
5225  * - matched topic added
5226  * - matched topic removed
5227  * - matched topic's partition count change
5228  *
5229  * @locks none
5230  * @locality rdkafka main thread
5231  */
rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t * rkcg,rd_bool_t do_join)5232 void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg,
5233                                           rd_bool_t do_join) {
5234         rd_list_t *tinfos;
5235         rd_kafka_topic_partition_list_t *errored;
5236         rd_bool_t changed;
5237 
5238         rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));
5239 
5240         if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
5241                 return;
5242 
5243         /*
5244          * Unmatched topics will be added to the errored list.
5245          */
5246         errored = rd_kafka_topic_partition_list_new(0);
5247 
5248         /*
5249          * Create a list of the topics in metadata that matches our subscription
5250          */
5251         tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
5252                              (void *)rd_kafka_topic_info_destroy);
5253 
5254         if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
5255                 rd_kafka_metadata_topic_match(rkcg->rkcg_rk,
5256                                               tinfos, rkcg->rkcg_subscription,
5257                                               errored);
5258         else
5259                 rd_kafka_metadata_topic_filter(rkcg->rkcg_rk,
5260                                                tinfos,
5261                                                rkcg->rkcg_subscription,
5262                                                errored);
5263 
5264 
5265         /*
5266          * Propagate consumer errors for any non-existent or errored topics.
5267          * The function takes ownership of errored.
5268          */
5269         rd_kafka_propagate_consumer_topic_errors(
5270                 rkcg, errored, "Subscribed topic not available");
5271 
5272         /*
5273          * Update effective list of topics (takes ownership of \c tinfos)
5274          */
5275         changed = rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos);
5276 
5277         if (!do_join ||
5278             (!changed &&
5279              /* If we get the same effective list of topics as last time around,
5280               * but the join is waiting for this metadata query to complete,
5281               * then we should not return here but follow through with the
5282               * (re)join below. */
5283              rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA))
5284                 return;
5285 
5286         /* List of subscribed topics changed, trigger rejoin. */
5287         rd_kafka_dbg(rkcg->rkcg_rk,
5288                      CGRP|RD_KAFKA_DBG_METADATA|RD_KAFKA_DBG_CONSUMER,
5289                      "REJOIN",
5290                      "Group \"%.*s\": "
5291                      "subscription updated from metadata change: "
5292                      "rejoining group",
5293                      RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
5294 
5295         if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
5296             RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE) {
5297 
5298                 /* Partitions from deleted topics */
5299                 rd_kafka_topic_partition_list_t *owned_but_not_exist =
5300                         rd_kafka_cgrp_owned_but_not_exist_partitions(
5301                                 rkcg);
5302 
5303                 if (owned_but_not_exist) {
5304                         rd_kafka_cgrp_assignment_set_lost(
5305                                 rkcg,
5306                                 "%d subscribed topic(s) no longer exist",
5307                                 owned_but_not_exist->cnt);
5308 
5309                         rd_kafka_rebalance_op_incr(
5310                                 rkcg,
5311                                 RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
5312                                 owned_but_not_exist,
5313                                 rkcg->rkcg_group_leader.members != NULL
5314                                 /* Rejoin group following revoke's
5315                                  * unassign if we are leader */,
5316                                 "topics not available");
5317                         rd_kafka_topic_partition_list_destroy(
5318                                 owned_but_not_exist);
5319 
5320                 } else {
5321                         /* Nothing to revoke, rejoin group if we are the
5322                          * leader.
5323                          * The KIP says to rejoin the group on metadata
5324                          * change only if we're the leader. But what if a
5325                          * non-leader is subscribed to a regex that the others
5326                          * aren't?
5327                          * Going against the KIP and rejoining here. */
5328                         rd_kafka_cgrp_rejoin(
5329                                 rkcg,
5330                                 "Metadata for subscribed topic(s) has "
5331                                 "changed");
5332 
5333                 }
5334 
5335         } else {
5336                 /* EAGER */
5337                 rd_kafka_cgrp_revoke_rejoin(rkcg,
5338                                             "Metadata for subscribed topic(s) "
5339                                             "has changed");
5340         }
5341 
5342         /* We shouldn't get stuck in this state. */
5343         rd_dassert(rkcg->rkcg_join_state !=
5344                    RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);
5345 }
5346 
5347 
rd_kafka_cgrp_handle_SyncGroup(rd_kafka_cgrp_t * rkcg,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,const rd_kafkap_bytes_t * member_state)5348 void rd_kafka_cgrp_handle_SyncGroup (rd_kafka_cgrp_t *rkcg,
5349 				     rd_kafka_broker_t *rkb,
5350                                      rd_kafka_resp_err_t err,
5351                                      const rd_kafkap_bytes_t *member_state) {
5352         rd_kafka_buf_t *rkbuf = NULL;
5353         rd_kafka_topic_partition_list_t *assignment = NULL;
5354         const int log_decode_errors = LOG_ERR;
5355         int16_t Version;
5356         rd_kafkap_bytes_t UserData;
5357 
5358 	/* Dont handle new assignments when terminating */
5359 	if (!err && rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
5360 		err = RD_KAFKA_RESP_ERR__DESTROY;
5361 
5362         if (err)
5363                 goto err;
5364 
5365 	if (RD_KAFKAP_BYTES_LEN(member_state) == 0) {
5366 		/* Empty assignment. */
5367 		assignment = rd_kafka_topic_partition_list_new(0);
5368 		memset(&UserData, 0, sizeof(UserData));
5369 		goto done;
5370 	}
5371 
5372         /* Parse assignment from MemberState */
5373         rkbuf = rd_kafka_buf_new_shadow(member_state->data,
5374                                         RD_KAFKAP_BYTES_LEN(member_state),
5375                                         NULL);
5376 	/* Protocol parser needs a broker handle to log errors on. */
5377 	if (rkb) {
5378 		rkbuf->rkbuf_rkb = rkb;
5379 		rd_kafka_broker_keep(rkb);
5380 	} else
5381 		rkbuf->rkbuf_rkb = rd_kafka_broker_internal(rkcg->rkcg_rk);
5382 
5383         rd_kafka_buf_read_i16(rkbuf, &Version);
5384         if (!(assignment = rd_kafka_buf_read_topic_partitions(rkbuf, 0,
5385                                                               rd_false,
5386                                                               rd_false)))
5387                 goto err_parse;
5388         rd_kafka_buf_read_bytes(rkbuf, &UserData);
5389 
5390  done:
5391         rd_kafka_cgrp_update_session_timeout(rkcg, rd_true/*reset timeout*/);
5392 
5393         rd_assert(rkcg->rkcg_assignor);
5394         if (rkcg->rkcg_assignor->rkas_on_assignment_cb) {
5395                 char *member_id;
5396                 RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id);
5397                 rd_kafka_consumer_group_metadata_t *cgmd =
5398                         rd_kafka_consumer_group_metadata_new_with_genid(
5399                                 rkcg->rkcg_rk->rk_conf.group_id_str,
5400                                 rkcg->rkcg_generation_id, member_id,
5401                                 rkcg->rkcg_rk->rk_conf.group_instance_id);
5402                 rkcg->rkcg_assignor->rkas_on_assignment_cb(
5403                         rkcg->rkcg_assignor,
5404                         &(rkcg->rkcg_assignor_state),
5405                         assignment, &UserData, cgmd);
5406                 rd_kafka_consumer_group_metadata_destroy(cgmd);
5407         }
5408 
5409         // FIXME: Remove when we're done debugging.
5410         rd_kafka_topic_partition_list_log(rkcg->rkcg_rk, "ASSIGNMENT",
5411                                           RD_KAFKA_DBG_CGRP,
5412                                           assignment);
5413 
5414         /* Set the new assignment */
5415         rd_kafka_cgrp_handle_assignment(rkcg, assignment);
5416 
5417         rd_kafka_topic_partition_list_destroy(assignment);
5418 
5419         if (rkbuf)
5420                 rd_kafka_buf_destroy(rkbuf);
5421 
5422         return;
5423 
5424  err_parse:
5425         err = rkbuf->rkbuf_err;
5426 
5427  err:
5428         if (rkbuf)
5429                 rd_kafka_buf_destroy(rkbuf);
5430 
5431         if (assignment)
5432                 rd_kafka_topic_partition_list_destroy(assignment);
5433 
5434         rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPSYNC",
5435                      "Group \"%s\": synchronization failed: %s: rejoining",
5436                      rkcg->rkcg_group_id->str, rd_kafka_err2str(err));
5437 
5438         if (err == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID)
5439                 rd_kafka_set_fatal_error(rkcg->rkcg_rk, err,
5440                                          "Fatal consumer error: %s",
5441                                          rd_kafka_err2str(err));
5442         else if (err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION)
5443                 rkcg->rkcg_generation_id = -1;
5444         else if (err == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
5445                 rd_kafka_cgrp_set_member_id(rkcg, "");
5446 
5447         if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
5448             RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE &&
5449             (err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION ||
5450              err == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID))
5451                 rd_kafka_cgrp_revoke_all_rejoin(
5452                         rkcg,
5453                         rd_true/*assignment is lost*/,
5454                         rd_true/*this consumer is initiating*/,
5455                         "SyncGroup error");
5456         else
5457                 rd_kafka_cgrp_rejoin(rkcg, "SyncGroup error: %s",
5458                                      rd_kafka_err2str(err));
5459 }
5460 
5461 
5462 
5463 rd_kafka_consumer_group_metadata_t *
rd_kafka_consumer_group_metadata_new(const char * group_id)5464 rd_kafka_consumer_group_metadata_new (const char *group_id) {
5465         rd_kafka_consumer_group_metadata_t *cgmetadata;
5466 
5467         cgmetadata = rd_kafka_consumer_group_metadata_new_with_genid(group_id,
5468                                                                      -1, "",
5469                                                                      NULL);
5470 
5471         return cgmetadata;
5472 }
5473 
5474 rd_kafka_consumer_group_metadata_t *
rd_kafka_consumer_group_metadata_new_with_genid(const char * group_id,int32_t generation_id,const char * member_id,const char * group_instance_id)5475 rd_kafka_consumer_group_metadata_new_with_genid (const char *group_id,
5476                                                  int32_t generation_id,
5477                                                  const char *member_id,
5478                                                  const char
5479                                                  *group_instance_id) {
5480         rd_kafka_consumer_group_metadata_t *cgmetadata;
5481 
5482         cgmetadata = rd_calloc(1, sizeof(*cgmetadata));
5483         cgmetadata->group_id = rd_strdup(group_id);
5484         cgmetadata->generation_id = generation_id;
5485         cgmetadata->member_id = rd_strdup(member_id);
5486         if (group_instance_id)
5487                 cgmetadata->group_instance_id = rd_strdup(group_instance_id);
5488 
5489         return cgmetadata;
5490 }
5491 
5492 rd_kafka_consumer_group_metadata_t *
rd_kafka_consumer_group_metadata(rd_kafka_t * rk)5493 rd_kafka_consumer_group_metadata (rd_kafka_t *rk) {
5494         rd_kafka_consumer_group_metadata_t *cgmetadata;
5495         rd_kafka_op_t *rko;
5496         rd_kafka_cgrp_t *rkcg;
5497 
5498         if (!(rkcg = rd_kafka_cgrp_get(rk)))
5499                 return NULL;
5500 
5501         rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_CG_METADATA);
5502         if (!rko)
5503                 return NULL;
5504 
5505         cgmetadata = rko->rko_u.cg_metadata;
5506         rko->rko_u.cg_metadata = NULL;
5507         rd_kafka_op_destroy(rko);
5508 
5509         return cgmetadata;
5510 }
5511 
5512 void
rd_kafka_consumer_group_metadata_destroy(rd_kafka_consumer_group_metadata_t * cgmetadata)5513 rd_kafka_consumer_group_metadata_destroy (
5514         rd_kafka_consumer_group_metadata_t *cgmetadata) {
5515         rd_free(cgmetadata->group_id);
5516         rd_free(cgmetadata->member_id);
5517         if (cgmetadata->group_instance_id)
5518                 rd_free(cgmetadata->group_instance_id);
5519         rd_free(cgmetadata);
5520 }
5521 
5522 rd_kafka_consumer_group_metadata_t *
rd_kafka_consumer_group_metadata_dup(const rd_kafka_consumer_group_metadata_t * cgmetadata)5523 rd_kafka_consumer_group_metadata_dup (
5524         const rd_kafka_consumer_group_metadata_t *cgmetadata) {
5525         rd_kafka_consumer_group_metadata_t *ret;
5526 
5527         ret = rd_calloc(1, sizeof(*cgmetadata));
5528         ret->group_id = rd_strdup(cgmetadata->group_id);
5529         ret->generation_id = cgmetadata->generation_id;
5530         ret->member_id = rd_strdup(cgmetadata->member_id);
5531         if (cgmetadata->group_instance_id)
5532                 ret->group_instance_id = rd_strdup(
5533                         cgmetadata->group_instance_id);
5534 
5535         return ret;
5536 }
5537 
5538 /*
5539  * Consumer group metadata serialization format v2:
5540  *    "CGMDv2:"<generation_id><group_id>"\0"<member_id>"\0" \
5541  *    <group_instance_id_is_null>[<group_instance_id>"\0"]
5542  * Where <group_id> is the group_id string.
5543  */
5544 static const char rd_kafka_consumer_group_metadata_magic[7] = "CGMDv2:";
5545 
rd_kafka_consumer_group_metadata_write(const rd_kafka_consumer_group_metadata_t * cgmd,void ** bufferp,size_t * sizep)5546 rd_kafka_error_t *rd_kafka_consumer_group_metadata_write (
5547         const rd_kafka_consumer_group_metadata_t *cgmd,
5548         void **bufferp, size_t *sizep) {
5549         char *buf;
5550         size_t size;
5551         size_t of = 0;
5552         size_t magic_len = sizeof(rd_kafka_consumer_group_metadata_magic);
5553         size_t groupid_len = strlen(cgmd->group_id) + 1;
5554         size_t generationid_len = sizeof(cgmd->generation_id);
5555         size_t member_id_len = strlen(cgmd->member_id) + 1;
5556         int8_t group_instance_id_is_null = cgmd->group_instance_id ? 0 : 1;
5557         size_t group_instance_id_is_null_len = sizeof(group_instance_id_is_null);
5558         size_t group_instance_id_len = cgmd->group_instance_id
5559                 ? strlen(cgmd->group_instance_id) + 1 : 0;
5560 
5561         size = magic_len + groupid_len + generationid_len + member_id_len +
5562                group_instance_id_is_null_len + group_instance_id_len;
5563 
5564         buf = rd_malloc(size);
5565 
5566         memcpy(buf, rd_kafka_consumer_group_metadata_magic, magic_len);
5567         of += magic_len;
5568 
5569         memcpy(buf+of, &cgmd->generation_id, generationid_len);
5570         of += generationid_len;
5571 
5572         memcpy(buf+of, cgmd->group_id, groupid_len);
5573         of += groupid_len;
5574 
5575         memcpy(buf+of, cgmd->member_id, member_id_len);
5576         of += member_id_len;
5577 
5578         memcpy(buf+of, &group_instance_id_is_null, group_instance_id_is_null_len);
5579         of += group_instance_id_is_null_len;
5580 
5581         if (!group_instance_id_is_null)
5582                 memcpy(buf+of, cgmd->group_instance_id, group_instance_id_len);
5583         of += group_instance_id_len;
5584 
5585         rd_assert(of == size);
5586 
5587         *bufferp = buf;
5588         *sizep = size;
5589 
5590         return NULL;
5591 }
5592 
5593 
5594 /*
5595  * Check that a string is printable, returning NULL if not or
5596  * a pointer immediately after the end of the string NUL
5597  * terminator if so.
5598  **/
str_is_printable(const char * s,const char * end)5599 static const char *str_is_printable(const char *s, const char *end) {
5600         const char *c;
5601         for (c = s ; *c && c != end ; c++)
5602                 if (!isprint((int)*c))
5603                         return NULL;
5604         return c + 1;
5605 }
5606 
5607 
rd_kafka_consumer_group_metadata_read(rd_kafka_consumer_group_metadata_t ** cgmdp,const void * buffer,size_t size)5608 rd_kafka_error_t *rd_kafka_consumer_group_metadata_read (
5609         rd_kafka_consumer_group_metadata_t **cgmdp,
5610         const void *buffer, size_t size) {
5611         const char *buf = (const char *)buffer;
5612         const char *end = buf + size;
5613         const char *next;
5614         size_t magic_len = sizeof(rd_kafka_consumer_group_metadata_magic);
5615         int32_t generation_id;
5616         size_t generationid_len = sizeof(generation_id);
5617         const char *group_id;
5618         const char *member_id;
5619         int8_t group_instance_id_is_null;
5620         const char *group_instance_id = NULL;
5621 
5622         if (size < magic_len + generationid_len + 1 + 1 + 1)
5623                 return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
5624                                           "Input buffer is too short");
5625 
5626         if (memcmp(buffer, rd_kafka_consumer_group_metadata_magic, magic_len))
5627                 return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
5628                                           "Input buffer is not a serialized "
5629                                           "consumer group metadata object");
5630         memcpy(&generation_id, buf+magic_len, generationid_len);
5631 
5632         group_id = buf + magic_len + generationid_len;
5633         next = str_is_printable(group_id, end);
5634         if (!next)
5635                 return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
5636                                           "Input buffer group id is not safe");
5637 
5638         member_id = next;
5639         next = str_is_printable(member_id, end);
5640         if (!next)
5641                 return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
5642                                           "Input buffer member id is not "
5643                                           "safe");
5644 
5645         group_instance_id_is_null = (int8_t)*(next++);
5646         if (!group_instance_id_is_null) {
5647                 group_instance_id = next;
5648                 next = str_is_printable(group_instance_id, end);
5649                 if (!next)
5650                         return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
5651                                                   "Input buffer group "
5652                                                   "instance id is not safe");
5653         }
5654 
5655         if (next != end)
5656                 return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
5657                                           "Input buffer bad length");
5658 
5659         *cgmdp = rd_kafka_consumer_group_metadata_new_with_genid(
5660                                                         group_id,
5661                                                         generation_id,
5662                                                         member_id,
5663                                                         group_instance_id);
5664 
5665         return NULL;
5666 }
5667 
5668 
unittest_consumer_group_metadata_iteration(const char * group_id,int32_t generation_id,const char * member_id,const char * group_instance_id)5669 static int unittest_consumer_group_metadata_iteration(const char *group_id,
5670                           int32_t generation_id,
5671                           const char *member_id,
5672                           const char *group_instance_id) {
5673         rd_kafka_consumer_group_metadata_t *cgmd;
5674         void *buffer, *buffer2;
5675         size_t size, size2;
5676         rd_kafka_error_t *error;
5677 
5678         cgmd = rd_kafka_consumer_group_metadata_new_with_genid(
5679                                                         group_id,
5680                                                         generation_id,
5681                                                         member_id,
5682                                                         group_instance_id);
5683         RD_UT_ASSERT(cgmd != NULL, "failed to create metadata");
5684 
5685         error = rd_kafka_consumer_group_metadata_write(cgmd, &buffer,
5686                                                        &size);
5687         RD_UT_ASSERT(!error, "metadata_write failed: %s",
5688                         rd_kafka_error_string(error));
5689 
5690         rd_kafka_consumer_group_metadata_destroy(cgmd);
5691 
5692         cgmd = NULL;
5693         error = rd_kafka_consumer_group_metadata_read(&cgmd, buffer,
5694                                                       size);
5695         RD_UT_ASSERT(!error, "metadata_read failed: %s",
5696                         rd_kafka_error_string(error));
5697 
5698         /* Serialize again and compare buffers */
5699         error = rd_kafka_consumer_group_metadata_write(cgmd, &buffer2,
5700                                                        &size2);
5701         RD_UT_ASSERT(!error, "metadata_write failed: %s",
5702                         rd_kafka_error_string(error));
5703 
5704         RD_UT_ASSERT(size == size2 && !memcmp(buffer, buffer2, size),
5705                         "metadata_read/write size or content mismatch: "
5706                         "size %"PRIusz", size2 %"PRIusz,
5707                         size, size2);
5708 
5709         rd_kafka_consumer_group_metadata_destroy(cgmd);
5710         rd_free(buffer);
5711         rd_free(buffer2);
5712 
5713         return 0;
5714 }
5715 
5716 
/**
 * @brief Exhaustively exercise metadata serialization round-trips over
 *        combinations of group id, member id, group instance id (the
 *        literal string "NULL" selects a NULL instance id) and a small
 *        range of generation ids.
 */
static int unittest_consumer_group_metadata (void) {
        const char *ids[] = {
                "mY. random id:.",
                "0",
                "2222222222222222222222221111111111111111111111111111112222",
                "",
                "NULL",
                NULL,
        };
        int gi, mi, ii, gen;

        for (gi = 0 ; ids[gi] ; gi++) {
                for (mi = 0 ; ids[mi] ; mi++) {
                        for (ii = 0 ; ids[ii] ; ii++) {
                                for (gen = -1 ; gen < 1 ; gen++) {
                                        const char *instance_id = ids[ii];
                                        int r;

                                        /* "NULL" maps to an actual NULL
                                         * instance id. */
                                        if (!strcmp(instance_id, "NULL"))
                                                instance_id = NULL;

                                        r = unittest_consumer_group_metadata_iteration(
                                                ids[gi], gen, ids[mi],
                                                instance_id);
                                        if (r)
                                                return r;
                                }
                        }
                }
        }

        RD_UT_PASS();
}
5756 
5757 
unittest_set_intersect(void)5758 static int unittest_set_intersect (void) {
5759         size_t par_cnt = 10;
5760         map_toppar_member_info_t *dst;
5761         rd_kafka_topic_partition_t *toppar;
5762         PartitionMemberInfo_t *v;
5763         char *id = "id";
5764         rd_kafkap_str_t id1 = RD_KAFKAP_STR_INITIALIZER;
5765         rd_kafkap_str_t id2 = RD_KAFKAP_STR_INITIALIZER;
5766         rd_kafka_group_member_t *gm1;
5767         rd_kafka_group_member_t *gm2;
5768 
5769         id1.len = 2;
5770         id1.str = id;
5771         id2.len = 2;
5772         id2.str = id;
5773 
5774         map_toppar_member_info_t a = RD_MAP_INITIALIZER(
5775                 par_cnt,
5776                 rd_kafka_topic_partition_cmp,
5777                 rd_kafka_topic_partition_hash,
5778                 rd_kafka_topic_partition_destroy_free,
5779                 PartitionMemberInfo_free);
5780 
5781         map_toppar_member_info_t b = RD_MAP_INITIALIZER(
5782                 par_cnt,
5783                 rd_kafka_topic_partition_cmp,
5784                 rd_kafka_topic_partition_hash,
5785                 rd_kafka_topic_partition_destroy_free,
5786                 PartitionMemberInfo_free);
5787 
5788         gm1 = rd_calloc(1, sizeof(*gm1));
5789         gm1->rkgm_member_id = &id1;
5790         gm1->rkgm_group_instance_id = &id1;
5791         gm2 = rd_calloc(1, sizeof(*gm2));
5792         gm2->rkgm_member_id = &id2;
5793         gm2->rkgm_group_instance_id = &id2;
5794 
5795         RD_MAP_SET(&a,
5796                    rd_kafka_topic_partition_new("t1", 4),
5797                    PartitionMemberInfo_new(gm1, rd_false));
5798         RD_MAP_SET(&a,
5799                    rd_kafka_topic_partition_new("t2", 4),
5800                    PartitionMemberInfo_new(gm1, rd_false));
5801         RD_MAP_SET(&a,
5802                    rd_kafka_topic_partition_new("t1", 7),
5803                    PartitionMemberInfo_new(gm1, rd_false));
5804 
5805         RD_MAP_SET(&b,
5806                    rd_kafka_topic_partition_new("t2", 7),
5807                    PartitionMemberInfo_new(gm1, rd_false));
5808         RD_MAP_SET(&b,
5809                    rd_kafka_topic_partition_new("t1", 4),
5810                    PartitionMemberInfo_new(gm2, rd_false));
5811 
5812         dst = rd_kafka_member_partitions_intersect(&a, &b);
5813 
5814         RD_UT_ASSERT(RD_MAP_CNT(&a) == 3,
5815                      "expected a cnt to be 3 not %d", (int)RD_MAP_CNT(&a));
5816         RD_UT_ASSERT(RD_MAP_CNT(&b) == 2,
5817                      "expected b cnt to be 2 not %d", (int)RD_MAP_CNT(&b));
5818         RD_UT_ASSERT(RD_MAP_CNT(dst) == 1,
5819                      "expected dst cnt to be 1 not %d", (int)RD_MAP_CNT(dst));
5820 
5821         toppar = rd_kafka_topic_partition_new("t1", 4);
5822         RD_UT_ASSERT((v = RD_MAP_GET(dst, toppar)), "unexpected element");
5823         RD_UT_ASSERT(v->members_match, "expected members to match");
5824         rd_kafka_topic_partition_destroy(toppar);
5825 
5826         RD_MAP_DESTROY(&a);
5827         RD_MAP_DESTROY(&b);
5828         RD_MAP_DESTROY(dst);
5829         rd_free(dst);
5830 
5831         rd_free(gm1);
5832         rd_free(gm2);
5833 
5834         RD_UT_PASS();
5835 }
5836 
5837 
unittest_set_subtract(void)5838 static int unittest_set_subtract (void) {
5839         size_t par_cnt = 10;
5840         rd_kafka_topic_partition_t *toppar;
5841         map_toppar_member_info_t *dst;
5842 
5843         map_toppar_member_info_t a = RD_MAP_INITIALIZER(
5844                 par_cnt,
5845                 rd_kafka_topic_partition_cmp,
5846                 rd_kafka_topic_partition_hash,
5847                 rd_kafka_topic_partition_destroy_free,
5848                 PartitionMemberInfo_free);
5849 
5850         map_toppar_member_info_t b = RD_MAP_INITIALIZER(
5851                 par_cnt,
5852                 rd_kafka_topic_partition_cmp,
5853                 rd_kafka_topic_partition_hash,
5854                 rd_kafka_topic_partition_destroy_free,
5855                 PartitionMemberInfo_free);
5856 
5857         RD_MAP_SET(&a,
5858                    rd_kafka_topic_partition_new("t1", 4),
5859                    PartitionMemberInfo_new(NULL, rd_false));
5860         RD_MAP_SET(&a,
5861                    rd_kafka_topic_partition_new("t2", 7),
5862                    PartitionMemberInfo_new(NULL, rd_false));
5863 
5864         RD_MAP_SET(&b,
5865                    rd_kafka_topic_partition_new("t2", 4),
5866                    PartitionMemberInfo_new(NULL, rd_false));
5867         RD_MAP_SET(&b,
5868                    rd_kafka_topic_partition_new("t1", 4),
5869                    PartitionMemberInfo_new(NULL, rd_false));
5870         RD_MAP_SET(&b,
5871                    rd_kafka_topic_partition_new("t1", 7),
5872                    PartitionMemberInfo_new(NULL, rd_false));
5873 
5874         dst = rd_kafka_member_partitions_subtract(&a, &b);
5875 
5876         RD_UT_ASSERT(RD_MAP_CNT(&a) == 2,
5877                      "expected a cnt to be 2 not %d", (int)RD_MAP_CNT(&a));
5878         RD_UT_ASSERT(RD_MAP_CNT(&b) == 3,
5879                      "expected b cnt to be 3 not %d", (int)RD_MAP_CNT(&b));
5880         RD_UT_ASSERT(RD_MAP_CNT(dst) == 1,
5881                      "expected dst cnt to be 1 not %d", (int)RD_MAP_CNT(dst));
5882 
5883         toppar = rd_kafka_topic_partition_new("t2", 7);
5884         RD_UT_ASSERT(RD_MAP_GET(dst, toppar), "unexpected element");
5885         rd_kafka_topic_partition_destroy(toppar);
5886 
5887         RD_MAP_DESTROY(&a);
5888         RD_MAP_DESTROY(&b);
5889         RD_MAP_DESTROY(dst);
5890         rd_free(dst);
5891 
5892         RD_UT_PASS();
5893 }
5894 
5895 
unittest_map_to_list(void)5896 static int unittest_map_to_list (void) {
5897         rd_kafka_topic_partition_list_t *list;
5898 
5899         map_toppar_member_info_t map = RD_MAP_INITIALIZER(
5900                 10,
5901                 rd_kafka_topic_partition_cmp,
5902                 rd_kafka_topic_partition_hash,
5903                 rd_kafka_topic_partition_destroy_free,
5904                 PartitionMemberInfo_free);
5905 
5906         RD_MAP_SET(&map,
5907                    rd_kafka_topic_partition_new("t1", 101),
5908                    PartitionMemberInfo_new(NULL, rd_false));
5909 
5910         list = rd_kafka_toppar_member_info_map_to_list(&map);
5911 
5912         RD_UT_ASSERT(list->cnt == 1,
5913                      "expecting list size of 1 not %d.", list->cnt);
5914         RD_UT_ASSERT(list->elems[0].partition == 101,
5915                      "expecting partition 101 not %d",
5916                      list->elems[0].partition);
5917         RD_UT_ASSERT(!strcmp(list->elems[0].topic, "t1"),
5918                      "expecting topic 't1', not %s", list->elems[0].topic);
5919 
5920         rd_kafka_topic_partition_list_destroy(list);
5921         RD_MAP_DESTROY(&map);
5922 
5923         RD_UT_PASS();
5924 }
5925 
5926 
unittest_list_to_map(void)5927 static int unittest_list_to_map (void) {
5928         rd_kafka_topic_partition_t *toppar;
5929         map_toppar_member_info_t *map;
5930         rd_kafka_topic_partition_list_t *list =
5931                 rd_kafka_topic_partition_list_new(1);
5932 
5933         rd_kafka_topic_partition_list_add(list, "topic1", 201);
5934         rd_kafka_topic_partition_list_add(list, "topic2", 202);
5935 
5936         map = rd_kafka_toppar_list_to_toppar_member_info_map(list);
5937 
5938         RD_UT_ASSERT(RD_MAP_CNT(map) == 2,
5939                     "expected map cnt to be 2 not %d", (int)RD_MAP_CNT(map));
5940         toppar = rd_kafka_topic_partition_new("topic1", 201);
5941         RD_UT_ASSERT(RD_MAP_GET(map, toppar),
5942                      "expected topic1 [201] to exist in map");
5943         rd_kafka_topic_partition_destroy(toppar);
5944         toppar = rd_kafka_topic_partition_new("topic2", 202);
5945         RD_UT_ASSERT(RD_MAP_GET(map, toppar),
5946                      "expected topic2 [202] to exist in map");
5947         rd_kafka_topic_partition_destroy(toppar);
5948 
5949         RD_MAP_DESTROY(map);
5950         rd_free(map);
5951         rd_kafka_topic_partition_list_destroy(list);
5952 
5953         RD_UT_PASS();
5954 }
5955 
5956 
5957 /**
5958  * @brief Consumer group unit tests
5959  */
unittest_cgrp(void)5960 int unittest_cgrp (void) {
5961         int fails = 0;
5962 
5963         fails += unittest_consumer_group_metadata();
5964         fails += unittest_set_intersect();
5965         fails += unittest_set_subtract();
5966         fails += unittest_map_to_list();
5967         fails += unittest_list_to_map();
5968 
5969         return fails;
5970 }
5971