1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2019 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30 #include "rdkafka_int.h"
31 #include "rdkafka_request.h"
32 #include "rdkafka_coord.h"
33
34
35 /**
36 * @name Coordinator cache
37 * @{
38 *
39 */
/**
 * @brief Unlink \p cce from the coordinator cache \p cc and release it.
 *
 * Drops the entry's broker reference, frees its key copy and the entry
 * itself, and decrements the cache's entry count.
 */
void rd_kafka_coord_cache_entry_destroy (rd_kafka_coord_cache_t *cc,
                                         rd_kafka_coord_cache_entry_t *cce) {
        rd_assert(cc->cc_cnt > 0);

        /* Detach from the cache first ... */
        TAILQ_REMOVE(&cc->cc_entries, cce, cce_link);
        cc->cc_cnt--;

        /* ... then release everything the entry owns. */
        rd_kafka_broker_destroy(cce->cce_rkb);
        rd_free(cce->cce_coordkey);
        rd_free(cce);
}
49
50
51 /**
52 * @brief Delete any expired cache entries
53 *
54 * @locality rdkafka main thread
55 */
rd_kafka_coord_cache_expire(rd_kafka_coord_cache_t * cc)56 void rd_kafka_coord_cache_expire (rd_kafka_coord_cache_t *cc) {
57 rd_kafka_coord_cache_entry_t *cce, *next;
58 rd_ts_t expire = rd_clock() - cc->cc_expire_thres;
59
60 next = TAILQ_LAST(&cc->cc_entries, rd_kafka_coord_cache_head_s);
61 while (next) {
62 cce = next;
63
64 if (cce->cce_ts_used > expire)
65 break;
66
67 next = TAILQ_PREV(cce, rd_kafka_coord_cache_head_s, cce_link);
68 rd_kafka_coord_cache_entry_destroy(cc, cce);
69 }
70 }
71
72
73 static rd_kafka_coord_cache_entry_t *
rd_kafka_coord_cache_find(rd_kafka_coord_cache_t * cc,rd_kafka_coordtype_t coordtype,const char * coordkey)74 rd_kafka_coord_cache_find (rd_kafka_coord_cache_t *cc,
75 rd_kafka_coordtype_t coordtype,
76 const char *coordkey) {
77 rd_kafka_coord_cache_entry_t *cce;
78
79 TAILQ_FOREACH(cce, &cc->cc_entries, cce_link) {
80 if (cce->cce_coordtype == coordtype &&
81 !strcmp(cce->cce_coordkey, coordkey)) {
82 /* Match */
83 cce->cce_ts_used = rd_clock();
84 if (TAILQ_FIRST(&cc->cc_entries) != cce) {
85 /* Move to head of list */
86 TAILQ_REMOVE(&cc->cc_entries,
87 cce, cce_link);
88 TAILQ_INSERT_HEAD(&cc->cc_entries,
89 cce, cce_link);
90 }
91 return cce;
92 }
93 }
94
95 return NULL;
96 }
97
98
rd_kafka_coord_cache_get(rd_kafka_coord_cache_t * cc,rd_kafka_coordtype_t coordtype,const char * coordkey)99 rd_kafka_broker_t *rd_kafka_coord_cache_get (rd_kafka_coord_cache_t *cc,
100 rd_kafka_coordtype_t coordtype,
101 const char *coordkey) {
102 rd_kafka_coord_cache_entry_t *cce;
103
104 cce = rd_kafka_coord_cache_find(cc, coordtype, coordkey);
105 if (!cce)
106 return NULL;
107
108 rd_kafka_broker_keep(cce->cce_rkb);
109 return cce->cce_rkb;
110 }
111
112
113
/**
 * @brief Record \p rkb as the coordinator for (\p coordtype, \p coordkey).
 *
 * Creates a new entry at the head of the cache if none exists. Before
 * inserting, the least recently used (tail) entry is evicted once the entry
 * count exceeds the eviction threshold. The entry holds its own reference
 * on \p rkb; any previously cached broker reference is released.
 */
