1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2019 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 
30 #include "rdkafka_int.h"
31 #include "rdkafka_request.h"
32 #include "rdkafka_coord.h"
33 
34 
35 /**
36  * @name Coordinator cache
37  * @{
38  *
39  */
/**
 * @brief Unlink cache entry \p cce from \p cc and free it.
 *
 * Releases the entry's broker reference and its copy of the coordinator
 * key. The cache must hold at least one entry (asserted).
 */
void rd_kafka_coord_cache_entry_destroy (rd_kafka_coord_cache_t *cc,
                                         rd_kafka_coord_cache_entry_t *cce) {
        rd_assert(cc->cc_cnt > 0);

        /* Take the entry off the list before releasing what it owns. */
        TAILQ_REMOVE(&cc->cc_entries, cce, cce_link);
        cc->cc_cnt--;

        rd_free(cce->cce_coordkey);
        rd_kafka_broker_destroy(cce->cce_rkb);
        rd_free(cce);
}
49 
50 
51 /**
52  * @brief Delete any expired cache entries
53  *
54  * @locality rdkafka main thread
55  */
rd_kafka_coord_cache_expire(rd_kafka_coord_cache_t * cc)56 void rd_kafka_coord_cache_expire (rd_kafka_coord_cache_t *cc) {
57         rd_kafka_coord_cache_entry_t *cce, *next;
58         rd_ts_t expire = rd_clock() - cc->cc_expire_thres;
59 
60         next = TAILQ_LAST(&cc->cc_entries, rd_kafka_coord_cache_head_s);
61         while (next) {
62                 cce = next;
63 
64                 if (cce->cce_ts_used > expire)
65                         break;
66 
67                 next = TAILQ_PREV(cce, rd_kafka_coord_cache_head_s, cce_link);
68                 rd_kafka_coord_cache_entry_destroy(cc, cce);
69         }
70 }
71 
72 
73 static rd_kafka_coord_cache_entry_t *
rd_kafka_coord_cache_find(rd_kafka_coord_cache_t * cc,rd_kafka_coordtype_t coordtype,const char * coordkey)74 rd_kafka_coord_cache_find (rd_kafka_coord_cache_t *cc,
75                            rd_kafka_coordtype_t coordtype,
76                            const char *coordkey) {
77         rd_kafka_coord_cache_entry_t *cce;
78 
79         TAILQ_FOREACH(cce, &cc->cc_entries, cce_link) {
80                 if (cce->cce_coordtype == coordtype &&
81                     !strcmp(cce->cce_coordkey, coordkey)) {
82                         /* Match */
83                         cce->cce_ts_used = rd_clock();
84                         if (TAILQ_FIRST(&cc->cc_entries) != cce) {
85                                 /* Move to head of list */
86                                 TAILQ_REMOVE(&cc->cc_entries,
87                                              cce, cce_link);
88                                 TAILQ_INSERT_HEAD(&cc->cc_entries,
89                                                   cce, cce_link);
90                         }
91                         return cce;
92                 }
93         }
94 
95         return NULL;
96 }
97 
98 
rd_kafka_coord_cache_get(rd_kafka_coord_cache_t * cc,rd_kafka_coordtype_t coordtype,const char * coordkey)99 rd_kafka_broker_t *rd_kafka_coord_cache_get (rd_kafka_coord_cache_t *cc,
100                                              rd_kafka_coordtype_t coordtype,
101                                              const char *coordkey) {
102         rd_kafka_coord_cache_entry_t *cce;
103 
104         cce = rd_kafka_coord_cache_find(cc, coordtype, coordkey);
105         if (!cce)
106                 return NULL;
107 
108         rd_kafka_broker_keep(cce->cce_rkb);
109         return cce->cce_rkb;
110 }
111 
112 
113 
/**
 * @brief Record \p rkb as the coordinator for
 *        (\p coordtype, \p coordkey).
 *
 * An existing entry is reused (and refreshed by the lookup), otherwise
 * a new entry is inserted at the head of the list. When more than 10
 * entries are present the tail entry is evicted first, so the cache
 * holds at most 11 entries.
 *
 * The entry holds its own reference to \p rkb; a previously stored,
 * different broker has its reference released.
 */
