1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 #ifndef _RDKAFKA_CGRP_H_
29 #define _RDKAFKA_CGRP_H_
30 
31 #include "rdinterval.h"
32 
33 #include "rdkafka_assignor.h"
34 
35 /**
36  * Client groups implementation
37  *
38  * Client groups handling for a single cgrp is assigned to a single
39  * rd_kafka_broker_t object at any given time.
40  * The main thread will call cgrp_serve() to serve its cgrps.
41  *
42  * This means that the cgrp itself does not need to be locked since it
43  * is only ever used from the main thread.
44  *
45  */
46 
47 
/* Table of join state names.
 * NOTE(review): presumably indexed by the RD_KAFKA_CGRP_JOIN_STATE_* values
 * stored in rkcg_join_state - confirm against the definition.
 * The same symbol is declared a second time further down in this header,
 * next to rd_kafka_cgrp_state_names. */
extern const char *rd_kafka_cgrp_join_state_names[];
49 
/**
 * Client group (cgrp) handle.
 *
 * Holds the group's identity, the cgrp state and join state, the current
 * subscription and assignment, and the queues, intervals and timers
 * associated with them.
 */
typedef struct rd_kafka_cgrp_s {
        const rd_kafkap_str_t    *rkcg_group_id;   /* Group id */
        rd_kafkap_str_t          *rkcg_member_id;  /* Last assigned MemberId */
        rd_kafkap_str_t          *rkcg_group_instance_id; /* A non-null string
                                                           * here means static
                                                           * group membership,
                                                           * see RD_KAFKA_CGRP_IS_STATIC_MEMBER() */
        const rd_kafkap_str_t    *rkcg_client_id;  /* Client id */

        /* Cgrp state */
        enum {
                /* Init state */
                RD_KAFKA_CGRP_STATE_INIT,

                /* Cgrp has been stopped. This is a final state */
                RD_KAFKA_CGRP_STATE_TERM,

                /* Query for group coordinator */
                RD_KAFKA_CGRP_STATE_QUERY_COORD,

                /* Outstanding query, awaiting response */
                RD_KAFKA_CGRP_STATE_WAIT_COORD,

                /* Wait for ack from the assigned cgrp manager broker thread */
                RD_KAFKA_CGRP_STATE_WAIT_BROKER,

                /* Wait for manager broker thread to connect to broker */
                RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT,

                /* Coordinator is up and manager is assigned. */
                RD_KAFKA_CGRP_STATE_UP,
        } rkcg_state;
        rd_ts_t            rkcg_ts_statechange;     /* Timestamp of last
                                                     * state change. */


        /* Join state.
         * The prefix of each comment (all / Leader / Follower) names the
         * group role(s) the state applies to. */
        enum {
                /* Init state */
                RD_KAFKA_CGRP_JOIN_STATE_INIT,

                /* all: JoinGroupRequest sent, awaiting response. */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN,

                /* Leader: MetadataRequest sent, awaiting response. */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA,

                /* Follower: SyncGroupRequest sent, awaiting response. */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC,

                /* all: waiting for previous assignment to decommission */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN,

                /* all: waiting for application's rebalance_cb to assign() */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB,

                /* all: waiting for application's rebalance_cb to revoke */
                RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB,

                /* all: synchronized and assigned,
                 *      may be an empty assignment. */
                RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED,

                /* all: fetchers are started and operational */
		RD_KAFKA_CGRP_JOIN_STATE_STARTED
        } rkcg_join_state;

        /* State when group leader */
        struct {
                char *protocol;                   /* NOTE(review): presumably
                                                   * the selected protocol
                                                   * name - confirm. */
                rd_kafka_group_member_t *members; /* Group members */
                int member_cnt;                   /* NOTE(review): presumably
                                                   * the number of entries
                                                   * in members - confirm. */
        } rkcg_group_leader;

        rd_kafka_q_t      *rkcg_q;                  /* Application poll queue */
        rd_kafka_q_t      *rkcg_ops;                /* Manager ops queue */
	rd_kafka_q_t      *rkcg_wait_coord_q;       /* Ops awaiting coord */
	int32_t            rkcg_version;            /* Ops queue version barrier.
                                                     * Increased by:
                                                     *  Rebalance delegation
                                                     *  Assign/Unassign
                                                     */
        mtx_t              rkcg_lock;               /* Taken and released with
                                                     * rd_kafka_cgrp_lock() and
                                                     * rd_kafka_cgrp_unlock() */

        int                rkcg_flags;              /* RD_KAFKA_CGRP_F_.. flags
                                                     * defined below.
                                                     * Note: no flag uses
                                                     * bit 0x2 here. */
#define RD_KAFKA_CGRP_F_TERMINATE    0x1            /* Terminate cgrp (async) */
#define RD_KAFKA_CGRP_F_WAIT_UNASSIGN 0x4           /* Waiting for unassign
                                                     * to complete */
#define RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN 0x8       /* Send LeaveGroup when
                                                     * unassign is done */
#define RD_KAFKA_CGRP_F_SUBSCRIPTION 0x10           /* If set:
                                                     *   subscription
                                                     * else:
                                                     *   static assignment */
#define RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT  0x20  /* A Heartbeat request
                                                     * is in transit, don't
                                                     * send a new one. */
#define RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION 0x40  /* Subscription contains
                                                     * wildcards. */
#define RD_KAFKA_CGRP_F_WAIT_LEAVE            0x80  /* Wait for LeaveGroup
                                                     * to be sent.
                                                     * This is used to stall
                                                     * termination until
                                                     * the LeaveGroupRequest
                                                     * is responded to,
                                                     * otherwise it risks
                                                     * being dropped in the
                                                     * output queue when
                                                     * the broker is destroyed.
                                                     */
#define RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED 0x100     /**< max.poll.interval.ms
                                                     *   was exceeded and we
                                                     *   left the group.
                                                     *   Do not rejoin until
                                                     *   the application has
                                                     *   polled again. */

        rd_interval_t      rkcg_coord_query_intvl;  /* Coordinator query
                                                     * interval */
        rd_interval_t      rkcg_heartbeat_intvl;    /* Heartbeat interval */
        rd_interval_t      rkcg_join_intvl;         /* JoinGroup interval */
        rd_interval_t      rkcg_timeout_scan_intvl; /* Timeout scanner
                                                     * interval */

        rd_ts_t            rkcg_ts_session_timeout; /**< Absolute session
                                                     *   timeout enforced by
                                                     *   the consumer, this
                                                     *   value is updated on
                                                     *   Heartbeat success,
                                                     *   etc. */
        rd_kafka_resp_err_t rkcg_last_heartbeat_err; /**< Last Heartbeat error,
                                                      *   used for logging. */

        TAILQ_HEAD(, rd_kafka_topic_s)  rkcg_topics;/* Topics subscribed to */

        rd_list_t          rkcg_toppars;            /* Toppars subscribed to */

	int                rkcg_assigned_cnt;       /* Assigned partitions */

        int32_t            rkcg_generation_id;      /* Current generation id */

        rd_kafka_assignor_t *rkcg_assignor;         /* Selected partition
                                                     * assignor strategy. */

        int32_t            rkcg_coord_id;      /**< Current coordinator id,
                                                *   or -1 if not known. */

        rd_kafka_broker_t *rkcg_curr_coord;    /**< Current coordinator
                                                *   broker handle, or NULL.
                                                *   rkcg_coord's nodename is
                                                *   updated to this broker's
                                                *   nodename when there is a
                                                *   coordinator change. */
        rd_kafka_broker_t *rkcg_coord;         /**< The dedicated coordinator
                                                *   broker handle.
                                                *   Will be updated when the
                                                *   coordinator changes. */

        /* Current subscription */
        rd_kafka_topic_partition_list_t *rkcg_subscription;
        /* The actual topics subscribed (after metadata+wildcard matching) */
	rd_list_t *rkcg_subscribed_topics; /**< (rd_kafka_topic_info_t *) */

        /* Current assignment */
        rd_kafka_topic_partition_list_t *rkcg_assignment;

        int rkcg_wait_unassign_cnt;                 /* Waiting for this number
                                                     * of partitions to be
                                                     * unassigned and
                                                     * decommissioned before
                                                     * transitioning to the
                                                     * next state. */

	int rkcg_wait_commit_cnt;                   /* Waiting for this number
                                                     * of commits to finish. */

        rd_kafka_resp_err_t rkcg_last_err;          /* Last error propagated to
                                                     * application.
                                                     * This is for silencing
                                                     * repeats of the same
                                                     * error. */

        rd_kafka_timer_t   rkcg_offset_commit_tmr;  /* Offset commit timer */
        rd_kafka_timer_t   rkcg_max_poll_interval_tmr; /**< Enforce the max
                                                        *   poll interval. */

        rd_kafka_t        *rkcg_rk;                 /* Client instance
                                                     * (rd_kafka_t) handle */

        rd_kafka_op_t     *rkcg_reply_rko;          /* Send reply for op
                                                     * (OP_TERMINATE)
                                                     * to this rko's queue. */

	rd_ts_t            rkcg_ts_terminate;       /* Timestamp of when
                                                     * cgrp termination was
                                                     * initiated. */

        /* Rebalance bookkeeping.
         * Protected by rd_kafka_*lock() */
        struct {
                rd_ts_t            ts_rebalance;       /* Timestamp of
                                                        * last rebalance */
                int                rebalance_cnt;      /* Number of
                                                        * rebalances */
                char               rebalance_reason[256]; /**< Last rebalance
                                                           *   reason */
                int                assignment_size;    /* Partition count
                                                        * of last rebalance
                                                        * assignment */
        } rkcg_c;

} rd_kafka_cgrp_t;
254 
255 
256 
257 
/**
 * Acquire / release the cgrp's own mutex (rkcg_lock).
 *
 * Each macro expands to the underlying mtx_lock() / mtx_unlock() call, so
 * the value of the expression is whatever that call returns.
 */
