1 /* 2 * librdkafka - Apache Kafka C library 3 * 4 * Copyright (c) 2012-2015, Magnus Edenhill 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright notice, 11 * this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright notice, 13 * this list of conditions and the following disclaimer in the documentation 14 * and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 #ifndef _RDKAFKA_CGRP_H_ 29 #define _RDKAFKA_CGRP_H_ 30 31 #include "rdinterval.h" 32 33 #include "rdkafka_assignor.h" 34 35 /** 36 * Client groups implementation 37 * 38 * Client groups handling for a single cgrp is assigned to a single 39 * rd_kafka_broker_t object at any given time. 40 * The main thread will call cgrp_serve() to serve its cgrps. 41 * 42 * This means that the cgrp itself does not need to be locked since it 43 * is only ever used from the main thread. 44 * 45 */ 46 47 48 extern const char *rd_kafka_cgrp_join_state_names[]; 49 50 /** 51 * Client group 52 */ 53 typedef struct rd_kafka_cgrp_s { 54 const rd_kafkap_str_t *rkcg_group_id; 55 rd_kafkap_str_t *rkcg_member_id; /* Last assigned MemberId */ 56 rd_kafkap_str_t *rkcg_group_instance_id; 57 const rd_kafkap_str_t *rkcg_client_id; 58 59 enum { 60 /* Init state */ 61 RD_KAFKA_CGRP_STATE_INIT, 62 63 /* Cgrp has been stopped. This is a final state */ 64 RD_KAFKA_CGRP_STATE_TERM, 65 66 /* Query for group coordinator */ 67 RD_KAFKA_CGRP_STATE_QUERY_COORD, 68 69 /* Outstanding query, awaiting response */ 70 RD_KAFKA_CGRP_STATE_WAIT_COORD, 71 72 /* Wait ack from assigned cgrp manager broker thread */ 73 RD_KAFKA_CGRP_STATE_WAIT_BROKER, 74 75 /* Wait for manager broker thread to connect to broker */ 76 RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT, 77 78 /* Coordinator is up and manager is assigned. */ 79 RD_KAFKA_CGRP_STATE_UP, 80 } rkcg_state; 81 rd_ts_t rkcg_ts_statechange; /* Timestamp of last 82 * state change. */ 83 84 85 enum { 86 RD_KAFKA_CGRP_JOIN_STATE_INIT, 87 88 /* all: JoinGroupRequest sent, awaiting response. */ 89 RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN, 90 91 /* Leader: MetadataRequest sent, awaiting response. */ 92 RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA, 93 94 /* Follower: SyncGroupRequest sent, awaiting response. */ 95 RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC, 96 97 /* all: waiting for previous assignment to decommission */ 98 RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN, 99 100 /* all: waiting for application's rebalance_cb to assign() */ 101 RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB, 102 103 /* all: waiting for application's rebalance_cb to revoke */ 104 RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB, 105 106 /* all: synchronized and assigned 107 * may be an empty assignment. */ 108 RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED, 109 110 /* all: fetchers are started and operational */ 111 RD_KAFKA_CGRP_JOIN_STATE_STARTED 112 } rkcg_join_state; 113 114 /* State when group leader */ 115 struct { 116 char *protocol; 117 rd_kafka_group_member_t *members; 118 int member_cnt; 119 } rkcg_group_leader; 120 121 rd_kafka_q_t *rkcg_q; /* Application poll queue */ 122 rd_kafka_q_t *rkcg_ops; /* Manager ops queue */ 123 rd_kafka_q_t *rkcg_wait_coord_q; /* Ops awaiting coord */ 124 int32_t rkcg_version; /* Ops queue version barrier 125 * Increased by: 126 * Rebalance delegation 127 * Assign/Unassign 128 */ 129 mtx_t rkcg_lock; 130 131 int rkcg_flags; 132 #define RD_KAFKA_CGRP_F_TERMINATE 0x1 /* Terminate cgrp (async) */ 133 #define RD_KAFKA_CGRP_F_WAIT_UNASSIGN 0x4 /* Waiting for unassign 134 * to complete */ 135 #define RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN 0x8 /* Send LeaveGroup when 136 * unassign is done */ 137 #define RD_KAFKA_CGRP_F_SUBSCRIPTION 0x10 /* If set: 138 * subscription 139 * else: 140 * static assignment */ 141 #define RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT 0x20 /* A Heartbeat request 142 * is in transit, dont 143 * send a new one. */ 144 #define RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION 0x40 /* Subscription contains 145 * wildcards. */ 146 #define RD_KAFKA_CGRP_F_WAIT_LEAVE 0x80 /* Wait for LeaveGroup 147 * to be sent. 148 * This is used to stall 149 * termination until 150 * the LeaveGroupRequest 151 * is responded to, 152 * otherwise it risks 153 * being dropped in the 154 * output queue when 155 * the broker is destroyed. 156 */ 157 #define RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED 0x100 /**< max.poll.interval.ms 158 * was exceeded and we 159 * left the group. 160 * Do not rejoin until 161 * the application has 162 * polled again. */ 163 164 rd_interval_t rkcg_coord_query_intvl; /* Coordinator query intvl*/ 165 rd_interval_t rkcg_heartbeat_intvl; /* Heartbeat intvl */ 166 rd_interval_t rkcg_join_intvl; /* JoinGroup interval */ 167 rd_interval_t rkcg_timeout_scan_intvl; /* Timeout scanner */ 168 169 rd_ts_t rkcg_ts_session_timeout; /**< Absolute session 170 * timeout enforced by 171 * the consumer, this 172 * value is updated on 173 * Heartbeat success, 174 * etc. */ 175 rd_kafka_resp_err_t rkcg_last_heartbeat_err; /**< Last Heartbeat error, 176 * used for logging. */ 177 178 TAILQ_HEAD(, rd_kafka_topic_s) rkcg_topics;/* Topics subscribed to */ 179 180 rd_list_t rkcg_toppars; /* Toppars subscribed to*/ 181 182 int rkcg_assigned_cnt; /* Assigned partitions */ 183 184 int32_t rkcg_generation_id; /* Current generation id */ 185 186 rd_kafka_assignor_t *rkcg_assignor; /* Selected partition 187 * assignor strategy. */ 188 189 int32_t rkcg_coord_id; /**< Current coordinator id, 190 * or -1 if not known. */ 191 192 rd_kafka_broker_t *rkcg_curr_coord; /**< Current coordinator 193 * broker handle, or NULL. 194 * rkcg_coord's nodename is 195 * updated to this broker's 196 * nodename when there is a 197 * coordinator change. */ 198 rd_kafka_broker_t *rkcg_coord; /**< The dedicated coordinator 199 * broker handle. 200 * Will be updated when the 201 * coordinator changes. */ 202 203 /* Current subscription */ 204 rd_kafka_topic_partition_list_t *rkcg_subscription; 205 /* The actual topics subscribed (after metadata+wildcard matching) */ 206 rd_list_t *rkcg_subscribed_topics; /**< (rd_kafka_topic_info_t *) */ 207 208 /* Current assignment */ 209 rd_kafka_topic_partition_list_t *rkcg_assignment; 210 211 int rkcg_wait_unassign_cnt; /* Waiting for this number 212 * of partitions to be 213 * unassigned and 214 * decommissioned before 215 * transitioning to the 216 * next state. */ 217 218 int rkcg_wait_commit_cnt; /* Waiting for this number 219 * of commits to finish. */ 220 221 rd_kafka_resp_err_t rkcg_last_err; /* Last error propagated to 222 * application. 223 * This is for silencing 224 * same errors. */ 225 226 rd_kafka_timer_t rkcg_offset_commit_tmr; /* Offset commit timer */ 227 rd_kafka_timer_t rkcg_max_poll_interval_tmr; /**< Enforce the max 228 * poll interval. */ 229 230 rd_kafka_t *rkcg_rk; 231 232 rd_kafka_op_t *rkcg_reply_rko; /* Send reply for op 233 * (OP_TERMINATE) 234 * to this rko's queue. */ 235 236 rd_ts_t rkcg_ts_terminate; /* Timestamp of when 237 * cgrp termination was 238 * initiated. */ 239 240 /* Protected by rd_kafka_*lock() */ 241 struct { 242 rd_ts_t ts_rebalance; /* Timestamp of 243 * last rebalance */ 244 int rebalance_cnt; /* Number of 245 rebalances */ 246 char rebalance_reason[256]; /**< Last rebalance 247 * reason */ 248 int assignment_size; /* Partition count 249 * of last rebalance 250 * assignment */ 251 } rkcg_c; 252 253 } rd_kafka_cgrp_t; 254 255 256 257 258 #define rd_kafka_cgrp_lock(rkcg) mtx_lock(&(rkcg)->rkcg_lock) 259 #define rd_kafka_cgrp_unlock(rkcg) mtx_unlock(&(rkcg)->rkcg_lock) 260 261 /* Check if broker is the coordinator */ 262 #define RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg,rkb) \ 263 ((rkcg)->rkcg_coord_id != -1 && \ 264 (rkcg)->rkcg_coord_id == (rkb)->rkb_nodeid) 265 266 /** 267 * @returns true if cgrp is using static group membership 268 */ 269 #define RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg) \ 270 !RD_KAFKAP_STR_IS_NULL((rkcg)->rkcg_group_instance_id) 271 272 extern const char *rd_kafka_cgrp_state_names[]; 273 extern const char *rd_kafka_cgrp_join_state_names[]; 274 275 void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg); 276 rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk, 277 const rd_kafkap_str_t *group_id, 278 const rd_kafkap_str_t *client_id); 279 void rd_kafka_cgrp_serve (rd_kafka_cgrp_t *rkcg); 280 281 void rd_kafka_cgrp_op (rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp, 282 rd_kafka_replyq_t replyq, rd_kafka_op_type_t type, 283 rd_kafka_resp_err_t err); 284 void rd_kafka_cgrp_terminate0 (rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko); 285 void rd_kafka_cgrp_terminate (rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq); 286 287 288 rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_del (rd_kafka_cgrp_t *rkcg, 289 const char *pattern); 290 rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_add (rd_kafka_cgrp_t *rkcg, 291 const char *pattern); 292 293 int rd_kafka_cgrp_topic_check (rd_kafka_cgrp_t *rkcg, const char *topic); 294 295 void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id); 296 297 void rd_kafka_cgrp_handle_SyncGroup (rd_kafka_cgrp_t *rkcg, 298 rd_kafka_broker_t *rkb, 299 rd_kafka_resp_err_t err, 300 const rd_kafkap_bytes_t *member_state); 301 void rd_kafka_cgrp_set_join_state (rd_kafka_cgrp_t *rkcg, int join_state); 302 303 void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg, 304 const char *reason); 305 void rd_kafka_cgrp_coord_dead (rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err, 306 const char *reason); 307 void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg, int do_join); 308 #define rd_kafka_cgrp_get(rk) ((rk)->rk_cgrp) 309 310 311 struct rd_kafka_consumer_group_metadata_s { 312 char *group_id; 313 }; 314 315 rd_kafka_consumer_group_metadata_t * 316 rd_kafka_consumer_group_metadata_dup ( 317 const rd_kafka_consumer_group_metadata_t *cgmetadata); 318 319 #endif /* _RDKAFKA_CGRP_H_ */ 320