static void rd_kafka_coord_cache_add (rd_kafka_coord_cache_t *cc,
                                      rd_kafka_coordtype_t coordtype,
                                      const char *coordkey,
                                      rd_kafka_broker_t *rkb) {
        rd_kafka_coord_cache_entry_t *entry =
                rd_kafka_coord_cache_find(cc, coordtype, coordkey);

        if (entry == NULL) {
                if (cc->cc_cnt > 10) {
                        /* Cache is full: drop the entry at the tail,
                         * which is the least recently used one. */
                        rd_kafka_coord_cache_entry_t *victim =
                                TAILQ_LAST(&cc->cc_entries,
                                           rd_kafka_coord_cache_head_s);
                        rd_kafka_coord_cache_entry_destroy(cc, victim);
                }

                entry = rd_calloc(1, sizeof(*entry));
                entry->cce_ts_used   = rd_clock();
                entry->cce_coordkey  = rd_strdup(coordkey);
                entry->cce_coordtype = coordtype;

                TAILQ_INSERT_HEAD(&cc->cc_entries, entry, cce_link);
                cc->cc_cnt++;
        }

        /* Same broker as before: the reference is already held. */
        if (entry->cce_rkb == rkb)
                return;

        if (entry->cce_rkb)
                rd_kafka_broker_destroy(entry->cce_rkb);

        entry->cce_rkb = rkb;
        rd_kafka_broker_keep(rkb);
}
145 
146 
147 /**
148  * @brief Evict any cache entries for broker \p rkb.
149  *
150  * Use this when a request returns ERR_NOT_COORDINATOR_FOR...
151  *
152  * @locality rdkafka main thread
153  * @locks none
154  */
rd_kafka_coord_cache_evict(rd_kafka_coord_cache_t * cc,rd_kafka_broker_t * rkb)155 void rd_kafka_coord_cache_evict (rd_kafka_coord_cache_t *cc,
156                                  rd_kafka_broker_t *rkb) {
157         rd_kafka_coord_cache_entry_t *cce, *tmp;
158 
159         TAILQ_FOREACH_SAFE(cce, &cc->cc_entries, cce_link, tmp) {
160                 if (cce->cce_rkb == rkb)
161                         rd_kafka_coord_cache_entry_destroy(cc, cce);
162         }
163 }
164 
165 /**
166  * @brief Destroy all coord cache entries.
167  */
rd_kafka_coord_cache_destroy(rd_kafka_coord_cache_t * cc)168 void rd_kafka_coord_cache_destroy (rd_kafka_coord_cache_t *cc) {
169         rd_kafka_coord_cache_entry_t *cce;
170 
171         while ((cce = TAILQ_FIRST(&cc->cc_entries)))
172                 rd_kafka_coord_cache_entry_destroy(cc, cce);
173 }
174 
175 
176 /**
177  * @brief Initialize the coord cache.
178  *
179  * Locking of the coord-cache is up to the owner.
180  */
rd_kafka_coord_cache_init(rd_kafka_coord_cache_t * cc,int expire_thres_ms)181 void rd_kafka_coord_cache_init (rd_kafka_coord_cache_t *cc,
182                                 int expire_thres_ms) {
183         TAILQ_INIT(&cc->cc_entries);
184         cc->cc_cnt = 0;
185         cc->cc_expire_thres = expire_thres_ms * 1000;
186 }
187 
188 /**@}*/
189 
190 
191 /**
192  * @name Asynchronous coordinator requests
193  * @{
194  *
195  */
196 
197 
198 
/* Forward declaration: the state machine is called by
 * rd_kafka_coord_req() and by the FindCoordinator response handler,
 * both of which are defined before it, while the state machine itself
 * refers to that handler. */