static void rd_kafka_coord_cache_add (rd_kafka_coord_cache_t *cc,
                                      rd_kafka_coordtype_t coordtype,
                                      const char *coordkey,
                                      rd_kafka_broker_t *rkb) {
        /* Entry count above which the LRU entry is evicted before adding. */
        enum { coord_cache_evict_thres = 10 };
        rd_kafka_coord_cache_entry_t *cce =
                rd_kafka_coord_cache_find(cc, coordtype, coordkey);

        if (cce == NULL) {
                if (cc->cc_cnt > coord_cache_evict_thres)
                        rd_kafka_coord_cache_entry_destroy(
                                cc, TAILQ_LAST(&cc->cc_entries,
                                               rd_kafka_coord_cache_head_s));

                cce = rd_calloc(1, sizeof(*cce));
                cce->cce_coordtype = coordtype;
                cce->cce_coordkey  = rd_strdup(coordkey);
                cce->cce_ts_used   = rd_clock();

                TAILQ_INSERT_HEAD(&cc->cc_entries, cce, cce_link);
                cc->cc_cnt++;
        }

        if (cce->cce_rkb == rkb)
                return; /* Already pointing at this broker. */

        if (cce->cce_rkb != NULL)
                rd_kafka_broker_destroy(cce->cce_rkb);
        rd_kafka_broker_keep(rkb);
        cce->cce_rkb = rkb;
}
145
146
147 /**
148 * @brief Evict any cache entries for broker \p rkb.
149 *
150 * Use this when a request returns ERR_NOT_COORDINATOR_FOR...
151 *
152 * @locality rdkafka main thread
153 * @locks none
154 */
rd_kafka_coord_cache_evict(rd_kafka_coord_cache_t * cc,rd_kafka_broker_t * rkb)155 void rd_kafka_coord_cache_evict (rd_kafka_coord_cache_t *cc,
156 rd_kafka_broker_t *rkb) {
157 rd_kafka_coord_cache_entry_t *cce, *tmp;
158
159 TAILQ_FOREACH_SAFE(cce, &cc->cc_entries, cce_link, tmp) {
160 if (cce->cce_rkb == rkb)
161 rd_kafka_coord_cache_entry_destroy(cc, cce);
162 }
163 }
164
165 /**
166 * @brief Destroy all coord cache entries.
167 */
rd_kafka_coord_cache_destroy(rd_kafka_coord_cache_t * cc)168 void rd_kafka_coord_cache_destroy (rd_kafka_coord_cache_t *cc) {
169 rd_kafka_coord_cache_entry_t *cce;
170
171 while ((cce = TAILQ_FIRST(&cc->cc_entries)))
172 rd_kafka_coord_cache_entry_destroy(cc, cce);
173 }
174
175
176 /**
177 * @brief Initialize the coord cache.
178 *
179 * Locking of the coord-cache is up to the owner.
180 */
rd_kafka_coord_cache_init(rd_kafka_coord_cache_t * cc,int expire_thres_ms)181 void rd_kafka_coord_cache_init (rd_kafka_coord_cache_t *cc,
182 int expire_thres_ms) {
183 TAILQ_INIT(&cc->cc_entries);
184 cc->cc_cnt = 0;
185 cc->cc_expire_thres = expire_thres_ms * 1000;
186 }
187
188 /**@}*/
189
190
191 /**
192 * @name Asynchronous coordinator requests
193 * @{
194 *
195 */
196
197
198
/* Forward declaration: rd_kafka_coord_req() and the FindCoordinator
 * response handler below call the state machine, which is defined
 * further down in this file. */