#define rd_kafka_cgrp_lock(rkcg)                                        \
        (mtx_lock(&((rkcg)->rkcg_lock)))
#define rd_kafka_cgrp_unlock(rkcg)                                      \
        (mtx_unlock(&((rkcg)->rkcg_lock)))
260 
/**
 * @returns true if broker rkb is the cgrp's coordinator, that is: a
 *          coordinator id is known (rkcg_coord_id is not -1) and it equals
 *          the broker's node id.
 *
 * @remark rkcg is evaluated twice, so avoid arguments with side effects.
 *         rkb is not evaluated at all when no coordinator id is known.
 */
#define RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg,rkb)                         \
        (-1 != (rkcg)->rkcg_coord_id &&                                 \
         (rkb)->rkb_nodeid == (rkcg)->rkcg_coord_id)
265 
/**
 * @returns true if cgrp is using static group membership, that is: its
 *          rkcg_group_instance_id is not a null string.
 *
 * The whole expansion is parenthesized so that the macro behaves as a
 * single primary expression wherever it is used.
 */
#define RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg) \
        (!RD_KAFKAP_STR_IS_NULL((rkcg)->rkcg_group_instance_id))
271 
/* Name tables for the cgrp state and the join state.
 * NOTE(review): presumably indexed by the RD_KAFKA_CGRP_STATE_* and
 * RD_KAFKA_CGRP_JOIN_STATE_* values respectively - confirm against the
 * definitions.
 * rd_kafka_cgrp_join_state_names is also declared near the top of this
 * header; the repeated declaration is redundant but valid C. */