static void rd_kafka_coord_req_fsm (rd_kafka_t *rk, rd_kafka_coord_req_t *creq);
200 
201 
202 
203 
204 /**
205  * @brief Look up coordinator for \p coordtype and \p coordkey
206  *        (either from cache or by FindCoordinator), make sure there is
207  *        a connection to the coordinator, and then call \p send_req_cb,
208  *        passing the coordinator broker instance and \p rko
209  *        to send the request.
210  *        These steps may be performed by this function, or asynchronously
211  *        at a later time.
212  *
213  * Response, or error, is sent on \p replyq with callback \p rkbuf_cb.
214  *
215  * @locality rdkafka main thread
216  * @locks none
217  */
rd_kafka_coord_req(rd_kafka_t * rk,rd_kafka_coordtype_t coordtype,const char * coordkey,rd_kafka_send_req_cb_t * send_req_cb,rd_kafka_op_t * rko,int timeout_ms,rd_kafka_replyq_t replyq,rd_kafka_resp_cb_t * resp_cb,void * reply_opaque)218 void rd_kafka_coord_req (rd_kafka_t *rk,
219                          rd_kafka_coordtype_t coordtype,
220                          const char *coordkey,
221                          rd_kafka_send_req_cb_t *send_req_cb,
222                          rd_kafka_op_t *rko,
223                          int timeout_ms,
224                          rd_kafka_replyq_t replyq,
225                          rd_kafka_resp_cb_t *resp_cb,
226                          void *reply_opaque) {
227         rd_kafka_coord_req_t *creq;
228 
229         creq = rd_calloc(1, sizeof(*creq));
230         creq->creq_coordtype = coordtype;
231         creq->creq_coordkey = rd_strdup(coordkey);
232         creq->creq_ts_timeout = rd_timeout_init(timeout_ms);
233         creq->creq_send_req_cb = send_req_cb;
234         creq->creq_rko = rko;
235         creq->creq_replyq = replyq;
236         creq->creq_resp_cb = resp_cb;
237         creq->creq_reply_opaque = reply_opaque;
238         creq->creq_refcnt = 1;
239         creq->creq_done = rd_false;
240 
241         TAILQ_INSERT_TAIL(&rk->rk_coord_reqs, creq, creq_link);
242 
243         rd_kafka_coord_req_fsm(rk, creq);
244 }
245 
246 
247 /**
248  * @brief Decrease refcount of creq and free it if no more references.
249  *
250  * @param done Mark creq as done, having performed its duties. There may still
251  *             be lingering references.
252  *
253  * @returns true if creq was destroyed, else false.
254  */
255 static rd_bool_t
rd_kafka_coord_req_destroy(rd_kafka_t * rk,rd_kafka_coord_req_t * creq,rd_bool_t done)256 rd_kafka_coord_req_destroy (rd_kafka_t *rk, rd_kafka_coord_req_t *creq,
257                             rd_bool_t done) {
258 
259         rd_assert(creq->creq_refcnt > 0);
260 
261         if (done) {
262                 /* Request has been performed, remove from rk_coord_reqs
263                  * list so creq won't be triggered again by state broadcasts,
264                  * etc. */
265                 rd_dassert(!creq->creq_done);
266                 TAILQ_REMOVE(&rk->rk_coord_reqs, creq, creq_link);
267                 creq->creq_done = rd_true;
268         }
269 
270         if (--creq->creq_refcnt > 0)
271                 return rd_false;
272 
273         rd_dassert(creq->creq_done);
274         rd_kafka_replyq_destroy(&creq->creq_replyq);
275         rd_free(creq->creq_coordkey);
276         rd_free(creq);
277 
278         return rd_true;
279 }
280 
/**
 * @brief Take an additional reference to \p creq.
 *
 * Each reference is released with rd_kafka_coord_req_destroy().
 */
static void rd_kafka_coord_req_keep (rd_kafka_coord_req_t *creq) {
        creq->creq_refcnt += 1;
}
284 
/**
 * @brief Fail \p creq with \p err.
 *
 * An error reply carrying the creq's response callback and opaque is
 * enqueued on the creq's reply queue, after which the creq is marked
 * done and one reference to it is released.
 *
 * @remark May destroy the \p creq.
 */