static void rd_kafka_coord_req_fsm (rd_kafka_t *rk, rd_kafka_coord_req_t *creq);
200
201
202
203
204 /**
205 * @brief Look up coordinator for \p coordtype and \p coordkey
206 * (either from cache or by FindCoordinator), make sure there is
207 * a connection to the coordinator, and then call \p send_req_cb,
208 * passing the coordinator broker instance and \p rko
209 * to send the request.
210 * These steps may be performed by this function, or asynchronously
211 * at a later time.
212 *
213 * Response, or error, is sent on \p replyq with callback \p rkbuf_cb.
214 *
215 * @locality rdkafka main thread
216 * @locks none
217 */
rd_kafka_coord_req(rd_kafka_t * rk,rd_kafka_coordtype_t coordtype,const char * coordkey,rd_kafka_send_req_cb_t * send_req_cb,rd_kafka_op_t * rko,int timeout_ms,rd_kafka_replyq_t replyq,rd_kafka_resp_cb_t * resp_cb,void * reply_opaque)218 void rd_kafka_coord_req (rd_kafka_t *rk,
219 rd_kafka_coordtype_t coordtype,
220 const char *coordkey,
221 rd_kafka_send_req_cb_t *send_req_cb,
222 rd_kafka_op_t *rko,
223 int timeout_ms,
224 rd_kafka_replyq_t replyq,
225 rd_kafka_resp_cb_t *resp_cb,
226 void *reply_opaque) {
227 rd_kafka_coord_req_t *creq;
228
229 creq = rd_calloc(1, sizeof(*creq));
230 creq->creq_coordtype = coordtype;
231 creq->creq_coordkey = rd_strdup(coordkey);
232 creq->creq_ts_timeout = rd_timeout_init(timeout_ms);
233 creq->creq_send_req_cb = send_req_cb;
234 creq->creq_rko = rko;
235 creq->creq_replyq = replyq;
236 creq->creq_resp_cb = resp_cb;
237 creq->creq_reply_opaque = reply_opaque;
238 creq->creq_refcnt = 1;
239 creq->creq_done = rd_false;
240
241 TAILQ_INSERT_TAIL(&rk->rk_coord_reqs, creq, creq_link);
242
243 rd_kafka_coord_req_fsm(rk, creq);
244 }
245
246
247 /**
248 * @brief Decrease refcount of creq and free it if no more references.
249 *
250 * @param done Mark creq as done, having performed its duties. There may still
251 * be lingering references.
252 *
253 * @returns true if creq was destroyed, else false.
254 */
255 static rd_bool_t
rd_kafka_coord_req_destroy(rd_kafka_t * rk,rd_kafka_coord_req_t * creq,rd_bool_t done)256 rd_kafka_coord_req_destroy (rd_kafka_t *rk, rd_kafka_coord_req_t *creq,
257 rd_bool_t done) {
258
259 rd_assert(creq->creq_refcnt > 0);
260
261 if (done) {
262 /* Request has been performed, remove from rk_coord_reqs
263 * list so creq won't be triggered again by state broadcasts,
264 * etc. */
265 rd_dassert(!creq->creq_done);
266 TAILQ_REMOVE(&rk->rk_coord_reqs, creq, creq_link);
267 creq->creq_done = rd_true;
268 }
269
270 if (--creq->creq_refcnt > 0)
271 return rd_false;
272
273 rd_dassert(creq->creq_done);
274 rd_kafka_replyq_destroy(&creq->creq_replyq);
275 rd_free(creq->creq_coordkey);
276 rd_free(creq);
277
278 return rd_true;
279 }
280
/**
 * @brief Take an additional reference on \p creq; release it with
 *        rd_kafka_coord_req_destroy(rk, creq, rd_false).
 */
static void rd_kafka_coord_req_keep (rd_kafka_coord_req_t *creq) {
        creq->creq_refcnt += 1;
}
284
/**
 * @brief Fail \p creq with \p err.
 *
 * Enqueues an RD_KAFKA_OP_RECV_BUF op with \p err on the creq's reply
 * queue, so the creq's response callback is invoked with the error. The
 * creq is then marked done and its reference is released.
 */