extern const char *rd_kafka_cgrp_state_names[];
extern const char *rd_kafka_cgrp_join_state_names[];
274 
/**
 * Final destruction of the cgrp handle rkcg.
 * NOTE(review): the name suggests this is the last teardown step; what it
 * frees and when it may be called is not visible here - confirm against
 * the implementation.
 */
void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg);

/**
 * Create a new cgrp for client instance rk with the given group id and
 * client id.
 *
 * @returns the new cgrp handle.
 *          NOTE(review): failure behaviour (NULL or otherwise) is not
 *          visible here - confirm.
 */
rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
                                    const rd_kafkap_str_t *group_id,
                                    const rd_kafkap_str_t *client_id);

/**
 * Serve the cgrp.
 * Per the overview comment at the top of this header, the main thread
 * calls cgrp_serve() to serve its cgrps.
 */
void rd_kafka_cgrp_serve (rd_kafka_cgrp_t *rkcg);
280 
/**
 * Issue an op of the given type towards the cgrp.
 * NOTE(review): presumably the op is enqueued on the cgrp's ops queue
 * (rkcg_ops), carrying rktp and err, with replies going to replyq -
 * confirm against the implementation.
 */
void rd_kafka_cgrp_op (rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp,
                       rd_kafka_replyq_t replyq, rd_kafka_op_type_t type,
                       rd_kafka_resp_err_t err);