static void rd_kafka_coord_req_fail (rd_kafka_t *rk, rd_kafka_coord_req_t *creq,
                                     rd_kafka_resp_err_t err) {
        rd_kafka_buf_t *dummy_rkbuf;
        rd_kafka_op_t *rko_reply;

        /* No real response exists: an empty rkbuf is used solely to
         * hand the callback and its opaque over to the buf resp_cb. */
        dummy_rkbuf = rd_kafka_buf_new(0, 0);
        dummy_rkbuf->rkbuf_cb = creq->creq_resp_cb;
        dummy_rkbuf->rkbuf_opaque = creq->creq_reply_opaque;

        rko_reply = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF);
        /* The rkbuf has no rkb through which rk could be reached,
         * so set it on the op. */
        rko_reply->rko_rk = rk;
        rko_reply->rko_err = err;
        rko_reply->rko_u.xbuf.rkbuf = dummy_rkbuf;

        rd_kafka_replyq_enq(&creq->creq_replyq, rko_reply, 0);

        rd_kafka_coord_req_destroy(rk, creq, rd_true/*done*/);
}
305 
306 
307 static void
rd_kafka_coord_req_handle_FindCoordinator(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)308 rd_kafka_coord_req_handle_FindCoordinator (rd_kafka_t *rk,
309                                            rd_kafka_broker_t *rkb,
310                                            rd_kafka_resp_err_t err,
311                                            rd_kafka_buf_t *rkbuf,
312                                            rd_kafka_buf_t *request,
313                                            void *opaque) {
314         const int log_decode_errors = LOG_ERR;
315         rd_kafka_coord_req_t *creq = opaque;
316         int16_t ErrorCode;
317         rd_kafkap_str_t Host;
318         int32_t NodeId, Port;
319         char errstr[256] = "";
320         int actions;
321         rd_kafka_broker_t *coord;
322         rd_kafka_metadata_broker_t mdb = RD_ZERO_INIT;
323 
324         /* If creq has finished (possibly because of an earlier FindCoordinator
325          * response or a broker state broadcast we simply ignore the
326          * response. */
327         if (creq->creq_done)
328                 err = RD_KAFKA_RESP_ERR__DESTROY;
329 
330         if (err)
331                 goto err;
332 
333         if (request->rkbuf_reqhdr.ApiVersion >= 1)
334                 rd_kafka_buf_read_throttle_time(rkbuf);
335 
336         rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
337 
338         if (request->rkbuf_reqhdr.ApiVersion >= 1) {
339                 rd_kafkap_str_t ErrorMsg;
340                 rd_kafka_buf_read_str(rkbuf, &ErrorMsg);
341                 if (ErrorCode)
342                         rd_snprintf(errstr, sizeof(errstr),
343                                     "%.*s", RD_KAFKAP_STR_PR(&ErrorMsg));
344         }
345 
346         if ((err = ErrorCode))
347                 goto err;
348 
349         rd_kafka_buf_read_i32(rkbuf, &NodeId);
350         rd_kafka_buf_read_str(rkbuf, &Host);
351         rd_kafka_buf_read_i32(rkbuf, &Port);
352 
353         mdb.id = NodeId;
354         RD_KAFKAP_STR_DUPA(&mdb.host, &Host);
355         mdb.port = Port;
356 
357         /* Find, update or add broker */
358         rd_kafka_broker_update(rk, rkb->rkb_proto, &mdb, &coord);
359 
360         if (!coord) {
361                 err = RD_KAFKA_RESP_ERR__FAIL;
362                 rd_snprintf(errstr, sizeof(errstr),
363                             "Failed to add broker: "
364                             "instance is probably terminating");
365                 goto err;
366         }
367 
368 
369         rd_kafka_coord_cache_add(&rk->rk_coord_cache,
370                                  creq->creq_coordtype,
371                                  creq->creq_coordkey,
372                                  coord);
373         rd_kafka_broker_destroy(coord); /* refcnt from broker_update() */
374 
375         rd_kafka_coord_req_fsm(rk, creq);
376 
377         /* Drop refcount from req_fsm() */
378         rd_kafka_coord_req_destroy(rk, creq, rd_false/*!done*/);
379 
380         return;
381 
382  err_parse:
383         err = rkbuf->rkbuf_err;
384  err:
385         actions = rd_kafka_err_action(
386                 rkb, err, request,
387 
388                 RD_KAFKA_ERR_ACTION_SPECIAL,
389                 RD_KAFKA_RESP_ERR__DESTROY,
390 
391                 RD_KAFKA_ERR_ACTION_PERMANENT,
392                 RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
393 
394                 RD_KAFKA_ERR_ACTION_PERMANENT,
395                 RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
396 
397                 RD_KAFKA_ERR_ACTION_REFRESH,
398                 RD_KAFKA_RESP_ERR__TRANSPORT,
399 
400                 RD_KAFKA_ERR_ACTION_RETRY,
401                 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
402 
403                 RD_KAFKA_ERR_ACTION_RETRY,
404                 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
405 
406                 RD_KAFKA_ERR_ACTION_END);
407 
408         if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) {
409                 rd_kafka_coord_req_fail(rk, creq, err);
410                 return;
411 
412         } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
413                 rd_kafka_buf_retry(rkb, request);
414                 return; /* Keep refcnt from req_fsm() and retry */
415         }
416 
417         /* Rely on state broadcast to trigger retry */
418 
419         /* Drop refcount from req_fsm() */
420         rd_kafka_coord_req_destroy(rk, creq, rd_false/*!done*/);
421 }
422 
423 
424 
425 
426 
427 
428 /**
429  * @brief State machine for async coordinator requests.
430  *
431  * @remark May destroy the \p creq.
432  *
433  * @locality any
434  * @locks none
435  */
436 static void
rd_kafka_coord_req_fsm(rd_kafka_t * rk,rd_kafka_coord_req_t * creq)437 rd_kafka_coord_req_fsm (rd_kafka_t *rk, rd_kafka_coord_req_t *creq) {
438         rd_kafka_broker_t *rkb;
439         rd_kafka_resp_err_t err;
440 
441         if (creq->creq_done)
442                 /* crqeq has already performed its actions, this is a
443                  * lingering reference, e.g., a late FindCoordinator response.
444                  * Just ignore. */
445                 return;
446 
447         if (unlikely(rd_kafka_terminating(rk))) {
448                 rd_kafka_coord_req_fail(rk, creq, RD_KAFKA_RESP_ERR__DESTROY);
449                 return;
450         }
451 
452         /* Check cache first */
453         rkb = rd_kafka_coord_cache_get(&rk->rk_coord_cache,
454                                        creq->creq_coordtype,
455                                        creq->creq_coordkey);
456 
457         if (rkb) {
458                 if (rd_kafka_broker_is_up(rkb)) {
459                         /* Cached coordinator is up, send request */
460                         rd_kafka_replyq_t replyq;
461 
462                         rd_kafka_replyq_copy(&replyq, &creq->creq_replyq);
463                         err = creq->creq_send_req_cb(rkb, creq->creq_rko,
464                                                      replyq, creq->creq_resp_cb,
465                                                      creq->creq_reply_opaque);
466 
467                         if (err) {
468                                 /* Permanent error, e.g., request not
469                                  *  supported by broker. */
470                                 rd_kafka_replyq_destroy(&replyq);
471                                 rd_kafka_coord_req_fail(rk, creq, err);
472                         } else {
473                                 rd_kafka_coord_req_destroy(rk, creq,
474                                                            rd_true/*done*/);
475                         }
476 
477                 } else {
478                         /* No connection yet. We'll be re-triggered on
479                          * broker state broadcast. */
480                         rd_kafka_broker_schedule_connection(rkb);
481                 }
482 
483                 rd_kafka_broker_destroy(rkb);
484                 return;
485         }
486 
487         /* Get any usable broker to look up the coordinator */
488         rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, RD_DO_LOCK,
489                                          RD_KAFKA_FEATURE_BROKER_GROUP_COORD,
490                                          "broker to look up coordinator");
491 
492         if (!rkb) {
493                 /* No available brokers yet, we'll be re-triggered on
494                  * broker state broadcast. */
495                 return;
496         }
497 
498 
499         /* Send FindCoordinator request, the handler will continue
500          * the state machine. */
501         rd_kafka_coord_req_keep(creq);
502         err = rd_kafka_FindCoordinatorRequest(
503                 rkb, creq->creq_coordtype, creq->creq_coordkey,
504                 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
505                 rd_kafka_coord_req_handle_FindCoordinator,
506                 creq);
507 
508         rd_kafka_broker_destroy(rkb);
509 
510         if (err) {
511                 rd_kafka_coord_req_fail(rk, creq, err);
512                 /* from keep() above */
513                 rd_kafka_coord_req_destroy(rk, creq, rd_false/*!done*/);
514         }
515 }
516 
517 
518 
519 /**
520  * @brief Callback called from rdkafka main thread on each
521  *        broker state change from or to UP.
522  *
523  * @locality rdkafka main thread
524  * @locks none
525  */
rd_kafka_coord_rkb_monitor_cb(rd_kafka_broker_t * rkb)526 void rd_kafka_coord_rkb_monitor_cb (rd_kafka_broker_t *rkb) {
527         rd_kafka_t *rk = rkb->rkb_rk;
528         rd_kafka_coord_req_t *creq, *tmp;
529 
530         /* Run through all coord_req fsms */
531 
532         TAILQ_FOREACH_SAFE(creq, &rk->rk_coord_reqs, creq_link, tmp)
533                 rd_kafka_coord_req_fsm(rk, creq);
534 }
535 
536 
537 
538 /**
539  * @brief Instance is terminating: destroy all coord reqs
540  */
rd_kafka_coord_reqs_term(rd_kafka_t * rk)541 void rd_kafka_coord_reqs_term (rd_kafka_t *rk) {
542         rd_kafka_coord_req_t *creq;
543 
544         while ((creq = TAILQ_FIRST(&rk->rk_coord_reqs)))
545                 rd_kafka_coord_req_fail(rk, creq, RD_KAFKA_RESP_ERR__DESTROY);
546 }
547 
548 
549 /**
550  * @brief Initialize coord reqs list.
551  */
rd_kafka_coord_reqs_init(rd_kafka_t * rk)552 void rd_kafka_coord_reqs_init (rd_kafka_t *rk) {
553         TAILQ_INIT(&rk->rk_coord_reqs);
554 }
555 
556 /**@}*/
557