static void rd_kafka_coord_req_fail (rd_kafka_t *rk, rd_kafka_coord_req_t *creq,
                                     rd_kafka_resp_err_t err) {
        /* The response callback is driven through an rkbuf, so build an
         * empty placeholder carrying the callback and its opaque. */
        rd_kafka_buf_t *placeholder = rd_kafka_buf_new(0, 0);
        rd_kafka_op_t *rko_reply;

        placeholder->rkbuf_cb     = creq->creq_resp_cb;
        placeholder->rkbuf_opaque = creq->creq_reply_opaque;

        rko_reply = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF);
        /* The placeholder rkbuf has no rkb to reach rk through. */
        rko_reply->rko_rk = rk;
        rko_reply->rko_err = err;
        rko_reply->rko_u.xbuf.rkbuf = placeholder;

        rd_kafka_replyq_enq(&creq->creq_replyq, rko_reply, 0);

        rd_kafka_coord_req_destroy(rk, creq, rd_true/*done*/);
}
305
306
307 static void
rd_kafka_coord_req_handle_FindCoordinator(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)308 rd_kafka_coord_req_handle_FindCoordinator (rd_kafka_t *rk,
309 rd_kafka_broker_t *rkb,
310 rd_kafka_resp_err_t err,
311 rd_kafka_buf_t *rkbuf,
312 rd_kafka_buf_t *request,
313 void *opaque) {
314 const int log_decode_errors = LOG_ERR;
315 rd_kafka_coord_req_t *creq = opaque;
316 int16_t ErrorCode;
317 rd_kafkap_str_t Host;
318 int32_t NodeId, Port;
319 char errstr[256] = "";
320 int actions;
321 rd_kafka_broker_t *coord;
322 rd_kafka_metadata_broker_t mdb = RD_ZERO_INIT;
323
324 /* If creq has finished (possibly because of an earlier FindCoordinator
325 * response or a broker state broadcast we simply ignore the
326 * response. */
327 if (creq->creq_done)
328 err = RD_KAFKA_RESP_ERR__DESTROY;
329
330 if (err)
331 goto err;
332
333 if (request->rkbuf_reqhdr.ApiVersion >= 1)
334 rd_kafka_buf_read_throttle_time(rkbuf);
335
336 rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
337
338 if (request->rkbuf_reqhdr.ApiVersion >= 1) {
339 rd_kafkap_str_t ErrorMsg;
340 rd_kafka_buf_read_str(rkbuf, &ErrorMsg);
341 if (ErrorCode)
342 rd_snprintf(errstr, sizeof(errstr),
343 "%.*s", RD_KAFKAP_STR_PR(&ErrorMsg));
344 }
345
346 if ((err = ErrorCode))
347 goto err;
348
349 rd_kafka_buf_read_i32(rkbuf, &NodeId);
350 rd_kafka_buf_read_str(rkbuf, &Host);
351 rd_kafka_buf_read_i32(rkbuf, &Port);
352
353 mdb.id = NodeId;
354 RD_KAFKAP_STR_DUPA(&mdb.host, &Host);
355 mdb.port = Port;
356
357 /* Find, update or add broker */
358 rd_kafka_broker_update(rk, rkb->rkb_proto, &mdb, &coord);
359
360 if (!coord) {
361 err = RD_KAFKA_RESP_ERR__FAIL;
362 rd_snprintf(errstr, sizeof(errstr),
363 "Failed to add broker: "
364 "instance is probably terminating");
365 goto err;
366 }
367
368
369 rd_kafka_coord_cache_add(&rk->rk_coord_cache,
370 creq->creq_coordtype,
371 creq->creq_coordkey,
372 coord);
373 rd_kafka_broker_destroy(coord); /* refcnt from broker_update() */
374
375 rd_kafka_coord_req_fsm(rk, creq);
376
377 /* Drop refcount from req_fsm() */
378 rd_kafka_coord_req_destroy(rk, creq, rd_false/*!done*/);
379
380 return;
381
382 err_parse:
383 err = rkbuf->rkbuf_err;
384 err:
385 actions = rd_kafka_err_action(
386 rkb, err, request,
387
388 RD_KAFKA_ERR_ACTION_SPECIAL,
389 RD_KAFKA_RESP_ERR__DESTROY,
390
391 RD_KAFKA_ERR_ACTION_PERMANENT,
392 RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
393
394 RD_KAFKA_ERR_ACTION_PERMANENT,
395 RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
396
397 RD_KAFKA_ERR_ACTION_REFRESH,
398 RD_KAFKA_RESP_ERR__TRANSPORT,
399
400 RD_KAFKA_ERR_ACTION_RETRY,
401 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
402
403 RD_KAFKA_ERR_ACTION_RETRY,
404 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
405
406 RD_KAFKA_ERR_ACTION_END);
407
408 if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) {
409 rd_kafka_coord_req_fail(rk, creq, err);
410 return;
411
412 } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
413 rd_kafka_buf_retry(rkb, request);
414 return; /* Keep refcnt from req_fsm() and retry */
415 }
416
417 /* Rely on state broadcast to trigger retry */
418
419 /* Drop refcount from req_fsm() */
420 rd_kafka_coord_req_destroy(rk, creq, rd_false/*!done*/);
421 }
422
423
424
425
426
427
428 /**
429 * @brief State machine for async coordinator requests.
430 *
431 * @remark May destroy the \p creq.
432 *
433 * @locality any
434 * @locks none
435 */
436 static void
rd_kafka_coord_req_fsm(rd_kafka_t * rk,rd_kafka_coord_req_t * creq)437 rd_kafka_coord_req_fsm (rd_kafka_t *rk, rd_kafka_coord_req_t *creq) {
438 rd_kafka_broker_t *rkb;
439 rd_kafka_resp_err_t err;
440
441 if (creq->creq_done)
442 /* crqeq has already performed its actions, this is a
443 * lingering reference, e.g., a late FindCoordinator response.
444 * Just ignore. */
445 return;
446
447 if (unlikely(rd_kafka_terminating(rk))) {
448 rd_kafka_coord_req_fail(rk, creq, RD_KAFKA_RESP_ERR__DESTROY);
449 return;
450 }
451
452 /* Check cache first */
453 rkb = rd_kafka_coord_cache_get(&rk->rk_coord_cache,
454 creq->creq_coordtype,
455 creq->creq_coordkey);
456
457 if (rkb) {
458 if (rd_kafka_broker_is_up(rkb)) {
459 /* Cached coordinator is up, send request */
460 rd_kafka_replyq_t replyq;
461
462 rd_kafka_replyq_copy(&replyq, &creq->creq_replyq);
463 err = creq->creq_send_req_cb(rkb, creq->creq_rko,
464 replyq, creq->creq_resp_cb,
465 creq->creq_reply_opaque);
466
467 if (err) {
468 /* Permanent error, e.g., request not
469 * supported by broker. */
470 rd_kafka_replyq_destroy(&replyq);
471 rd_kafka_coord_req_fail(rk, creq, err);
472 } else {
473 rd_kafka_coord_req_destroy(rk, creq,
474 rd_true/*done*/);
475 }
476
477 } else {
478 /* No connection yet. We'll be re-triggered on
479 * broker state broadcast. */
480 rd_kafka_broker_schedule_connection(rkb);
481 }
482
483 rd_kafka_broker_destroy(rkb);
484 return;
485 }
486
487 /* Get any usable broker to look up the coordinator */
488 rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, RD_DO_LOCK,
489 RD_KAFKA_FEATURE_BROKER_GROUP_COORD,
490 "broker to look up coordinator");
491
492 if (!rkb) {
493 /* No available brokers yet, we'll be re-triggered on
494 * broker state broadcast. */
495 return;
496 }
497
498
499 /* Send FindCoordinator request, the handler will continue
500 * the state machine. */
501 rd_kafka_coord_req_keep(creq);
502 err = rd_kafka_FindCoordinatorRequest(
503 rkb, creq->creq_coordtype, creq->creq_coordkey,
504 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
505 rd_kafka_coord_req_handle_FindCoordinator,
506 creq);
507
508 rd_kafka_broker_destroy(rkb);
509
510 if (err) {
511 rd_kafka_coord_req_fail(rk, creq, err);
512 /* from keep() above */
513 rd_kafka_coord_req_destroy(rk, creq, rd_false/*!done*/);
514 }
515 }
516
517
518
519 /**
520 * @brief Callback called from rdkafka main thread on each
521 * broker state change from or to UP.
522 *
523 * @locality rdkafka main thread
524 * @locks none
525 */
rd_kafka_coord_rkb_monitor_cb(rd_kafka_broker_t * rkb)526 void rd_kafka_coord_rkb_monitor_cb (rd_kafka_broker_t *rkb) {
527 rd_kafka_t *rk = rkb->rkb_rk;
528 rd_kafka_coord_req_t *creq, *tmp;
529
530 /* Run through all coord_req fsms */
531
532 TAILQ_FOREACH_SAFE(creq, &rk->rk_coord_reqs, creq_link, tmp)
533 rd_kafka_coord_req_fsm(rk, creq);
534 }
535
536
537
538 /**
539 * @brief Instance is terminating: destroy all coord reqs
540 */
rd_kafka_coord_reqs_term(rd_kafka_t * rk)541 void rd_kafka_coord_reqs_term (rd_kafka_t *rk) {
542 rd_kafka_coord_req_t *creq;
543
544 while ((creq = TAILQ_FIRST(&rk->rk_coord_reqs)))
545 rd_kafka_coord_req_fail(rk, creq, RD_KAFKA_RESP_ERR__DESTROY);
546 }
547
548
/**
 * @brief Initialize coord reqs list.
 *
 * Sets up rk->rk_coord_reqs as an empty list. rd_kafka_coord_req()
 * appends requests to it, and they are unlinked once done.
 */
void rd_kafka_coord_reqs_init (rd_kafka_t *rk) {
        TAILQ_INIT(&rk->rk_coord_reqs);
}
555
556 /**@}*/
557