/**
 * Terminate the cgrp.
 * NOTE(review): RD_KAFKA_CGRP_F_TERMINATE is documented as asynchronous and
 * rkcg_reply_rko as the op whose queue receives the OP_TERMINATE reply, so
 * terminate0() presumably takes that op directly while terminate() takes
 * the reply queue to use - confirm against the implementation.
 */
void rd_kafka_cgrp_terminate0 (rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko);
void rd_kafka_cgrp_terminate (rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq);
286 
287 
/**
 * Remove / add a topic pattern from / to the cgrp.
 *
 * @returns an rd_kafka_resp_err_t error code.
 *          NOTE(review): the pattern syntax and the exact error codes are
 *          not visible here - confirm against the implementation.
 */
rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_del (rd_kafka_cgrp_t *rkcg,
                                                     const char *pattern);
rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_add (rd_kafka_cgrp_t *rkcg,
                                                     const char *pattern);

/**
 * Check topic against the cgrp.
 * NOTE(review): presumably tests whether the topic name matches the cgrp's
 * topic patterns; the meaning of the int return value is not visible
 * here - confirm.
 */
int rd_kafka_cgrp_topic_check (rd_kafka_cgrp_t *rkcg, const char *topic);
294 
/**
 * Set the cgrp's member id from the string member_id.
 * NOTE(review): presumably updates rkcg_member_id (the last assigned
 * MemberId); handling of a NULL member_id is not visible here - confirm.
 */
void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id);

/**
 * Handle the outcome of a SyncGroup request for the cgrp: err is the
 * request's error code and member_state the member state bytes.
 * NOTE(review): rkb is presumably the broker the response came from, and
 * member_state presumably encodes this member's assignment - confirm.
 */
void rd_kafka_cgrp_handle_SyncGroup (rd_kafka_cgrp_t *rkcg,
				     rd_kafka_broker_t *rkb,
                                     rd_kafka_resp_err_t err,
                                     const rd_kafkap_bytes_t *member_state);

/**
 * Set the cgrp's join state.
 * join_state is a plain int because the join state enum in rd_kafka_cgrp_t
 * is anonymous; it is expected to be one of the
 * RD_KAFKA_CGRP_JOIN_STATE_* values.
 */
void rd_kafka_cgrp_set_join_state (rd_kafka_cgrp_t *rkcg, int join_state);
302 
/**
 * Query for the group coordinator.
 * reason is a caller-supplied description.
 * NOTE(review): presumably used for logging only - confirm.
 */
void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg,
				const char *reason);

/**
 * Signal that the group coordinator is considered dead, with the error
 * err and a caller-supplied reason.
 * NOTE(review): the resulting state transition is not visible here -
 * confirm against the implementation.
 */
void rd_kafka_cgrp_coord_dead (rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err,
			       const char *reason);

/**
 * Check the cgrp against updated metadata.
 * NOTE(review): do_join presumably controls whether a (re)join is triggered
 * when the check finds a change - confirm against the implementation.
 */
void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg, int do_join);
/** @returns the cgrp handle stored in the client instance (rk->rk_cgrp). */
#define rd_kafka_cgrp_get(rk) ((rk)->rk_cgrp)
309 
310 
/**
 * Consumer group metadata: holds the group id as a C string.
 */
struct rd_kafka_consumer_group_metadata_s {
        char *group_id;   /* Group id string.
                           * NOTE(review): ownership (heap-allocated and
                           * owned by this object?) is not visible here -
                           * confirm. */
};
314 
/**
 * Duplicate the consumer group metadata object cgmetadata.
 *
 * @returns the copy.
 *          NOTE(review): presumably a deep copy that the caller owns and
 *          must destroy - confirm against the implementation.
 */
rd_kafka_consumer_group_metadata_t *
rd_kafka_consumer_group_metadata_dup (
        const rd_kafka_consumer_group_metadata_t *cgmetadata);
318 
319 #endif /* _RDKAFKA_CGRP_H_ */
320