1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #if defined(__MINGW32__)
30 #include <ws2tcpip.h>
31 #endif
32
33 #ifndef _WIN32
34 #define _GNU_SOURCE
35 /*
36 * AIX defines this and the value needs to be set correctly. For Solaris,
37 * src/rd.h defines _POSIX_SOURCE to be 200809L, which corresponds to XPG7,
38 * which itself is not compatible with _XOPEN_SOURCE on that platform.
39 */
40 #if !defined(_AIX) && !defined(__sun)
41 #define _XOPEN_SOURCE
42 #endif
43 #include <signal.h>
44 #endif
45
46 #include <stdio.h>
47 #include <stdarg.h>
48 #include <string.h>
49 #include <ctype.h>
50
51 #include "rd.h"
52 #include "rdkafka_int.h"
53 #include "rdkafka_msg.h"
54 #include "rdkafka_msgset.h"
55 #include "rdkafka_topic.h"
56 #include "rdkafka_partition.h"
57 #include "rdkafka_broker.h"
58 #include "rdkafka_offset.h"
59 #include "rdkafka_transport.h"
60 #include "rdkafka_proto.h"
61 #include "rdkafka_buf.h"
62 #include "rdkafka_request.h"
63 #include "rdkafka_sasl.h"
64 #include "rdkafka_interceptor.h"
65 #include "rdkafka_idempotence.h"
66 #include "rdkafka_txnmgr.h"
67 #include "rdtime.h"
68 #include "rdcrc32.h"
69 #include "rdrand.h"
70 #include "rdkafka_lz4.h"
71 #if WITH_SSL
72 #include <openssl/err.h>
73 #endif
74 #include "rdendian.h"
75 #include "rdunittest.h"
76
77
/* NOTE(review): presumably an upper bound (in milliseconds) for blocking
 * waits in the broker thread's serve loop — usage is outside this chunk;
 * confirm against the serve/poll call sites. */
static const int rd_kafka_max_block_ms = 1000;
79
/* Human-readable broker state names.
 * Indexed by broker state (see rd_kafka_broker_state_names[rkb->rkb_state]
 * usages below), so the order here must match the state enum's order. */
const char *rd_kafka_broker_state_names[] = {
        "INIT",
        "DOWN",
        "TRY_CONNECT",
        "CONNECT",
        "SSL_HANDSHAKE",
        "AUTH_LEGACY",
        "UP",
        "UPDATE",
        "APIVERSION_QUERY",
        "AUTH_HANDSHAKE",
        "AUTH_REQ"
};
93
/* Security protocol names, indexed by rd_kafka_secproto_t
 * (designated initializers), with a NULL sentinel following the
 * last protocol entry. */
const char *rd_kafka_secproto_names[] = {
        [RD_KAFKA_PROTO_PLAINTEXT] = "plaintext",
        [RD_KAFKA_PROTO_SSL] = "ssl",
        [RD_KAFKA_PROTO_SASL_PLAINTEXT] = "sasl_plaintext",
        [RD_KAFKA_PROTO_SASL_SSL] = "sasl_ssl",
        NULL
};
101
102
103
104 /**
105 * @returns true if the broker needs a persistent connection
106 * @locaility broker thread
107 */
108 static RD_INLINE rd_bool_t
rd_kafka_broker_needs_persistent_connection(rd_kafka_broker_t * rkb)109 rd_kafka_broker_needs_persistent_connection (rd_kafka_broker_t *rkb) {
110 return rkb->rkb_persistconn.internal ||
111 rd_atomic32_get(&rkb->rkb_persistconn.coord);
112 }
113
114
115 /**
116 * @returns > 0 if a connection to this broker is needed, else 0.
117 * @locality broker thread
118 * @locks none
119 */
120 static RD_INLINE int
rd_kafka_broker_needs_connection(rd_kafka_broker_t * rkb)121 rd_kafka_broker_needs_connection (rd_kafka_broker_t *rkb) {
122 return rkb->rkb_state == RD_KAFKA_BROKER_STATE_INIT &&
123 !rd_kafka_terminating(rkb->rkb_rk) &&
124 !rd_kafka_fatal_error_code(rkb->rkb_rk) &&
125 (!rkb->rkb_rk->rk_conf.sparse_connections ||
126 rd_kafka_broker_needs_persistent_connection(rkb));
127 }
128
129
130 static void rd_kafka_broker_handle_purge_queues (rd_kafka_broker_t *rkb,
131 rd_kafka_op_t *rko);
132 static void rd_kafka_broker_trigger_monitors (rd_kafka_broker_t *rkb);
133
134
/**
 * @brief Evaluates to true when the broker is ready to be decommissioned:
 *        only the final reference (presumably the broker thread's own)
 *        remains. NOTE(review): refcount ownership convention not visible
 *        in this chunk — confirm against the broker destroy path.
 */
#define rd_kafka_broker_terminating(rkb) \
        (rd_refcnt_get(&(rkb)->rkb_refcnt) <= 1)
137
138
139 /**
140 * Construct broker nodename.
141 */
rd_kafka_mk_nodename(char * dest,size_t dsize,const char * name,uint16_t port)142 static void rd_kafka_mk_nodename (char *dest, size_t dsize,
143 const char *name, uint16_t port) {
144 rd_snprintf(dest, dsize, "%s:%hu", name, port);
145 }
146
147 /**
148 * Construct descriptive broker name
149 */
rd_kafka_mk_brokername(char * dest,size_t dsize,rd_kafka_secproto_t proto,const char * nodename,int32_t nodeid,rd_kafka_confsource_t source)150 static void rd_kafka_mk_brokername (char *dest, size_t dsize,
151 rd_kafka_secproto_t proto,
152 const char *nodename, int32_t nodeid,
153 rd_kafka_confsource_t source) {
154
155 /* Prepend protocol name to brokername, unless it is a
156 * standard plaintext or logical broker in which case we
157 * omit the protocol part. */
158 if (proto != RD_KAFKA_PROTO_PLAINTEXT &&
159 source != RD_KAFKA_LOGICAL) {
160 int r = rd_snprintf(dest, dsize, "%s://",
161 rd_kafka_secproto_names[proto]);
162 if (r >= (int)dsize) /* Skip proto name if it wont fit.. */
163 r = 0;
164
165 dest += r;
166 dsize -= r;
167 }
168
169 if (nodeid == RD_KAFKA_NODEID_UA)
170 rd_snprintf(dest, dsize, "%s%s",
171 nodename,
172 source == RD_KAFKA_LOGICAL ? "" :
173 (source == RD_KAFKA_INTERNAL ?
174 "/internal" : "/bootstrap"));
175 else
176 rd_snprintf(dest, dsize, "%s/%"PRId32, nodename, nodeid);
177 }
178
179
180 /**
181 * @brief Enable protocol feature(s) for the current broker.
182 *
183 * @locks broker_lock MUST be held
184 * @locality broker thread
185 */
rd_kafka_broker_feature_enable(rd_kafka_broker_t * rkb,int features)186 static void rd_kafka_broker_feature_enable (rd_kafka_broker_t *rkb,
187 int features) {
188 if (features & rkb->rkb_features)
189 return;
190
191 rkb->rkb_features |= features;
192 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL | RD_KAFKA_DBG_FEATURE,
193 "FEATURE",
194 "Updated enabled protocol features +%s to %s",
195 rd_kafka_features2str(features),
196 rd_kafka_features2str(rkb->rkb_features));
197 }
198
199
200 /**
201 * @brief Disable protocol feature(s) for the current broker.
202 *
203 * @locks broker_lock MUST be held
204 * @locality broker thread
205 */
rd_kafka_broker_feature_disable(rd_kafka_broker_t * rkb,int features)206 static void rd_kafka_broker_feature_disable (rd_kafka_broker_t *rkb,
207 int features) {
208 if (!(features & rkb->rkb_features))
209 return;
210
211 rkb->rkb_features &= ~features;
212 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL | RD_KAFKA_DBG_FEATURE,
213 "FEATURE",
214 "Updated enabled protocol features -%s to %s",
215 rd_kafka_features2str(features),
216 rd_kafka_features2str(rkb->rkb_features));
217 }
218
219
220 /**
221 * @brief Set protocol feature(s) for the current broker.
222 *
223 * @remark This replaces the previous feature set.
224 *
225 * @locality broker thread
226 * @locks rd_kafka_broker_lock()
227 */
rd_kafka_broker_features_set(rd_kafka_broker_t * rkb,int features)228 static void rd_kafka_broker_features_set (rd_kafka_broker_t *rkb, int features) {
229 if (rkb->rkb_features == features)
230 return;
231
232 rkb->rkb_features = features;
233 rd_rkb_dbg(rkb, BROKER, "FEATURE",
234 "Updated enabled protocol features to %s",
235 rd_kafka_features2str(rkb->rkb_features));
236 }
237
238
239 /**
240 * @brief Check and return supported ApiVersion for \p ApiKey.
241 *
242 * @returns the highest supported ApiVersion in the specified range (inclusive)
243 * or -1 if the ApiKey is not supported or no matching ApiVersion.
244 * The current feature set is also returned in \p featuresp
245 * @locks none
246 * @locality any
247 */
rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t * rkb,int16_t ApiKey,int16_t minver,int16_t maxver,int * featuresp)248 int16_t rd_kafka_broker_ApiVersion_supported (rd_kafka_broker_t *rkb,
249 int16_t ApiKey,
250 int16_t minver, int16_t maxver,
251 int *featuresp) {
252 struct rd_kafka_ApiVersion skel = { .ApiKey = ApiKey };
253 struct rd_kafka_ApiVersion ret = RD_ZERO_INIT, *retp;
254
255 rd_kafka_broker_lock(rkb);
256 if (featuresp)
257 *featuresp = rkb->rkb_features;
258
259 if (rkb->rkb_features & RD_KAFKA_FEATURE_UNITTEST) {
260 /* For unit tests let the broker support everything. */
261 rd_kafka_broker_unlock(rkb);
262 return maxver;
263 }
264
265 retp = bsearch(&skel, rkb->rkb_ApiVersions, rkb->rkb_ApiVersions_cnt,
266 sizeof(*rkb->rkb_ApiVersions),
267 rd_kafka_ApiVersion_key_cmp);
268 if (retp)
269 ret = *retp;
270 rd_kafka_broker_unlock(rkb);
271
272 if (!retp)
273 return -1;
274
275 if (ret.MaxVer < maxver) {
276 if (ret.MaxVer < minver)
277 return -1;
278 else
279 return ret.MaxVer;
280 } else if (ret.MinVer > maxver)
281 return -1;
282 else
283 return maxver;
284 }
285
286
287 /**
288 * @brief Set broker state.
289 *
290 * \c rkb->rkb_state is the previous state, while
291 * \p state is the new state.
292 *
293 * @locks rd_kafka_broker_lock() MUST be held.
294 * @locality broker thread
295 */
rd_kafka_broker_set_state(rd_kafka_broker_t * rkb,int state)296 void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state) {
297 if ((int)rkb->rkb_state == state)
298 return;
299
300 rd_kafka_dbg(rkb->rkb_rk, BROKER, "STATE",
301 "%s: Broker changed state %s -> %s",
302 rkb->rkb_name,
303 rd_kafka_broker_state_names[rkb->rkb_state],
304 rd_kafka_broker_state_names[state]);
305
306 if (rkb->rkb_source == RD_KAFKA_INTERNAL) {
307 /* no-op */
308 } else if (state == RD_KAFKA_BROKER_STATE_DOWN &&
309 !rkb->rkb_down_reported) {
310 /* Propagate ALL_BROKERS_DOWN event if all brokers are
311 * now down, unless we're terminating. */
312 if (rd_atomic32_add(&rkb->rkb_rk->rk_broker_down_cnt, 1) ==
313 rd_atomic32_get(&rkb->rkb_rk->rk_broker_cnt) -
314 rd_atomic32_get(&rkb->rkb_rk->rk_broker_addrless_cnt) &&
315 !rd_kafka_terminating(rkb->rkb_rk))
316 rd_kafka_op_err(rkb->rkb_rk,
317 RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
318 "%i/%i brokers are down",
319 rd_atomic32_get(&rkb->rkb_rk->
320 rk_broker_down_cnt),
321 rd_atomic32_get(&rkb->rkb_rk->
322 rk_broker_cnt) -
323 rd_atomic32_get(&rkb->rkb_rk->
324 rk_broker_addrless_cnt));
325 rkb->rkb_down_reported = 1;
326
327 } else if (state >= RD_KAFKA_BROKER_STATE_UP &&
328 rkb->rkb_down_reported) {
329 rd_atomic32_sub(&rkb->rkb_rk->rk_broker_down_cnt, 1);
330 rkb->rkb_down_reported = 0;
331 }
332
333 if (rkb->rkb_source != RD_KAFKA_INTERNAL) {
334 if (rd_kafka_broker_state_is_up(state) &&
335 !rd_kafka_broker_state_is_up(rkb->rkb_state)) {
336 /* Up -> Down */
337 rd_atomic32_add(&rkb->rkb_rk->rk_broker_up_cnt, 1);
338
339 rd_kafka_broker_trigger_monitors(rkb);
340
341 if (RD_KAFKA_BROKER_IS_LOGICAL(rkb))
342 rd_atomic32_add(&rkb->rkb_rk->
343 rk_logical_broker_up_cnt, 1);
344
345 } else if (rd_kafka_broker_state_is_up(rkb->rkb_state) &&
346 !rd_kafka_broker_state_is_up(state)) {
347 /* ~Down(!Up) -> Up */
348 rd_atomic32_sub(&rkb->rkb_rk->rk_broker_up_cnt, 1);
349
350 rd_kafka_broker_trigger_monitors(rkb);
351
352 if (RD_KAFKA_BROKER_IS_LOGICAL(rkb))
353 rd_atomic32_sub(&rkb->rkb_rk->
354 rk_logical_broker_up_cnt, 1);
355 }
356 }
357
358 rkb->rkb_state = state;
359 rkb->rkb_ts_state = rd_clock();
360
361 rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
362 }
363
364
365 /**
366 * @brief Set, log and propagate broker fail error.
367 *
368 * @param rkb Broker connection that failed.
369 * @param level Syslog level. LOG_DEBUG will not be logged unless debugging
370 * is enabled.
371 * @param err The type of error that occurred.
372 * @param fmt Format string.
373 * @param ap Format string arguments.
374 *
375 * @locks none
376 * @locality broker thread
377 */
rd_kafka_broker_set_error(rd_kafka_broker_t * rkb,int level,rd_kafka_resp_err_t err,const char * fmt,va_list ap)378 static void rd_kafka_broker_set_error (rd_kafka_broker_t *rkb, int level,
379 rd_kafka_resp_err_t err,
380 const char *fmt, va_list ap) {
381 char errstr[512];
382 char extra[128];
383 size_t of = 0, ofe;
384 rd_bool_t identical, suppress;
385 int state_duration_ms = (int)((rd_clock() - rkb->rkb_ts_state)/1000);
386
387
388 /* If this is a logical broker we include its current nodename/address
389 * in the log message. */
390 rd_kafka_broker_lock(rkb);
391 if (rkb->rkb_source == RD_KAFKA_LOGICAL && *rkb->rkb_nodename) {
392 of = (size_t)rd_snprintf(errstr, sizeof(errstr), "%s: ",
393 rkb->rkb_nodename);
394 if (of > sizeof(errstr))
395 of = 0; /* If nodename overflows the entire buffer we
396 * skip it completely since the error message
397 * itself is more important. */
398 }
399 rd_kafka_broker_unlock(rkb);
400
401 ofe = (size_t)rd_vsnprintf(errstr+of, sizeof(errstr)-of, fmt, ap);
402 if (ofe > sizeof(errstr)-of)
403 ofe = sizeof(errstr)-of;
404 of += ofe;
405
406 /* Provide more meaningful error messages in certain cases */
407 if (err == RD_KAFKA_RESP_ERR__TRANSPORT &&
408 !strcmp(errstr, "Disconnected")) {
409 if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_APIVERSION_QUERY) {
410 /* A disconnect while requesting ApiVersion typically
411 * means we're connecting to a SSL-listener as
412 * PLAINTEXT, but may also be caused by connecting to
413 * a broker that does not support ApiVersion (<0.10). */
414
415 if (rkb->rkb_proto != RD_KAFKA_PROTO_SSL &&
416 rkb->rkb_proto != RD_KAFKA_PROTO_SASL_SSL)
417 rd_kafka_broker_set_error(
418 rkb, level, err,
419 "Disconnected while requesting "
420 "ApiVersion: "
421 "might be caused by incorrect "
422 "security.protocol configuration "
423 "(connecting to a SSL listener?) or "
424 "broker version is < 0.10 "
425 "(see api.version.request)",
426 ap/*ignored*/);
427 else
428 rd_kafka_broker_set_error(
429 rkb, level, err,
430 "Disconnected while requesting "
431 "ApiVersion: "
432 "might be caused by broker version "
433 "< 0.10 (see api.version.request)",
434 ap/*ignored*/);
435 return;
436
437 } else if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP &&
438 state_duration_ms < 2000/*2s*/ &&
439 rkb->rkb_rk->rk_conf.security_protocol !=
440 RD_KAFKA_PROTO_SASL_SSL &&
441 rkb->rkb_rk->rk_conf.security_protocol !=
442 RD_KAFKA_PROTO_SASL_PLAINTEXT) {
443 /* If disconnected shortly after transitioning to UP
444 * state it typically means the broker listener is
445 * configured for SASL authentication but the client
446 * is not. */
447 rd_kafka_broker_set_error(
448 rkb, level, err,
449 "Disconnected: verify that security.protocol "
450 "is correctly configured, broker might "
451 "require SASL authentication",
452 ap/*ignored*/);
453 return;
454 }
455 }
456
457 /* Check if error is identical to last error (prior to appending
458 * the variable suffix "after Xms in state Y"), if so we should
459 * suppress it. */
460 identical = err == rkb->rkb_last_err.err &&
461 !strcmp(rkb->rkb_last_err.errstr, errstr);
462 suppress = identical &&
463 rd_interval(&rkb->rkb_suppress.fail_error,
464 30 * 1000 * 1000 /*30s*/, 0) <= 0;
465
466 /* Copy last error prior to adding extras */
467 rkb->rkb_last_err.err = err;
468 rd_strlcpy(rkb->rkb_last_err.errstr, errstr,
469 sizeof(rkb->rkb_last_err.errstr));
470
471 /* Time since last state change to help debug connection issues */
472 ofe = rd_snprintf(extra, sizeof(extra),
473 "after %dms in state %s",
474 state_duration_ms,
475 rd_kafka_broker_state_names[rkb->rkb_state]);
476
477 /* Number of suppressed identical logs */
478 if (identical && !suppress && rkb->rkb_last_err.cnt >= 1 &&
479 ofe + 30 < sizeof(extra)) {
480 size_t r = (size_t)rd_snprintf(
481 extra+ofe, sizeof(extra)-ofe,
482 ", %d identical error(s) suppressed",
483 rkb->rkb_last_err.cnt);
484 if (r < sizeof(extra)-ofe)
485 ofe += r;
486 else
487 ofe = sizeof(extra);
488 }
489
490 /* Append the extra info if there is enough room */
491 if (ofe > 0 && of + ofe + 4 < sizeof(errstr))
492 rd_snprintf(errstr+of, sizeof(errstr)-of,
493 " (%s)", extra);
494
495 /* Don't log interrupt-wakeups when terminating */
496 if (err == RD_KAFKA_RESP_ERR__INTR &&
497 rd_kafka_terminating(rkb->rkb_rk))
498 suppress = rd_true;
499
500 if (!suppress)
501 rkb->rkb_last_err.cnt = 1;
502 else
503 rkb->rkb_last_err.cnt++;
504
505 rd_rkb_dbg(rkb, BROKER, "FAIL", "%s (%s)%s%s",
506 errstr, rd_kafka_err2name(err),
507 identical ? ": identical to last error" : "",
508 suppress ? ": error log suppressed" : "");
509
510 if (level != LOG_DEBUG && (level <= LOG_CRIT || !suppress)) {
511 rd_kafka_log(rkb->rkb_rk, level, "FAIL",
512 "%s: %s", rkb->rkb_name, errstr);
513
514 /* Send ERR op to application for processing. */
515 rd_kafka_q_op_err(rkb->rkb_rk->rk_rep, err, "%s: %s",
516 rkb->rkb_name, errstr);
517 }
518 }
519
520
521 /**
522 * @brief Failure propagation to application.
523 *
524 * Will tear down connection to broker and trigger a reconnect.
525 *
526 * \p level is the log level, <=LOG_INFO will be logged while =LOG_DEBUG will
527 * be debug-logged.
528 *
529 * @locality broker thread
530 */
rd_kafka_broker_fail(rd_kafka_broker_t * rkb,int level,rd_kafka_resp_err_t err,const char * fmt,...)531 void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
532 int level, rd_kafka_resp_err_t err,
533 const char *fmt, ...) {
534 va_list ap;
535 rd_kafka_bufq_t tmpq_waitresp, tmpq;
536 int old_state;
537
538 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
539
540 if (rkb->rkb_transport) {
541 rd_kafka_transport_close(rkb->rkb_transport);
542 rkb->rkb_transport = NULL;
543
544 if (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP)
545 rd_atomic32_add(&rkb->rkb_c.disconnects, 1);
546 }
547
548 rkb->rkb_req_timeouts = 0;
549
550 if (rkb->rkb_recv_buf) {
551 rd_kafka_buf_destroy(rkb->rkb_recv_buf);
552 rkb->rkb_recv_buf = NULL;
553 }
554
555 va_start(ap, fmt);
556 rd_kafka_broker_set_error(rkb, level, err, fmt, ap);
557 va_end(ap);
558
559 rd_kafka_broker_lock(rkb);
560
561 /* If we're currently asking for ApiVersion and the connection
562 * went down it probably means the broker does not support that request
563 * and tore down the connection. In this case we disable that feature flag. */
564 if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_APIVERSION_QUERY)
565 rd_kafka_broker_feature_disable(rkb, RD_KAFKA_FEATURE_APIVERSION);
566
567 /* Set broker state */
568 old_state = rkb->rkb_state;
569 rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_DOWN);
570
571 /* Unlock broker since a requeue will try to lock it. */
572 rd_kafka_broker_unlock(rkb);
573
574 rd_atomic64_set(&rkb->rkb_c.ts_send, 0);
575 rd_atomic64_set(&rkb->rkb_c.ts_recv, 0);
576
577 /*
578 * Purge all buffers
579 * (put bufs on a temporary queue since bufs may be requeued,
580 * make sure outstanding requests are re-enqueued before
581 * bufs on outbufs queue.)
582 */
583 rd_kafka_bufq_init(&tmpq_waitresp);
584 rd_kafka_bufq_init(&tmpq);
585 rd_kafka_bufq_concat(&tmpq_waitresp, &rkb->rkb_waitresps);
586 rd_kafka_bufq_concat(&tmpq, &rkb->rkb_outbufs);
587 rd_atomic32_init(&rkb->rkb_blocking_request_cnt, 0);
588
589 /* Purge the in-flight buffers (might get re-enqueued in case
590 * of retries). */
591 rd_kafka_bufq_purge(rkb, &tmpq_waitresp, err);
592
593 /* Purge the waiting-in-output-queue buffers,
594 * might also get re-enqueued. */
595 rd_kafka_bufq_purge(rkb, &tmpq,
596 /* If failure was caused by a timeout,
597 * adjust the error code for in-queue requests. */
598 err == RD_KAFKA_RESP_ERR__TIMED_OUT ?
599 RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE : err);
600
601 /* Update bufq for connection reset:
602 * - Purge connection-setup requests from outbufs since they will be
603 * reissued on the next connect.
604 * - Reset any partially sent buffer's offset.
605 */
606 rd_kafka_bufq_connection_reset(rkb, &rkb->rkb_outbufs);
607
608 /* Extra debugging for tracking termination-hang issues:
609 * show what is keeping this broker from decommissioning. */
610 if (rd_kafka_terminating(rkb->rkb_rk) &&
611 !rd_kafka_broker_terminating(rkb)) {
612 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL, "BRKTERM",
613 "terminating: broker still has %d refcnt(s), "
614 "%"PRId32" buffer(s), %d partition(s)",
615 rd_refcnt_get(&rkb->rkb_refcnt),
616 rd_kafka_bufq_cnt(&rkb->rkb_outbufs),
617 rkb->rkb_toppar_cnt);
618 rd_kafka_bufq_dump(rkb, "BRKOUTBUFS", &rkb->rkb_outbufs);
619 }
620
621
622 /* Query for topic leaders to quickly pick up on failover. */
623 if (err != RD_KAFKA_RESP_ERR__DESTROY &&
624 old_state >= RD_KAFKA_BROKER_STATE_UP)
625 rd_kafka_metadata_refresh_known_topics(rkb->rkb_rk, NULL,
626 rd_true/*force*/,
627 "broker down");
628 }
629
630
631
632 /**
633 * @brief Handle broker connection close.
634 *
635 * @locality broker thread
636 */
rd_kafka_broker_conn_closed(rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,const char * errstr)637 void rd_kafka_broker_conn_closed (rd_kafka_broker_t *rkb,
638 rd_kafka_resp_err_t err,
639 const char *errstr) {
640 int log_level = LOG_ERR;
641
642 if (!rkb->rkb_rk->rk_conf.log_connection_close) {
643 /* Silence all connection closes */
644 log_level = LOG_DEBUG;
645
646 } else {
647 /* Silence close logs for connections that are idle,
648 * it is most likely the broker's idle connection
649 * reaper kicking in.
650 *
651 * Indications there might be an error and not an
652 * idle disconnect:
653 * - If the connection age is low a disconnect
654 * typically indicates a failure, such as protocol mismatch.
655 * - If the connection hasn't been idle long enough.
656 * - There are outstanding requests, or requests enqueued.
657 *
658 * For non-idle connections, adjust log level:
659 * - requests in-flight: LOG_WARNING
660 * - else: LOG_INFO
661 */
662 rd_ts_t now = rd_clock();
663 rd_ts_t minidle =
664 RD_MAX(60*1000/*60s*/,
665 rkb->rkb_rk->rk_conf.socket_timeout_ms) * 1000;
666 int inflight = rd_kafka_bufq_cnt(&rkb->rkb_waitresps);
667 int inqueue = rd_kafka_bufq_cnt(&rkb->rkb_outbufs);
668
669 if (rkb->rkb_ts_state + minidle < now &&
670 rd_atomic64_get(&rkb->rkb_c.ts_send) + minidle < now &&
671 inflight + inqueue == 0)
672 log_level = LOG_DEBUG;
673 else if (inflight > 1)
674 log_level = LOG_WARNING;
675 else
676 log_level = LOG_INFO;
677 }
678
679 rd_kafka_broker_fail(rkb, log_level, err, "%s", errstr);
680 }
681
682
683 /**
684 * @brief Purge requests in \p rkbq matching request \p ApiKey
685 * and partition \p rktp.
686 *
687 * @warning ApiKey must be RD_KAFKAP_Produce
688 *
689 * @returns the number of purged buffers.
690 *
691 * @locality broker thread
692 */
693 static int
rd_kafka_broker_bufq_purge_by_toppar(rd_kafka_broker_t * rkb,rd_kafka_bufq_t * rkbq,int64_t ApiKey,rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err)694 rd_kafka_broker_bufq_purge_by_toppar (rd_kafka_broker_t *rkb,
695 rd_kafka_bufq_t *rkbq,
696 int64_t ApiKey,
697 rd_kafka_toppar_t *rktp,
698 rd_kafka_resp_err_t err) {
699 rd_kafka_buf_t *rkbuf, *tmp;
700 int cnt = 0;
701
702 rd_assert(ApiKey == RD_KAFKAP_Produce);
703
704 TAILQ_FOREACH_SAFE(rkbuf, &rkbq->rkbq_bufs, rkbuf_link, tmp) {
705
706 if (rkbuf->rkbuf_reqhdr.ApiKey != ApiKey ||
707 rkbuf->rkbuf_u.Produce.batch.rktp != rktp ||
708 /* Skip partially sent buffers and let them transmit.
709 * The alternative would be to kill the connection here,
710 * which is more drastic and costly. */
711 rd_slice_offset(&rkbuf->rkbuf_reader) > 0)
712 continue;
713
714 rd_kafka_bufq_deq(rkbq, rkbuf);
715
716 rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
717 cnt++;
718 }
719
720 return cnt;
721 }
722
723
724 /**
725 * Scan bufq for buffer timeouts, trigger buffer callback on timeout.
726 *
727 * If \p partial_cntp is non-NULL any partially sent buffers will increase
728 * the provided counter by 1.
729 *
730 * @param ApiKey Only match requests with this ApiKey, or -1 for all.
731 * @param now If 0, all buffers will time out, else the current clock.
732 * @param description "N requests timed out <description>", e.g., "in flight".
733 * Only used if log_first_n > 0.
734 * @param log_first_n Log the first N request timeouts.
735 *
736 * @returns the number of timed out buffers.
737 *
738 * @locality broker thread
739 */
rd_kafka_broker_bufq_timeout_scan(rd_kafka_broker_t * rkb,int is_waitresp_q,rd_kafka_bufq_t * rkbq,int * partial_cntp,int16_t ApiKey,rd_kafka_resp_err_t err,rd_ts_t now,const char * description,int log_first_n)740 static int rd_kafka_broker_bufq_timeout_scan (rd_kafka_broker_t *rkb,
741 int is_waitresp_q,
742 rd_kafka_bufq_t *rkbq,
743 int *partial_cntp,
744 int16_t ApiKey,
745 rd_kafka_resp_err_t err,
746 rd_ts_t now,
747 const char *description,
748 int log_first_n) {
749 rd_kafka_buf_t *rkbuf, *tmp;
750 int cnt = 0;
751 int idx = -1;
752 const rd_kafka_buf_t *holb;
753
754 restart:
755 holb = TAILQ_FIRST(&rkbq->rkbq_bufs);
756
757 TAILQ_FOREACH_SAFE(rkbuf, &rkbq->rkbq_bufs, rkbuf_link, tmp) {
758 rd_kafka_broker_state_t pre_state, post_state;
759
760 idx++;
761
762 if (likely(now && rkbuf->rkbuf_ts_timeout > now))
763 continue;
764
765 if (ApiKey != -1 && rkbuf->rkbuf_reqhdr.ApiKey != ApiKey)
766 continue;
767
768 if (partial_cntp && rd_slice_offset(&rkbuf->rkbuf_reader) > 0)
769 (*partial_cntp)++;
770
771 /* Convert rkbuf_ts_sent to elapsed time since request */
772 if (rkbuf->rkbuf_ts_sent)
773 rkbuf->rkbuf_ts_sent = now - rkbuf->rkbuf_ts_sent;
774 else
775 rkbuf->rkbuf_ts_sent = now - rkbuf->rkbuf_ts_enq;
776
777 rd_kafka_bufq_deq(rkbq, rkbuf);
778
779 if (now && cnt < log_first_n) {
780 char holbstr[128];
781 /* Head of line blocking:
782 * If this is not the first request in queue, but the
783 * initial first request did not time out,
784 * it typically means the first request is a
785 * long-running blocking one, holding up the
786 * sub-sequent requests.
787 * In this case log what is likely holding up the
788 * requests and what caused this request to time out. */
789 if (holb && holb == TAILQ_FIRST(&rkbq->rkbq_bufs)) {
790 rd_snprintf(holbstr, sizeof(holbstr),
791 ": possibly held back by "
792 "preceeding%s %sRequest with "
793 "timeout in %dms",
794 (holb->rkbuf_flags &
795 RD_KAFKA_OP_F_BLOCKING) ?
796 " blocking" : "",
797 rd_kafka_ApiKey2str(holb->
798 rkbuf_reqhdr.
799 ApiKey),
800 (int)((holb->rkbuf_ts_timeout -
801 now) / 1000));
802 /* Only log the HOLB once */
803 holb = NULL;
804 } else {
805 *holbstr = '\0';
806 }
807
808 rd_rkb_log(rkb, LOG_NOTICE, "REQTMOUT",
809 "Timed out %sRequest %s "
810 "(after %"PRId64"ms, timeout #%d)%s",
811 rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.
812 ApiKey),
813 description, rkbuf->rkbuf_ts_sent/1000, cnt,
814 holbstr);
815 }
816
817 if (is_waitresp_q && rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING
818 && rd_atomic32_sub(&rkb->rkb_blocking_request_cnt, 1) == 0)
819 rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
820
821 pre_state = rd_kafka_broker_get_state(rkb);
822
823 rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
824 cnt++;
825
826 /* If the buf_callback() triggered a broker state change
827 * (typically through broker_fail()) we can't trust the
828 * queue we are scanning to not have been touched, so we
829 * either restart the scan or bail out (if broker is now down),
830 * depending on the new state. #2326 */
831 post_state = rd_kafka_broker_get_state(rkb);
832 if (pre_state != post_state) {
833 /* If the new state is DOWN it means broker_fail()
834 * was called which may have modified the queues,
835 * to keep things safe we stop scanning this queue. */
836 if (post_state == RD_KAFKA_BROKER_STATE_DOWN)
837 break;
838 /* Else start scanning the queue from the beginning. */
839 goto restart;
840 }
841 }
842
843 return cnt;
844 }
845
846
847 /**
848 * Scan the wait-response and outbuf queues for message timeouts.
849 *
850 * Locality: Broker thread
851 */
rd_kafka_broker_timeout_scan(rd_kafka_broker_t * rkb,rd_ts_t now)852 static void rd_kafka_broker_timeout_scan (rd_kafka_broker_t *rkb, rd_ts_t now) {
853 int inflight_cnt, retry_cnt, outq_cnt;
854 int partial_cnt = 0;
855
856 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
857
858 /* In-flight requests waiting for response */
859 inflight_cnt = rd_kafka_broker_bufq_timeout_scan(
860 rkb, 1, &rkb->rkb_waitresps, NULL, -1,
861 RD_KAFKA_RESP_ERR__TIMED_OUT, now, "in flight", 5);
862 /* Requests in retry queue */
863 retry_cnt = rd_kafka_broker_bufq_timeout_scan(
864 rkb, 0, &rkb->rkb_retrybufs, NULL, -1,
865 RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, now, "in retry queue", 0);
866 /* Requests in local queue not sent yet.
867 * partial_cnt is included in outq_cnt and denotes a request
868 * that has been partially transmitted. */
869 outq_cnt = rd_kafka_broker_bufq_timeout_scan(
870 rkb, 0, &rkb->rkb_outbufs, &partial_cnt, -1,
871 RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, now, "in output queue", 0);
872
873 if (inflight_cnt + retry_cnt + outq_cnt + partial_cnt > 0) {
874 rd_rkb_log(rkb, LOG_WARNING, "REQTMOUT",
875 "Timed out %i in-flight, %i retry-queued, "
876 "%i out-queue, %i partially-sent requests",
877 inflight_cnt, retry_cnt, outq_cnt, partial_cnt);
878
879 rkb->rkb_req_timeouts += inflight_cnt + outq_cnt;
880 rd_atomic64_add(&rkb->rkb_c.req_timeouts,
881 inflight_cnt + outq_cnt);
882
883 /* If this was a partially sent request that timed out, or the
884 * number of timed out requests have reached the
885 * socket.max.fails threshold, we need to take down the
886 * connection. */
887 if (partial_cnt > 0 ||
888 (rkb->rkb_rk->rk_conf.socket_max_fails &&
889 rkb->rkb_req_timeouts >=
890 rkb->rkb_rk->rk_conf.socket_max_fails &&
891 rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP)) {
892 char rttinfo[32];
893 /* Print average RTT (if avail) to help diagnose. */
894 rd_avg_calc(&rkb->rkb_avg_rtt, now);
895 if (rkb->rkb_avg_rtt.ra_v.avg)
896 rd_snprintf(rttinfo, sizeof(rttinfo),
897 " (average rtt %.3fms)",
898 (float)(rkb->rkb_avg_rtt.ra_v.avg/
899 1000.0f));
900 else
901 rttinfo[0] = 0;
902 rd_kafka_broker_fail(rkb, LOG_ERR,
903 RD_KAFKA_RESP_ERR__TIMED_OUT,
904 "%i request(s) timed out: "
905 "disconnect%s",
906 rkb->rkb_req_timeouts, rttinfo);
907 }
908 }
909 }
910
911
912
913 static ssize_t
rd_kafka_broker_send(rd_kafka_broker_t * rkb,rd_slice_t * slice)914 rd_kafka_broker_send (rd_kafka_broker_t *rkb, rd_slice_t *slice) {
915 ssize_t r;
916 char errstr[128];
917
918 rd_kafka_assert(rkb->rkb_rk, rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP);
919 rd_kafka_assert(rkb->rkb_rk, rkb->rkb_transport);
920
921 r = rd_kafka_transport_send(rkb->rkb_transport, slice,
922 errstr, sizeof(errstr));
923
924 if (r == -1) {
925 rd_kafka_broker_fail(rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT,
926 "Send failed: %s", errstr);
927 rd_atomic64_add(&rkb->rkb_c.tx_err, 1);
928 return -1;
929 }
930
931 rd_atomic64_add(&rkb->rkb_c.tx_bytes, r);
932 rd_atomic64_add(&rkb->rkb_c.tx, 1);
933 return r;
934 }
935
936
937
938
/**
 * @brief Resolve the broker's nodename, (re)populating rkb->rkb_rsal.
 *
 * A cached address list is reused until its TTL (broker.address.ttl)
 * expires or \p reset_cached_addr is set; on re-resolve the round-robin
 * position is preserved when possible.
 *
 * @returns 0 on success, or -1 on error (the broker is failed with
 *          RD_KAFKA_RESP_ERR__RESOLVE).
 */
static int rd_kafka_broker_resolve (rd_kafka_broker_t *rkb,
                                    const char *nodename,
                                    rd_bool_t reset_cached_addr) {
        const char *errstr;
        int save_idx = 0;

        /* Logical brokers have no address until assigned one. */
        if (!*nodename && rkb->rkb_source == RD_KAFKA_LOGICAL) {
                rd_kafka_broker_fail(rkb, LOG_DEBUG,
                                     RD_KAFKA_RESP_ERR__RESOLVE,
                                     "Logical broker has no address yet");
                return -1;
        }

        if (rkb->rkb_rsal &&
            (reset_cached_addr ||
             rkb->rkb_ts_rsal_last + (rkb->rkb_rk->rk_conf.broker_addr_ttl*1000)
             < rd_clock())) {
                /* Address list has expired. */

                /* Save the address index to make sure we still round-robin
                 * if we get the same address list back */
                save_idx = rkb->rkb_rsal->rsal_curr;

                rd_sockaddr_list_destroy(rkb->rkb_rsal);
                rkb->rkb_rsal = NULL;
        }

        if (!rkb->rkb_rsal) {
                /* Resolve */
                rkb->rkb_rsal = rd_getaddrinfo(nodename,
                                               RD_KAFKA_PORT_STR,
                                               AI_ADDRCONFIG,
                                               rkb->rkb_rk->rk_conf.
                                               broker_addr_family,
                                               SOCK_STREAM,
                                               IPPROTO_TCP, &errstr);

                if (!rkb->rkb_rsal) {
                        rd_kafka_broker_fail(rkb, LOG_ERR,
                                             RD_KAFKA_RESP_ERR__RESOLVE,
                                             "Failed to resolve '%s': %s",
                                             nodename, errstr);
                        return -1;
                } else {
                        rkb->rkb_ts_rsal_last = rd_clock();
                        /* Continue at previous round-robin position */
                        if (rkb->rkb_rsal->rsal_cnt > save_idx)
                                rkb->rkb_rsal->rsal_curr = save_idx;
                }
        }

        return 0;
}
992
993
rd_kafka_broker_buf_enq0(rd_kafka_broker_t * rkb,rd_kafka_buf_t * rkbuf)994 static void rd_kafka_broker_buf_enq0 (rd_kafka_broker_t *rkb,
995 rd_kafka_buf_t *rkbuf) {
996 rd_ts_t now;
997
998 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
999
1000 if (rkb->rkb_rk->rk_conf.sparse_connections &&
1001 rkb->rkb_state == RD_KAFKA_BROKER_STATE_INIT) {
1002 /* Sparse connections:
1003 * Trigger connection when a new request is enqueued. */
1004 rkb->rkb_persistconn.internal++;
1005 rd_kafka_broker_lock(rkb);
1006 rd_kafka_broker_set_state(rkb,
1007 RD_KAFKA_BROKER_STATE_TRY_CONNECT);
1008 rd_kafka_broker_unlock(rkb);
1009 }
1010
1011 now = rd_clock();
1012 rkbuf->rkbuf_ts_enq = now;
1013 rkbuf->rkbuf_flags &= ~RD_KAFKA_OP_F_SENT;
1014
1015 /* Calculate request attempt timeout */
1016 rd_kafka_buf_calc_timeout(rkb->rkb_rk, rkbuf, now);
1017
1018 if (likely(rkbuf->rkbuf_prio == RD_KAFKA_PRIO_NORMAL)) {
1019 /* Insert request at tail of queue */
1020 TAILQ_INSERT_TAIL(&rkb->rkb_outbufs.rkbq_bufs,
1021 rkbuf, rkbuf_link);
1022
1023 } else {
1024 /* Insert request after any requests with a higher or
1025 * equal priority.
1026 * Also make sure the request is after added any partially
1027 * sent request (of any prio).
1028 * We need to check if buf corrid is set rather than
1029 * rkbuf_of since SSL_write may return 0 and expect the
1030 * exact same arguments the next call. */
1031 rd_kafka_buf_t *prev, *after = NULL;
1032
1033 TAILQ_FOREACH(prev, &rkb->rkb_outbufs.rkbq_bufs, rkbuf_link) {
1034 if (prev->rkbuf_prio < rkbuf->rkbuf_prio &&
1035 prev->rkbuf_corrid == 0)
1036 break;
1037 after = prev;
1038 }
1039
1040 if (after)
1041 TAILQ_INSERT_AFTER(&rkb->rkb_outbufs.rkbq_bufs,
1042 after, rkbuf, rkbuf_link);
1043 else
1044 TAILQ_INSERT_HEAD(&rkb->rkb_outbufs.rkbq_bufs,
1045 rkbuf, rkbuf_link);
1046 }
1047
1048 rd_atomic32_add(&rkb->rkb_outbufs.rkbq_cnt, 1);
1049 if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_Produce)
1050 rd_atomic32_add(&rkb->rkb_outbufs.rkbq_msg_cnt,
1051 rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq));
1052 }
1053
1054
1055 /**
1056 * Finalize a stuffed rkbuf for sending to broker.
1057 */
rd_kafka_buf_finalize(rd_kafka_t * rk,rd_kafka_buf_t * rkbuf)1058 static void rd_kafka_buf_finalize (rd_kafka_t *rk, rd_kafka_buf_t *rkbuf) {
1059 size_t totsize;
1060
1061 if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER) {
1062 /* Empty struct tags */
1063 rd_kafka_buf_write_i8(rkbuf, 0);
1064 }
1065
1066 /* Calculate total request buffer length. */
1067 totsize = rd_buf_len(&rkbuf->rkbuf_buf) - 4;
1068
1069 /* Set up a buffer reader for sending the buffer. */
1070 rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
1071
1072 /**
1073 * Update request header fields
1074 */
1075 /* Total request length */
1076 rd_kafka_buf_update_i32(rkbuf, 0, (int32_t)totsize);
1077
1078 /* ApiVersion */
1079 rd_kafka_buf_update_i16(rkbuf, 4+2, rkbuf->rkbuf_reqhdr.ApiVersion);
1080 }
1081
1082
rd_kafka_broker_buf_enq1(rd_kafka_broker_t * rkb,rd_kafka_buf_t * rkbuf,rd_kafka_resp_cb_t * resp_cb,void * opaque)1083 void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb,
1084 rd_kafka_buf_t *rkbuf,
1085 rd_kafka_resp_cb_t *resp_cb,
1086 void *opaque) {
1087
1088
1089 rkbuf->rkbuf_cb = resp_cb;
1090 rkbuf->rkbuf_opaque = opaque;
1091
1092 rd_kafka_buf_finalize(rkb->rkb_rk, rkbuf);
1093
1094 rd_kafka_broker_buf_enq0(rkb, rkbuf);
1095 }
1096
1097
1098 /**
1099 * Enqueue buffer on broker's xmit queue, but fail buffer immediately
1100 * if broker is not up.
1101 *
1102 * Locality: broker thread
1103 */
rd_kafka_broker_buf_enq2(rd_kafka_broker_t * rkb,rd_kafka_buf_t * rkbuf)1104 static int rd_kafka_broker_buf_enq2 (rd_kafka_broker_t *rkb,
1105 rd_kafka_buf_t *rkbuf) {
1106 if (unlikely(rkb->rkb_source == RD_KAFKA_INTERNAL)) {
1107 /* Fail request immediately if this is the internal broker. */
1108 rd_kafka_buf_callback(rkb->rkb_rk, rkb,
1109 RD_KAFKA_RESP_ERR__TRANSPORT,
1110 NULL, rkbuf);
1111 return -1;
1112 }
1113
1114 rd_kafka_broker_buf_enq0(rkb, rkbuf);
1115
1116 return 0;
1117 }
1118
1119
1120
1121 /**
1122 * Enqueue buffer for tranmission.
1123 * Responses are enqueued on 'replyq' (RD_KAFKA_OP_RECV_BUF)
1124 *
1125 * Locality: any thread
1126 */
rd_kafka_broker_buf_enq_replyq(rd_kafka_broker_t * rkb,rd_kafka_buf_t * rkbuf,rd_kafka_replyq_t replyq,rd_kafka_resp_cb_t * resp_cb,void * opaque)1127 void rd_kafka_broker_buf_enq_replyq (rd_kafka_broker_t *rkb,
1128 rd_kafka_buf_t *rkbuf,
1129 rd_kafka_replyq_t replyq,
1130 rd_kafka_resp_cb_t *resp_cb,
1131 void *opaque) {
1132
1133 assert(rkbuf->rkbuf_rkb == rkb);
1134 if (resp_cb) {
1135 rkbuf->rkbuf_replyq = replyq;
1136 rkbuf->rkbuf_cb = resp_cb;
1137 rkbuf->rkbuf_opaque = opaque;
1138 } else {
1139 rd_dassert(!replyq.q);
1140 }
1141
1142 rd_kafka_buf_finalize(rkb->rkb_rk, rkbuf);
1143
1144
1145 if (thrd_is_current(rkb->rkb_thread)) {
1146 rd_kafka_broker_buf_enq2(rkb, rkbuf);
1147
1148 } else {
1149 rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_XMIT_BUF);
1150 rko->rko_u.xbuf.rkbuf = rkbuf;
1151 rd_kafka_q_enq(rkb->rkb_ops, rko);
1152 }
1153 }
1154
1155
1156
1157
1158 /**
1159 * @returns the current broker state change version.
1160 * Pass this value to future rd_kafka_brokers_wait_state_change() calls
1161 * to avoid the race condition where a state-change happens between
1162 * an initial call to some API that fails and the sub-sequent
1163 * .._wait_state_change() call.
1164 */
rd_kafka_brokers_get_state_version(rd_kafka_t * rk)1165 int rd_kafka_brokers_get_state_version (rd_kafka_t *rk) {
1166 int version;
1167 mtx_lock(&rk->rk_broker_state_change_lock);
1168 version = rk->rk_broker_state_change_version;
1169 mtx_unlock(&rk->rk_broker_state_change_lock);
1170 return version;
1171 }
1172
1173 /**
1174 * @brief Wait at most \p timeout_ms for any state change for any broker.
1175 * \p stored_version is the value previously returned by
1176 * rd_kafka_brokers_get_state_version() prior to another API call
1177 * that failed due to invalid state.
1178 *
1179 * Triggers:
1180 * - broker state changes
1181 * - broker transitioning from blocking to non-blocking
1182 * - partition leader changes
1183 * - group state changes
1184 *
1185 * @remark There is no guarantee that a state change actually took place.
1186 *
1187 * @returns 1 if a state change was signaled (maybe), else 0 (timeout)
1188 *
1189 * @locality any thread
1190 */
rd_kafka_brokers_wait_state_change(rd_kafka_t * rk,int stored_version,int timeout_ms)1191 int rd_kafka_brokers_wait_state_change (rd_kafka_t *rk, int stored_version,
1192 int timeout_ms) {
1193 int r;
1194 mtx_lock(&rk->rk_broker_state_change_lock);
1195 if (stored_version != rk->rk_broker_state_change_version)
1196 r = 1;
1197 else
1198 r = cnd_timedwait_ms(&rk->rk_broker_state_change_cnd,
1199 &rk->rk_broker_state_change_lock,
1200 timeout_ms) == thrd_success;
1201 mtx_unlock(&rk->rk_broker_state_change_lock);
1202 return r;
1203 }
1204
1205
1206 /**
1207 * @brief Same as rd_kafka_brokers_wait_state_change() but will trigger
1208 * the wakeup asynchronously through the provided \p eonce.
1209 *
1210 * If the eonce was added to the wait list its reference count
1211 * will have been updated, this reference is later removed by
1212 * rd_kafka_broker_state_change_trigger_eonce() by calling trigger().
1213 *
1214 * @returns 1 if the \p eonce was added to the wait-broker-state-changes list,
1215 * or 0 if the \p stored_version is outdated in which case the
1216 * caller should redo the broker lookup.
1217 */
rd_kafka_brokers_wait_state_change_async(rd_kafka_t * rk,int stored_version,rd_kafka_enq_once_t * eonce)1218 int rd_kafka_brokers_wait_state_change_async (rd_kafka_t *rk,
1219 int stored_version,
1220 rd_kafka_enq_once_t *eonce) {
1221 int r = 1;
1222 mtx_lock(&rk->rk_broker_state_change_lock);
1223
1224 if (stored_version != rk->rk_broker_state_change_version)
1225 r = 0;
1226 else {
1227 rd_kafka_enq_once_add_source(eonce, "wait broker state change");
1228 rd_list_add(&rk->rk_broker_state_change_waiters, eonce);
1229 }
1230
1231 mtx_unlock(&rk->rk_broker_state_change_lock);
1232 return r;
1233 }
1234
1235
1236 /**
1237 * @brief eonce trigger callback for rd_list_apply() call in
1238 * rd_kafka_brokers_broadcast_state_change()
1239 */
1240 static int
rd_kafka_broker_state_change_trigger_eonce(void * elem,void * opaque)1241 rd_kafka_broker_state_change_trigger_eonce (void *elem, void *opaque) {
1242 rd_kafka_enq_once_t *eonce = elem;
1243 rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR_NO_ERROR,
1244 "broker state change");
1245 return 0; /* remove eonce from list */
1246 }
1247
1248
1249 /**
1250 * @brief Broadcast broker state change to listeners, if any.
1251 *
1252 * @locality any thread
1253 */
rd_kafka_brokers_broadcast_state_change(rd_kafka_t * rk)1254 void rd_kafka_brokers_broadcast_state_change (rd_kafka_t *rk) {
1255
1256 rd_kafka_dbg(rk, GENERIC, "BROADCAST",
1257 "Broadcasting state change");
1258
1259 mtx_lock(&rk->rk_broker_state_change_lock);
1260
1261 /* Bump version */
1262 rk->rk_broker_state_change_version++;
1263
1264 /* Trigger waiters */
1265 rd_list_apply(&rk->rk_broker_state_change_waiters,
1266 rd_kafka_broker_state_change_trigger_eonce, NULL);
1267
1268 /* Broadcast to listeners */
1269 cnd_broadcast(&rk->rk_broker_state_change_cnd);
1270
1271 mtx_unlock(&rk->rk_broker_state_change_lock);
1272 }
1273
1274
1275 /**
1276 * @returns a random broker (with refcnt increased) with matching \p state
1277 * and where the \p filter function returns 0.
1278 *
1279 * Uses reservoir sampling.
1280 *
1281 * @param is_up Any broker that is up (UP or UPDATE state), \p state is ignored.
1282 * @param filtered_cnt Optional pointer to integer which will be set to the
1283 * number of brokers that matches the \p state or \p is_up but
1284 * were filtered out by \p filter.
1285 * @param filter is an optional callback used to filter out undesired brokers.
1286 * The filter function should return 1 to filter out a broker,
1287 * or 0 to keep it in the list of eligible brokers to return.
1288 * rd_kafka_broker_lock() is held during the filter callback.
1289 *
1290 *
1291 * @locks rd_kafka_*lock() MUST be held
1292 * @locality any
1293 */
1294 static rd_kafka_broker_t *
rd_kafka_broker_random0(const char * func,int line,rd_kafka_t * rk,rd_bool_t is_up,int state,int * filtered_cnt,int (* filter)(rd_kafka_broker_t * rk,void * opaque),void * opaque)1295 rd_kafka_broker_random0 (const char *func, int line,
1296 rd_kafka_t *rk,
1297 rd_bool_t is_up,
1298 int state,
1299 int *filtered_cnt,
1300 int (*filter) (rd_kafka_broker_t *rk, void *opaque),
1301 void *opaque) {
1302 rd_kafka_broker_t *rkb, *good = NULL;
1303 int cnt = 0;
1304 int fcnt = 0;
1305
1306 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
1307 if (RD_KAFKA_BROKER_IS_LOGICAL(rkb))
1308 continue;
1309
1310 rd_kafka_broker_lock(rkb);
1311 if ((is_up && rd_kafka_broker_state_is_up(rkb->rkb_state)) ||
1312 (!is_up && (int)rkb->rkb_state == state)) {
1313 if (filter && filter(rkb, opaque)) {
1314 /* Filtered out */
1315 fcnt++;
1316 } else {
1317 if (cnt < 1 || rd_jitter(0, cnt) < 1) {
1318 if (good)
1319 rd_kafka_broker_destroy(good);
1320 rd_kafka_broker_keep_fl(func, line,
1321 rkb);
1322 good = rkb;
1323 }
1324 cnt += 1;
1325 }
1326 }
1327 rd_kafka_broker_unlock(rkb);
1328 }
1329
1330 if (filtered_cnt)
1331 *filtered_cnt = fcnt;
1332
1333 return good;
1334 }
1335
1336 #define rd_kafka_broker_random(rk,state,filter,opaque) \
1337 rd_kafka_broker_random0(__FUNCTION__, __LINE__, \
1338 rk, rd_false, state, NULL, filter, opaque)
1339
1340
1341 /**
1342 * @returns the broker (with refcnt increased) with the highest weight based
1343 * based on the provided weighing function.
1344 *
1345 * If multiple brokers share the same weight reservoir sampling will be used
1346 * to randomly select one.
1347 *
1348 * @param weight_cb Weighing function that should return the sort weight
1349 * for the given broker.
1350 * Higher weight is better.
1351 * A weight of <= 0 will filter out the broker.
1352 * The passed broker object is locked.
1353 * @param features (optional) Required broker features.
1354 *
1355 * @locks_required rk(read)
1356 * @locality any
1357 */
1358 static rd_kafka_broker_t *
rd_kafka_broker_weighted(rd_kafka_t * rk,int (* weight_cb)(rd_kafka_broker_t * rkb),int features)1359 rd_kafka_broker_weighted (rd_kafka_t *rk,
1360 int (*weight_cb) (rd_kafka_broker_t *rkb),
1361 int features) {
1362 rd_kafka_broker_t *rkb, *good = NULL;
1363 int highest = 0;
1364 int cnt = 0;
1365
1366 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
1367 int weight;
1368
1369 rd_kafka_broker_lock(rkb);
1370 if (features && (rkb->rkb_features & features) != features)
1371 weight = 0;
1372 else
1373 weight = weight_cb(rkb);
1374 rd_kafka_broker_unlock(rkb);
1375
1376 if (weight <= 0 || weight < highest)
1377 continue;
1378
1379 if (weight > highest) {
1380 highest = weight;
1381 cnt = 0;
1382 }
1383
1384 /* If same weight (cnt > 0), use reservoir sampling */
1385 if (cnt < 1 || rd_jitter(0, cnt) < 1) {
1386 if (good)
1387 rd_kafka_broker_destroy(good);
1388 rd_kafka_broker_keep(rkb);
1389 good = rkb;
1390 }
1391 cnt++;
1392 }
1393
1394 return good;
1395 }
1396
1397 /**
1398 * @brief Weighing function to select a usable broker connections,
1399 * promoting connections according to the scoring below.
1400 *
1401 * Priority order:
1402 * - is not a bootstrap broker
1403 * - least idle last 10 minutes (unless blocking)
1404 * - least idle hours (if above 10 minutes idle)
1405 * - is not a logical broker (these connections have dedicated use and should
1406 * preferably not be used for other purposes)
1407 * - is not blocking
1408 *
1409 * Will prefer the most recently used broker connection for two reasons:
1410 * - this connection is most likely to function properly.
1411 * - allows truly idle connections to be killed by the broker's/LB's
1412 * idle connection reaper.
1413 *
1414 * Connection must be up.
1415 *
1416 * @locks_required rkb
1417 */
rd_kafka_broker_weight_usable(rd_kafka_broker_t * rkb)1418 static int rd_kafka_broker_weight_usable (rd_kafka_broker_t *rkb) {
1419 int weight = 0;
1420
1421 if (!rd_kafka_broker_state_is_up(rkb->rkb_state))
1422 return 0;
1423
1424 weight += 2000 * (rkb->rkb_nodeid != -1 &&
1425 !RD_KAFKA_BROKER_IS_LOGICAL(rkb));
1426 weight += 10 * !RD_KAFKA_BROKER_IS_LOGICAL(rkb);
1427
1428 if (likely(!rd_atomic32_get(&rkb->rkb_blocking_request_cnt))) {
1429 rd_ts_t tx_last = rd_atomic64_get(&rkb->rkb_c.ts_send);
1430 int idle = (int)((rd_clock() -
1431 (tx_last > 0 ? tx_last : rkb->rkb_ts_state))
1432 / 1000000);
1433
1434 weight += 1; /* is not blocking */
1435
1436 /* Prefer least idle broker (based on last 10 minutes use) */
1437 if (idle < 0)
1438 ; /*clock going backwards? do nothing */
1439 else if (idle < 600/*10 minutes*/)
1440 weight += 1000 + (600 - idle);
1441 else /* Else least idle hours (capped to 100h) */
1442 weight += 100 + (100 - RD_MIN((idle / 3600), 100));
1443 }
1444
1445 return weight;
1446 }
1447
1448
1449 /**
1450 * @brief Returns a random broker (with refcnt increased) in state \p state.
1451 *
1452 * Uses Reservoir sampling.
1453 *
1454 * @param filter is optional, see rd_kafka_broker_random().
1455 *
1456 * @sa rd_kafka_broker_random
1457 *
1458 * @locks rd_kafka_*lock(rk) MUST be held.
1459 * @locality any thread
1460 */
rd_kafka_broker_any(rd_kafka_t * rk,int state,int (* filter)(rd_kafka_broker_t * rkb,void * opaque),void * opaque,const char * reason)1461 rd_kafka_broker_t *rd_kafka_broker_any (rd_kafka_t *rk, int state,
1462 int (*filter) (rd_kafka_broker_t *rkb,
1463 void *opaque),
1464 void *opaque,
1465 const char *reason) {
1466 rd_kafka_broker_t *rkb;
1467
1468 rkb = rd_kafka_broker_random(rk, state, filter, opaque);
1469
1470 if (!rkb && rk->rk_conf.sparse_connections) {
1471 /* Sparse connections:
1472 * If no eligible broker was found, schedule
1473 * a random broker for connecting. */
1474 rd_kafka_connect_any(rk, reason);
1475 }
1476
1477 return rkb;
1478 }
1479
1480
1481 /**
1482 * @brief Returns a random broker (with refcnt increased) which is up.
1483 *
1484 * @param filtered_cnt optional, see rd_kafka_broker_random0().
1485 * @param filter is optional, see rd_kafka_broker_random0().
1486 *
1487 * @sa rd_kafka_broker_random
1488 *
1489 * @locks rd_kafka_*lock(rk) MUST be held.
1490 * @locality any thread
1491 */
1492 rd_kafka_broker_t *
rd_kafka_broker_any_up(rd_kafka_t * rk,int * filtered_cnt,int (* filter)(rd_kafka_broker_t * rkb,void * opaque),void * opaque,const char * reason)1493 rd_kafka_broker_any_up (rd_kafka_t *rk,
1494 int *filtered_cnt,
1495 int (*filter) (rd_kafka_broker_t *rkb,
1496 void *opaque),
1497 void *opaque, const char *reason) {
1498 rd_kafka_broker_t *rkb;
1499
1500 rkb = rd_kafka_broker_random0(__FUNCTION__, __LINE__,
1501 rk, rd_true/*is_up*/, -1,
1502 filtered_cnt, filter, opaque);
1503
1504 if (!rkb && rk->rk_conf.sparse_connections) {
1505 /* Sparse connections:
1506 * If no eligible broker was found, schedule
1507 * a random broker for connecting. */
1508 rd_kafka_connect_any(rk, reason);
1509 }
1510
1511 return rkb;
1512 }
1513
1514
1515 /**
1516 * @brief Spend at most \p timeout_ms to acquire a usable (Up) broker.
1517 *
1518 * Prefers the most recently used broker, see rd_kafka_broker_weight_usable().
1519 *
1520 * @param features (optional) Required broker features.
1521 *
1522 * @returns A probably usable broker with increased refcount, or NULL on timeout
1523 * @locks rd_kafka_*lock() if !do_lock
1524 * @locality any
1525 *
1526 * @sa rd_kafka_broker_any_up()
1527 */
rd_kafka_broker_any_usable(rd_kafka_t * rk,int timeout_ms,rd_dolock_t do_lock,int features,const char * reason)1528 rd_kafka_broker_t *rd_kafka_broker_any_usable (rd_kafka_t *rk,
1529 int timeout_ms,
1530 rd_dolock_t do_lock,
1531 int features,
1532 const char *reason) {
1533 const rd_ts_t ts_end = rd_timeout_init(timeout_ms);
1534
1535 while (1) {
1536 rd_kafka_broker_t *rkb;
1537 int remains;
1538 int version = rd_kafka_brokers_get_state_version(rk);
1539
1540 if (do_lock)
1541 rd_kafka_rdlock(rk);
1542
1543 rkb = rd_kafka_broker_weighted(rk,
1544 rd_kafka_broker_weight_usable,
1545 features);
1546
1547 if (!rkb && rk->rk_conf.sparse_connections) {
1548 /* Sparse connections:
1549 * If no eligible broker was found, schedule
1550 * a random broker for connecting. */
1551 rd_kafka_connect_any(rk, reason);
1552 }
1553
1554 if (do_lock)
1555 rd_kafka_rdunlock(rk);
1556
1557 if (rkb)
1558 return rkb;
1559
1560 remains = rd_timeout_remains(ts_end);
1561 if (rd_timeout_expired(remains))
1562 return NULL;
1563
1564 rd_kafka_brokers_wait_state_change(rk, version, remains);
1565 }
1566
1567 return NULL;
1568 }
1569
1570
1571
1572 /**
1573 * @returns the broker handle for \p broker_id using cached metadata
1574 * information (if available) in state == \p state,
1575 * with refcount increaesd.
1576 *
1577 * Otherwise enqueues the \p eonce on the wait-state-change queue
1578 * which will be triggered on broker state changes.
1579 * It may also be triggered erroneously, so the caller
1580 * should call rd_kafka_broker_get_async() again when
1581 * the eonce is triggered.
1582 *
1583 * @locks none
1584 * @locality any thread
1585 */
1586 rd_kafka_broker_t *
rd_kafka_broker_get_async(rd_kafka_t * rk,int32_t broker_id,int state,rd_kafka_enq_once_t * eonce)1587 rd_kafka_broker_get_async (rd_kafka_t *rk, int32_t broker_id, int state,
1588 rd_kafka_enq_once_t *eonce) {
1589 int version;
1590 do {
1591 rd_kafka_broker_t *rkb;
1592
1593 version = rd_kafka_brokers_get_state_version(rk);
1594
1595 rd_kafka_rdlock(rk);
1596 rkb = rd_kafka_broker_find_by_nodeid0(rk, broker_id, state,
1597 rd_true);
1598 rd_kafka_rdunlock(rk);
1599
1600 if (rkb)
1601 return rkb;
1602
1603 } while (!rd_kafka_brokers_wait_state_change_async(rk, version, eonce));
1604
1605 return NULL; /* eonce added to wait list */
1606 }
1607
1608
1609 /**
1610 * @returns the current controller using cached metadata information,
1611 * and only if the broker's state == \p state.
1612 * The reference count is increased for the returned broker.
1613 *
1614 * @locks none
1615 * @locality any thread
1616 */
1617
rd_kafka_broker_controller_nowait(rd_kafka_t * rk,int state)1618 static rd_kafka_broker_t *rd_kafka_broker_controller_nowait (rd_kafka_t *rk,
1619 int state) {
1620 rd_kafka_broker_t *rkb;
1621
1622 rd_kafka_rdlock(rk);
1623
1624 if (rk->rk_controllerid == -1) {
1625 rd_kafka_rdunlock(rk);
1626 rd_kafka_metadata_refresh_brokers(rk, NULL,
1627 "lookup controller");
1628 return NULL;
1629 }
1630
1631 rkb = rd_kafka_broker_find_by_nodeid0(rk, rk->rk_controllerid, state,
1632 rd_true);
1633
1634 rd_kafka_rdunlock(rk);
1635
1636 return rkb;
1637 }
1638
1639
1640 /**
1641 * @returns the current controller using cached metadata information if
1642 * available in state == \p state, with refcount increaesd.
1643 *
1644 * Otherwise enqueues the \p eonce on the wait-controller queue
1645 * which will be triggered on controller updates or broker state
1646 * changes. It may also be triggered erroneously, so the caller
1647 * should call rd_kafka_broker_controller_async() again when
1648 * the eonce is triggered.
1649 *
1650 * @locks none
1651 * @locality any thread
1652 */
1653 rd_kafka_broker_t *
rd_kafka_broker_controller_async(rd_kafka_t * rk,int state,rd_kafka_enq_once_t * eonce)1654 rd_kafka_broker_controller_async (rd_kafka_t *rk, int state,
1655 rd_kafka_enq_once_t *eonce) {
1656 int version;
1657 do {
1658 rd_kafka_broker_t *rkb;
1659
1660 version = rd_kafka_brokers_get_state_version(rk);
1661
1662 rkb = rd_kafka_broker_controller_nowait(rk, state);
1663 if (rkb)
1664 return rkb;
1665
1666 } while (!rd_kafka_brokers_wait_state_change_async(rk, version, eonce));
1667
1668 return NULL; /* eonce added to wait list */
1669 }
1670
1671
1672 /**
1673 * @returns the current controller using cached metadata information,
1674 * blocking up to \p abs_timeout for the controller to be known
1675 * and to reach state == \p state. The reference count is increased
1676 * for the returned broker.
1677 *
1678 * @locks none
1679 * @locality any thread
1680 */
rd_kafka_broker_controller(rd_kafka_t * rk,int state,rd_ts_t abs_timeout)1681 rd_kafka_broker_t *rd_kafka_broker_controller (rd_kafka_t *rk, int state,
1682 rd_ts_t abs_timeout) {
1683
1684 while (1) {
1685 int version = rd_kafka_brokers_get_state_version(rk);
1686 rd_kafka_broker_t *rkb;
1687 int remains_ms;
1688
1689 rkb = rd_kafka_broker_controller_nowait(rk, state);
1690 if (rkb)
1691 return rkb;
1692
1693 remains_ms = rd_timeout_remains(abs_timeout);
1694 if (rd_timeout_expired(remains_ms))
1695 return NULL;
1696
1697 rd_kafka_brokers_wait_state_change(rk, version, remains_ms);
1698 }
1699 }
1700
1701
1702
1703
1704 /**
1705 * Find a waitresp (rkbuf awaiting response) by the correlation id.
1706 */
rd_kafka_waitresp_find(rd_kafka_broker_t * rkb,int32_t corrid)1707 static rd_kafka_buf_t *rd_kafka_waitresp_find (rd_kafka_broker_t *rkb,
1708 int32_t corrid) {
1709 rd_kafka_buf_t *rkbuf;
1710 rd_ts_t now = rd_clock();
1711
1712 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
1713
1714 TAILQ_FOREACH(rkbuf, &rkb->rkb_waitresps.rkbq_bufs, rkbuf_link)
1715 if (rkbuf->rkbuf_corrid == corrid) {
1716 /* Convert ts_sent to RTT */
1717 rkbuf->rkbuf_ts_sent = now - rkbuf->rkbuf_ts_sent;
1718 rd_avg_add(&rkb->rkb_avg_rtt, rkbuf->rkbuf_ts_sent);
1719
1720 if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING &&
1721 rd_atomic32_sub(&rkb->rkb_blocking_request_cnt,
1722 1) == 1)
1723 rd_kafka_brokers_broadcast_state_change(
1724 rkb->rkb_rk);
1725
1726 rd_kafka_bufq_deq(&rkb->rkb_waitresps, rkbuf);
1727 return rkbuf;
1728 }
1729 return NULL;
1730 }
1731
1732
1733
1734
1735 /**
1736 * Map a response message to a request.
1737 */
rd_kafka_req_response(rd_kafka_broker_t * rkb,rd_kafka_buf_t * rkbuf)1738 static int rd_kafka_req_response (rd_kafka_broker_t *rkb,
1739 rd_kafka_buf_t *rkbuf) {
1740 rd_kafka_buf_t *req;
1741 int log_decode_errors = LOG_ERR;
1742
1743 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
1744
1745
1746 /* Find corresponding request message by correlation id */
1747 if (unlikely(!(req =
1748 rd_kafka_waitresp_find(rkb,
1749 rkbuf->rkbuf_reshdr.CorrId)))) {
1750 /* unknown response. probably due to request timeout */
1751 rd_atomic64_add(&rkb->rkb_c.rx_corrid_err, 1);
1752 rd_rkb_dbg(rkb, BROKER, "RESPONSE",
1753 "Response for unknown CorrId %"PRId32" (timed out?)",
1754 rkbuf->rkbuf_reshdr.CorrId);
1755 rd_kafka_interceptors_on_response_received(
1756 rkb->rkb_rk,
1757 -1,
1758 rd_kafka_broker_name(rkb),
1759 rkb->rkb_nodeid,
1760 -1,
1761 -1,
1762 rkbuf->rkbuf_reshdr.CorrId,
1763 rkbuf->rkbuf_totlen,
1764 -1,
1765 RD_KAFKA_RESP_ERR__NOENT);
1766 rd_kafka_buf_destroy(rkbuf);
1767 return -1;
1768 }
1769
1770 rd_rkb_dbg(rkb, PROTOCOL, "RECV",
1771 "Received %sResponse (v%hd, %"PRIusz" bytes, CorrId %"PRId32
1772 ", rtt %.2fms)",
1773 rd_kafka_ApiKey2str(req->rkbuf_reqhdr.ApiKey),
1774 req->rkbuf_reqhdr.ApiVersion,
1775 rkbuf->rkbuf_totlen, rkbuf->rkbuf_reshdr.CorrId,
1776 (float)req->rkbuf_ts_sent / 1000.0f);
1777
1778 /* Copy request's header and certain flags to response object's
1779 * reqhdr for convenience. */
1780 rkbuf->rkbuf_reqhdr = req->rkbuf_reqhdr;
1781 rkbuf->rkbuf_flags |= (req->rkbuf_flags &
1782 RD_KAFKA_BUF_FLAGS_RESP_COPY_MASK);
1783 rkbuf->rkbuf_ts_sent = req->rkbuf_ts_sent; /* copy rtt */
1784
1785 /* Set up response reader slice starting past the response header */
1786 rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf,
1787 RD_KAFKAP_RESHDR_SIZE,
1788 rd_buf_len(&rkbuf->rkbuf_buf) - RD_KAFKAP_RESHDR_SIZE);
1789
1790 /* In case of flexibleVersion, skip the response header tags.
1791 * The ApiVersion request/response is different since it needs
1792 * be backwards compatible and thus has no header tags. */
1793 if (req->rkbuf_reqhdr.ApiKey != RD_KAFKAP_ApiVersion)
1794 rd_kafka_buf_skip_tags(rkbuf);
1795
1796 if (!rkbuf->rkbuf_rkb) {
1797 rkbuf->rkbuf_rkb = rkb;
1798 rd_kafka_broker_keep(rkbuf->rkbuf_rkb);
1799 } else
1800 rd_assert(rkbuf->rkbuf_rkb == rkb);
1801
1802 /* Call callback. */
1803 rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, rkbuf, req);
1804
1805 return 0;
1806
1807 err_parse:
1808 rd_atomic64_add(&rkb->rkb_c.rx_err, 1);
1809 rd_kafka_buf_callback(rkb->rkb_rk, rkb, rkbuf->rkbuf_err, NULL, req);
1810 rd_kafka_buf_destroy(rkbuf);
1811 return -1;
1812 }
1813
1814
1815
1816
rd_kafka_recv(rd_kafka_broker_t * rkb)1817 int rd_kafka_recv (rd_kafka_broker_t *rkb) {
1818 rd_kafka_buf_t *rkbuf;
1819 ssize_t r;
1820 /* errstr is not set by buf_read errors, so default it here. */
1821 char errstr[512] = "Protocol parse failure";
1822 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
1823 const int log_decode_errors = LOG_ERR;
1824
1825
1826 /* It is impossible to estimate the correct size of the response
1827 * so we split the read up in two parts: first we read the protocol
1828 * length and correlation id (i.e., the Response header), and then
1829 * when we know the full length of the response we allocate a new
1830 * buffer and call receive again.
1831 * All this in an async fashion (e.g., partial reads).
1832 */
1833 if (!(rkbuf = rkb->rkb_recv_buf)) {
1834 /* No receive in progress: create new buffer */
1835
1836 rkbuf = rd_kafka_buf_new(2, RD_KAFKAP_RESHDR_SIZE);
1837
1838 rkb->rkb_recv_buf = rkbuf;
1839
1840 /* Set up buffer reader for the response header. */
1841 rd_buf_write_ensure(&rkbuf->rkbuf_buf,
1842 RD_KAFKAP_RESHDR_SIZE,
1843 RD_KAFKAP_RESHDR_SIZE);
1844 }
1845
1846 rd_dassert(rd_buf_write_remains(&rkbuf->rkbuf_buf) > 0);
1847
1848 r = rd_kafka_transport_recv(rkb->rkb_transport, &rkbuf->rkbuf_buf,
1849 errstr, sizeof(errstr));
1850 if (unlikely(r <= 0)) {
1851 if (r == 0)
1852 return 0; /* EAGAIN */
1853 err = RD_KAFKA_RESP_ERR__TRANSPORT;
1854 rd_atomic64_add(&rkb->rkb_c.rx_err, 1);
1855 goto err;
1856 }
1857
1858 rd_atomic64_set(&rkb->rkb_c.ts_recv, rd_clock());
1859
1860 if (rkbuf->rkbuf_totlen == 0) {
1861 /* Packet length not known yet. */
1862
1863 if (unlikely(rd_buf_write_pos(&rkbuf->rkbuf_buf) <
1864 RD_KAFKAP_RESHDR_SIZE)) {
1865 /* Need response header for packet length and corrid.
1866 * Wait for more data. */
1867 return 0;
1868 }
1869
1870 rd_assert(!rkbuf->rkbuf_rkb);
1871 rkbuf->rkbuf_rkb = rkb; /* Protocol parsing code needs
1872 * the rkb for logging, but we dont
1873 * want to keep a reference to the
1874 * broker this early since that extra
1875 * refcount will mess with the broker's
1876 * refcount-based termination code. */
1877
1878 /* Initialize reader */
1879 rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf, 0,
1880 RD_KAFKAP_RESHDR_SIZE);
1881
1882 /* Read protocol header */
1883 rd_kafka_buf_read_i32(rkbuf, &rkbuf->rkbuf_reshdr.Size);
1884 rd_kafka_buf_read_i32(rkbuf, &rkbuf->rkbuf_reshdr.CorrId);
1885
1886 rkbuf->rkbuf_rkb = NULL; /* Reset */
1887
1888 rkbuf->rkbuf_totlen = rkbuf->rkbuf_reshdr.Size;
1889
1890 /* Make sure message size is within tolerable limits. */
1891 if (rkbuf->rkbuf_totlen < 4/*CorrId*/ ||
1892 rkbuf->rkbuf_totlen >
1893 (size_t)rkb->rkb_rk->rk_conf.recv_max_msg_size) {
1894 rd_snprintf(errstr, sizeof(errstr),
1895 "Invalid response size %"PRId32" (0..%i): "
1896 "increase receive.message.max.bytes",
1897 rkbuf->rkbuf_reshdr.Size,
1898 rkb->rkb_rk->rk_conf.recv_max_msg_size);
1899 err = RD_KAFKA_RESP_ERR__BAD_MSG;
1900 rd_atomic64_add(&rkb->rkb_c.rx_err, 1);
1901 goto err;
1902 }
1903
1904 rkbuf->rkbuf_totlen -= 4; /*CorrId*/
1905
1906 if (rkbuf->rkbuf_totlen > 0) {
1907 /* Allocate another buffer that fits all data (short of
1908 * the common response header). We want all
1909 * data to be in contigious memory. */
1910
1911 rd_buf_write_ensure_contig(&rkbuf->rkbuf_buf,
1912 rkbuf->rkbuf_totlen);
1913 }
1914 }
1915
1916 if (rd_buf_write_pos(&rkbuf->rkbuf_buf) - RD_KAFKAP_RESHDR_SIZE ==
1917 rkbuf->rkbuf_totlen) {
1918 /* Message is complete, pass it on to the original requester. */
1919 rkb->rkb_recv_buf = NULL;
1920 rd_atomic64_add(&rkb->rkb_c.rx, 1);
1921 rd_atomic64_add(&rkb->rkb_c.rx_bytes,
1922 rd_buf_write_pos(&rkbuf->rkbuf_buf));
1923 rd_kafka_req_response(rkb, rkbuf);
1924 }
1925
1926 return 1;
1927
1928 err_parse:
1929 err = rkbuf->rkbuf_err;
1930 err:
1931 if (!strcmp(errstr, "Disconnected"))
1932 rd_kafka_broker_conn_closed(rkb, err, errstr);
1933 else
1934 rd_kafka_broker_fail(rkb, LOG_ERR, err,
1935 "Receive failed: %s", errstr);
1936 return -1;
1937 }
1938
1939
1940 /**
1941 * Linux version of socket_cb providing racefree CLOEXEC.
1942 */
rd_kafka_socket_cb_linux(int domain,int type,int protocol,void * opaque)1943 int rd_kafka_socket_cb_linux (int domain, int type, int protocol,
1944 void *opaque) {
1945 #ifdef SOCK_CLOEXEC
1946 return socket(domain, type | SOCK_CLOEXEC, protocol);
1947 #else
1948 return rd_kafka_socket_cb_generic(domain, type, protocol, opaque);
1949 #endif
1950 }
1951
1952 /**
1953 * Fallback version of socket_cb NOT providing racefree CLOEXEC,
1954 * but setting CLOEXEC after socket creation (if FD_CLOEXEC is defined).
1955 */
int rd_kafka_socket_cb_generic (int domain, int type, int protocol,
                                void *opaque) {
        int s;

        /* Cast needed on Windows where socket() returns a SOCKET handle. */
        s = (int)socket(domain, type, protocol);
        if (s == -1)
                return -1;
#ifdef FD_CLOEXEC
        /* Set close-on-exec after creation (not racefree).
         * Note: F_SETFD takes the flag value as a plain int argument;
         * the previous code passed a spurious fourth pointer argument
         * (&on) which fcntl() silently ignored. */
        if (fcntl(s, F_SETFD, FD_CLOEXEC) == -1)
                fprintf(stderr, "WARNING: librdkafka: %s: "
                        "fcntl(FD_CLOEXEC) failed: %s: ignoring\n",
                        __FUNCTION__, rd_strerror(errno));
#endif
        return s;
}
1971
1972
1973
1974 /**
1975 * @brief Update the reconnect backoff.
1976 * Should be called when a connection is made, or all addresses
1977 * a broker resolves to has been exhausted without successful connect.
1978 *
1979 * @locality broker thread
1980 * @locks none
1981 */
1982 static void
rd_kafka_broker_update_reconnect_backoff(rd_kafka_broker_t * rkb,const rd_kafka_conf_t * conf,rd_ts_t now)1983 rd_kafka_broker_update_reconnect_backoff (rd_kafka_broker_t *rkb,
1984 const rd_kafka_conf_t *conf,
1985 rd_ts_t now) {
1986 int backoff;
1987
1988 /* If last connection attempt was more than reconnect.backoff.max.ms
1989 * ago, reset the reconnect backoff to the initial
1990 * reconnect.backoff.ms value. */
1991 if (rkb->rkb_ts_reconnect + (conf->reconnect_backoff_max_ms * 1000) <
1992 now)
1993 rkb->rkb_reconnect_backoff_ms = conf->reconnect_backoff_ms;
1994
1995 /* Apply -25%...+50% jitter to next backoff. */
1996 backoff = rd_jitter((int)((float)rkb->rkb_reconnect_backoff_ms * 0.75),
1997 (int)((float)rkb->rkb_reconnect_backoff_ms * 1.5));
1998
1999 /* Cap to reconnect.backoff.max.ms. */
2000 backoff = RD_MIN(backoff, conf->reconnect_backoff_max_ms);
2001
2002 /* Set time of next reconnect */
2003 rkb->rkb_ts_reconnect = now + (backoff * 1000);
2004 rkb->rkb_reconnect_backoff_ms =
2005 RD_MIN(rkb->rkb_reconnect_backoff_ms * 2,
2006 conf->reconnect_backoff_max_ms);
2007 }
2008
2009
2010 /**
2011 * @brief Calculate time until next reconnect attempt.
2012 *
2013 * @returns the number of milliseconds to the next connection attempt, or 0
2014 * if immediate.
2015 * @locality broker thread
2016 * @locks none
2017 */
2018
2019 static RD_INLINE int
rd_kafka_broker_reconnect_backoff(const rd_kafka_broker_t * rkb,rd_ts_t now)2020 rd_kafka_broker_reconnect_backoff (const rd_kafka_broker_t *rkb,
2021 rd_ts_t now) {
2022 rd_ts_t remains;
2023
2024 if (unlikely(rkb->rkb_ts_reconnect == 0))
2025 return 0; /* immediate */
2026
2027 remains = rkb->rkb_ts_reconnect - now;
2028 if (remains <= 0)
2029 return 0; /* immediate */
2030
2031 return (int)(remains / 1000);
2032 }
2033
2034
2035 /**
2036 * @brief Unittest for reconnect.backoff.ms
2037 */
rd_ut_reconnect_backoff(void)2038 static int rd_ut_reconnect_backoff (void) {
2039 rd_kafka_broker_t rkb = RD_ZERO_INIT;
2040 rd_kafka_conf_t conf = {
2041 .reconnect_backoff_ms = 10,
2042 .reconnect_backoff_max_ms = 90
2043 };
2044 rd_ts_t now = 1000000;
2045 int backoff;
2046
2047 rkb.rkb_reconnect_backoff_ms = conf.reconnect_backoff_ms;
2048
2049 /* broker's backoff is the initial reconnect.backoff.ms=10 */
2050 rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
2051 backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
2052 RD_UT_ASSERT_RANGE(backoff, 7, 15, "%d");
2053
2054 /* .. 20 */
2055 rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
2056 backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
2057 RD_UT_ASSERT_RANGE(backoff, 15, 30, "%d");
2058
2059 /* .. 40 */
2060 rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
2061 backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
2062 RD_UT_ASSERT_RANGE(backoff, 30, 60, "%d");
2063
2064 /* .. 80, the jitter is capped at reconnect.backoff.max.ms=90 */
2065 rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
2066 backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
2067 RD_UT_ASSERT_RANGE(backoff, 60, conf.reconnect_backoff_max_ms, "%d");
2068
2069 /* .. 90, capped by reconnect.backoff.max.ms */
2070 rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
2071 backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
2072 RD_UT_ASSERT_RANGE(backoff, 67, conf.reconnect_backoff_max_ms, "%d");
2073
2074 /* .. 90, should remain at capped value. */
2075 rd_kafka_broker_update_reconnect_backoff(&rkb, &conf, now);
2076 backoff = rd_kafka_broker_reconnect_backoff(&rkb, now);
2077 RD_UT_ASSERT_RANGE(backoff, 67, conf.reconnect_backoff_max_ms, "%d");
2078
2079 RD_UT_PASS();
2080 }
2081
2082
2083 /**
2084 * @brief Initiate asynchronous connection attempt to the next address
2085 * in the broker's address list.
2086 * While the connect is asynchronous and its IO served in the
2087 * CONNECT state, the initial name resolve is blocking.
2088 *
2089 * @returns -1 on error, 0 if broker does not have a hostname, or 1
2090 * if the connection is now in progress.
2091 */
static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) {
        const rd_sockaddr_inx_t *sinx;
        char errstr[512];
        char nodename[RD_KAFKA_NODENAME_SIZE];
        rd_bool_t reset_cached_addr = rd_false;

        rd_rkb_dbg(rkb, BROKER, "CONNECT",
                   "broker in state %s connecting",
                   rd_kafka_broker_state_names[rkb->rkb_state]);

        rd_atomic32_add(&rkb->rkb_c.connects, 1);

        /* Snapshot the nodename under the broker lock since it may be
         * updated concurrently. */
        rd_kafka_broker_lock(rkb);
        rd_strlcpy(nodename, rkb->rkb_nodename, sizeof(nodename));

        /* If the nodename was changed since the last connect,
         * reset the address cache. */
        reset_cached_addr = (rkb->rkb_connect_epoch != rkb->rkb_nodename_epoch);
        rkb->rkb_connect_epoch = rkb->rkb_nodename_epoch;
        /* Logical brokers might not have a hostname set, in which case
         * we should not try to connect. */
        if (*nodename)
                rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_CONNECT);
        rd_kafka_broker_unlock(rkb);

        if (!*nodename) {
                rd_rkb_dbg(rkb, BROKER, "CONNECT",
                           "broker has no address yet: postponing connect");
                return 0;
        }

        /* Update the reconnect backoff up-front so that failed attempts
         * are rate-limited regardless of where the connect fails below. */
        rd_kafka_broker_update_reconnect_backoff(rkb, &rkb->rkb_rk->rk_conf,
                                                 rd_clock());

        /* Blocking name resolve (see function docs above). */
        if (rd_kafka_broker_resolve(rkb, nodename, reset_cached_addr) == -1)
                return -1;

        /* Pick the next address from the resolved address list. */
        sinx = rd_sockaddr_list_next(rkb->rkb_rsal);

        rd_kafka_assert(rkb->rkb_rk, !rkb->rkb_transport);

        if (!(rkb->rkb_transport =
              rd_kafka_transport_connect(rkb, sinx, errstr, sizeof(errstr)))) {
                rd_kafka_broker_fail(rkb, LOG_ERR,
                                     RD_KAFKA_RESP_ERR__TRANSPORT,
                                     "%s", errstr);
                return -1;
        }

        return 1;
}
2143
2144
2145 /**
2146 * @brief Call when connection is ready to transition to fully functional
2147 * UP state.
2148 *
2149 * @locality Broker thread
2150 */
void rd_kafka_broker_connect_up (rd_kafka_broker_t *rkb) {
        rd_kafka_resp_err_t err;

        /* Connection setup is done: lift the in-flight restriction. */
        rkb->rkb_max_inflight = rkb->rkb_rk->rk_conf.max_inflight;

        rd_kafka_broker_lock(rkb);
        rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
        rd_kafka_broker_unlock(rkb);

        /* Trigger an async metadata refresh: prefer locally known topics,
         * and if there are none fall back to requesting just the
         * broker list. */
        err = rd_kafka_metadata_refresh_known_topics(NULL, rkb,
                                                     rd_false/*dont force*/,
                                                     "connected");
        if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
                rd_kafka_metadata_refresh_brokers(NULL, rkb, "connected");
}
2168
2169
2170
2171 static void rd_kafka_broker_connect_auth (rd_kafka_broker_t *rkb);
2172
2173
2174 /**
2175 * @brief Parses and handles SaslMechanism response, transitions
2176 * the broker state.
2177 *
2178 */
static void
rd_kafka_broker_handle_SaslHandshake (rd_kafka_t *rk,
                                      rd_kafka_broker_t *rkb,
                                      rd_kafka_resp_err_t err,
                                      rd_kafka_buf_t *rkbuf,
                                      rd_kafka_buf_t *request,
                                      void *opaque) {
        const int log_decode_errors = LOG_ERR;
        int32_t MechCnt;
        int16_t ErrorCode;
        int i = 0;
        char *mechs = "(n/a)";   /* CSV of broker-reported mechanisms,
                                  * used in the error message below. */
        size_t msz, mof = 0;

        if (err == RD_KAFKA_RESP_ERR__DESTROY)
                return;          /* Terminating: ignore the response. */

        if (err)
                goto err;

        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
        rd_kafka_buf_read_i32(rkbuf, &MechCnt);

        /* Sanity-check the mechanism count before sizing the buffer. */
        if (MechCnt < 0 || MechCnt > 100)
                rd_kafka_buf_parse_fail(rkbuf,
                                        "Invalid MechanismCount %"PRId32,
                                        MechCnt);

        /* Build a CSV string of supported mechanisms. */
        msz = RD_MIN(511, 1 + (MechCnt * 32));
        mechs = rd_alloca(msz);
        *mechs = '\0';

        for (i = 0 ; i < MechCnt ; i++) {
                rd_kafkap_str_t mech;
                rd_kafka_buf_read_str(rkbuf, &mech);

                mof += rd_snprintf(mechs+mof, msz-mof, "%s%.*s",
                                   i ? ",":"", RD_KAFKAP_STR_PR(&mech));

                /* Stop appending once the buffer is full (rd_snprintf
                 * presumably returns the would-be length, like
                 * snprintf() — mof can thus reach/exceed msz). */
                if (mof >= msz)
                        break;
        }

        rd_rkb_dbg(rkb,
                   PROTOCOL | RD_KAFKA_DBG_SECURITY | RD_KAFKA_DBG_BROKER,
                   "SASLMECHS", "Broker supported SASL mechanisms: %s",
                   mechs);

        if (ErrorCode) {
                err = ErrorCode;
                goto err;
        }

        /* Circle back to connect_auth() to start proper AUTH state. */
        rd_kafka_broker_connect_auth(rkb);
        return;

err_parse:
        /* Buffer parse failure (from the rd_kafka_buf_read_* macros). */
        err = rkbuf->rkbuf_err;
err:
        rd_kafka_broker_fail(rkb, LOG_ERR,
                             RD_KAFKA_RESP_ERR__AUTHENTICATION,
                             "SASL %s mechanism handshake failed: %s: "
                             "broker's supported mechanisms: %s",
                             rkb->rkb_rk->rk_conf.sasl.mechanisms,
                             rd_kafka_err2str(err), mechs);
}
2247
2248
2249 /**
2250 * @brief Transition state to:
2251 * - AUTH_HANDSHAKE (if SASL is configured and handshakes supported)
2252 * - AUTH (if SASL is configured but no handshake is required or
2253 * not supported, or has already taken place.)
2254 * - UP (if SASL is not configured)
2255 *
2256 * @locks_acquired rkb
2257 */
static void rd_kafka_broker_connect_auth (rd_kafka_broker_t *rkb) {

        if ((rkb->rkb_proto == RD_KAFKA_PROTO_SASL_PLAINTEXT ||
             rkb->rkb_proto == RD_KAFKA_PROTO_SASL_SSL)) {

                rd_rkb_dbg(rkb, SECURITY | RD_KAFKA_DBG_BROKER, "AUTH",
                           "Auth in state %s (handshake %ssupported)",
                           rd_kafka_broker_state_names[rkb->rkb_state],
                           (rkb->rkb_features&RD_KAFKA_FEATURE_SASL_HANDSHAKE)
                           ? "" : "not ");

                /* Broker >= 0.10.0: send request to select mechanism */
                if (rkb->rkb_state != RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE &&
                    (rkb->rkb_features & RD_KAFKA_FEATURE_SASL_HANDSHAKE)) {

                        rd_kafka_broker_lock(rkb);
                        rd_kafka_broker_set_state(
                                rkb, RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE);
                        rd_kafka_broker_unlock(rkb);

                        /* The handshake response handler calls back into
                         * this function to continue with authentication
                         * (state will then be AUTH_HANDSHAKE, taking the
                         * else branch below). */
                        rd_kafka_SaslHandshakeRequest(
                                rkb, rkb->rkb_rk->rk_conf.sasl.mechanisms,
                                RD_KAFKA_NO_REPLYQ,
                                rd_kafka_broker_handle_SaslHandshake,
                                NULL);
                } else {
                        /* Either Handshake succeeded (protocol selected)
                         * or Handshakes were not supported.
                         * In both cases continue with authentication. */
                        char sasl_errstr[512];

                        rd_kafka_broker_lock(rkb);
                        rd_kafka_broker_set_state(
                                rkb,
                                (rkb->rkb_features &
                                 RD_KAFKA_FEATURE_SASL_AUTH_REQ) ?
                                RD_KAFKA_BROKER_STATE_AUTH_REQ :
                                RD_KAFKA_BROKER_STATE_AUTH_LEGACY);
                        rd_kafka_broker_unlock(rkb);

                        if (rd_kafka_sasl_client_new(
                                    rkb->rkb_transport, sasl_errstr,
                                    sizeof(sasl_errstr)) == -1) {
                                rd_kafka_broker_fail(
                                        rkb, LOG_ERR,
                                        RD_KAFKA_RESP_ERR__AUTHENTICATION,
                                        "Failed to initialize "
                                        "SASL authentication: %s",
                                        sasl_errstr);
                                return;
                        }
                }

                return;
        }

        /* No authentication required. */
        rd_kafka_broker_connect_up(rkb);
}
2317
2318
2319 /**
2320 * @brief Specify API versions to use for this connection.
2321 *
 * @param apis is an allocated list of supported API versions.
2323 * If NULL the default set will be used based on the
2324 * \p broker.version.fallback property.
2325 * @param api_cnt number of elements in \p apis
2326 *
2327 * @remark \p rkb takes ownership of \p apis.
2328 *
2329 * @locality Broker thread
2330 * @locks_required rkb
2331 */
static void rd_kafka_broker_set_api_versions (rd_kafka_broker_t *rkb,
                                              struct rd_kafka_ApiVersion *apis,
                                              size_t api_cnt) {

        /* Free any previously stored version list before replacing it. */
        if (rkb->rkb_ApiVersions)
                rd_free(rkb->rkb_ApiVersions);


        if (!apis) {
                rd_rkb_dbg(rkb, PROTOCOL | RD_KAFKA_DBG_BROKER, "APIVERSION",
                           "Using (configuration fallback) %s protocol features",
                           rkb->rkb_rk->rk_conf.broker_version_fallback);


                /* Look up the static legacy version set matching
                 * broker.version.fallback. */
                rd_kafka_get_legacy_ApiVersions(rkb->rkb_rk->rk_conf.
                                                broker_version_fallback,
                                                &apis, &api_cnt,
                                                rkb->rkb_rk->rk_conf.
                                                broker_version_fallback);

                /* Make a copy to store on broker (the legacy list itself
                 * is not owned by us; the broker takes ownership of the
                 * copy, matching the \p apis ownership contract). */
                rd_kafka_ApiVersions_copy(apis, api_cnt, &apis, &api_cnt);
        }

        rkb->rkb_ApiVersions = apis;
        rkb->rkb_ApiVersions_cnt = api_cnt;

        /* Update feature set based on supported broker APIs. */
        rd_kafka_broker_features_set(rkb,
                                     rd_kafka_features_check(rkb, apis, api_cnt));
}
2363
2364
2365 /**
2366 * Handler for ApiVersion response.
2367 */
static void
rd_kafka_broker_handle_ApiVersion (rd_kafka_t *rk,
                                   rd_kafka_broker_t *rkb,
                                   rd_kafka_resp_err_t err,
                                   rd_kafka_buf_t *rkbuf,
                                   rd_kafka_buf_t *request, void *opaque) {
        struct rd_kafka_ApiVersion *apis = NULL;
        size_t api_cnt = 0;
        int16_t retry_ApiVersion = -1;   /* -1 = don't retry */

        if (err == RD_KAFKA_RESP_ERR__DESTROY)
                return;          /* Terminating: ignore the response. */

        err = rd_kafka_handle_ApiVersion(rk, rkb, err, rkbuf, request,
                                         &apis, &api_cnt);

        /* Broker does not support our ApiVersionRequest version,
         * see if we can downgrade to an older version. */
        if (err == RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION) {
                size_t i;

                /* Find the broker's highest supported version for
                 * ApiVersionRequest and use that to retry. */
                for (i = 0 ; i < api_cnt ; i++) {
                        if (apis[i].ApiKey == RD_KAFKAP_ApiVersion) {
                                /* Never retry with a version >= the one
                                 * that just failed. */
                                retry_ApiVersion = RD_MIN(
                                        request->rkbuf_reqhdr.ApiVersion - 1,
                                        apis[i].MaxVer);
                                break;
                        }
                }

                /* Before v3 the broker would not return its supported
                 * ApiVersionRequests, so we go straight for version 0. */
                if (i == api_cnt && request->rkbuf_reqhdr.ApiVersion > 0)
                        retry_ApiVersion = 0;

        } else if (err == RD_KAFKA_RESP_ERR_INVALID_REQUEST) {
                rd_rkb_log(rkb, LOG_ERR, "APIVERSION",
                           "ApiVersionRequest v%hd failed due to "
                           "invalid request: "
                           "check client.software.name (\"%s\") and "
                           "client.software.version (\"%s\") "
                           "for invalid characters: "
                           "falling back to older request version",
                           request->rkbuf_reqhdr.ApiVersion,
                           rk->rk_conf.sw_name, rk->rk_conf.sw_version);
                retry_ApiVersion = 0;
        }

        /* On error the apis list (if any) is not stored on the broker,
         * so free it here. */
        if (err && apis)
                rd_free(apis);

        if (retry_ApiVersion != -1) {
                /* Retry request with a lower version */
                rd_rkb_dbg(rkb,
                           BROKER|RD_KAFKA_DBG_FEATURE|RD_KAFKA_DBG_PROTOCOL,
                           "APIVERSION",
                           "ApiVersionRequest v%hd failed due to %s: "
                           "retrying with v%hd",
                           request->rkbuf_reqhdr.ApiVersion,
                           rd_kafka_err2name(err), retry_ApiVersion);
                rd_kafka_ApiVersionRequest(rkb, retry_ApiVersion,
                                           RD_KAFKA_NO_REPLYQ,
                                           rd_kafka_broker_handle_ApiVersion,
                                           NULL);
                return;
        }


        if (err) {
                /* Only fail the broker if the transport is still up;
                 * otherwise a disconnect has already been handled. */
                if (rkb->rkb_transport)
                        rd_kafka_broker_fail(
                                rkb, LOG_WARNING,
                                RD_KAFKA_RESP_ERR__TRANSPORT,
                                "ApiVersionRequest failed: %s: "
                                "probably due to broker version < 0.10 "
                                "(see api.version.request configuration)",
                                rd_kafka_err2str(err));
                return;
        }

        /* Success: store the versions (broker takes ownership of apis)
         * and continue with authentication. */
        rd_kafka_broker_lock(rkb);
        rd_kafka_broker_set_api_versions(rkb, apis, api_cnt);
        rd_kafka_broker_unlock(rkb);

        rd_kafka_broker_connect_auth(rkb);
}
2456
2457
2458 /**
 * Call when asynchronous connection attempt completes, either successfully
2460 * (if errstr is NULL) or fails.
2461 *
2462 * @locks_acquired rkb
2463 * @locality broker thread
2464 */
void rd_kafka_broker_connect_done (rd_kafka_broker_t *rkb, const char *errstr) {

        if (errstr) {
                /* Connect failed */
                rd_kafka_broker_fail(rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT,
                                     "%s", errstr);
                return;
        }

        /* Connect succeeded */
        rkb->rkb_connid++;
        rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL,
                   "CONNECTED", "Connected (#%d)", rkb->rkb_connid);
        rkb->rkb_max_inflight = 1; /* Hold back other requests until
                                    * ApiVersion, SaslHandshake, etc
                                    * are done. */

        /* Start watching the transport for readable events. */
        rd_kafka_transport_poll_set(rkb->rkb_transport, POLLIN);

        /* Lock is held across the feature checks below and released on
         * each branch after the state is set. */
        rd_kafka_broker_lock(rkb);

        if (rkb->rkb_rk->rk_conf.api_version_request &&
            rd_interval_immediate(&rkb->rkb_ApiVersion_fail_intvl, 0, 0) > 0) {
                /* Use ApiVersion to query broker for supported API versions. */
                rd_kafka_broker_feature_enable(rkb, RD_KAFKA_FEATURE_APIVERSION);
        }

        if (!(rkb->rkb_features & RD_KAFKA_FEATURE_APIVERSION)) {
                /* Use configured broker.version.fallback to
                 * figure out API versions.
                 * In case broker.version.fallback indicates a version
                 * that supports ApiVersionRequest it will update
                 * rkb_features to have FEATURE_APIVERSION set which will
                 * trigger an ApiVersionRequest below. */
                rd_kafka_broker_set_api_versions(rkb, NULL, 0);
        }

        if (rkb->rkb_features & RD_KAFKA_FEATURE_APIVERSION) {
                /* Query broker for supported API versions.
                 * This may fail with a disconnect on non-supporting brokers
                 * so hold off any other requests until we get a response,
                 * and if the connection is torn down we disable this feature.
                 */
                rd_kafka_broker_set_state(
                        rkb, RD_KAFKA_BROKER_STATE_APIVERSION_QUERY);
                rd_kafka_broker_unlock(rkb);

                rd_kafka_ApiVersionRequest(
                        rkb, -1 /* Use highest version we support */,
                        RD_KAFKA_NO_REPLYQ,
                        rd_kafka_broker_handle_ApiVersion, NULL);
        } else {
                rd_kafka_broker_unlock(rkb);

                /* Authenticate if necessary */
                rd_kafka_broker_connect_auth(rkb);
        }

}
2524
2525
2526
2527 /**
2528 * @brief Checks if the given API request+version is supported by the broker.
2529 * @returns 1 if supported, else 0.
2530 * @locality broker thread
2531 * @locks none
2532 */
2533 static RD_INLINE int
rd_kafka_broker_request_supported(rd_kafka_broker_t * rkb,rd_kafka_buf_t * rkbuf)2534 rd_kafka_broker_request_supported (rd_kafka_broker_t *rkb,
2535 rd_kafka_buf_t *rkbuf) {
2536 struct rd_kafka_ApiVersion skel = {
2537 .ApiKey = rkbuf->rkbuf_reqhdr.ApiKey
2538 };
2539 struct rd_kafka_ApiVersion *ret;
2540
2541 if (unlikely(rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_ApiVersion))
2542 return 1; /* ApiVersion requests are used to detect
2543 * the supported API versions, so should always
2544 * be allowed through. */
2545
2546 /* First try feature flags, if any, which may cover a larger
2547 * set of APIs. */
2548 if (rkbuf->rkbuf_features)
2549 return (rkb->rkb_features & rkbuf->rkbuf_features) ==
2550 rkbuf->rkbuf_features;
2551
2552 /* Then try the ApiVersion map. */
2553 ret = bsearch(&skel, rkb->rkb_ApiVersions, rkb->rkb_ApiVersions_cnt,
2554 sizeof(*rkb->rkb_ApiVersions),
2555 rd_kafka_ApiVersion_key_cmp);
2556 if (!ret)
2557 return 0;
2558
2559 return ret->MinVer <= rkbuf->rkbuf_reqhdr.ApiVersion &&
2560 rkbuf->rkbuf_reqhdr.ApiVersion <= ret->MaxVer;
2561 }
2562
2563
2564 /**
2565 * Send queued messages to broker
2566 *
2567 * Locality: io thread
2568 */
int rd_kafka_send (rd_kafka_broker_t *rkb) {
        rd_kafka_buf_t *rkbuf;
        unsigned int cnt = 0;   /* Number of fully sent requests. */

        rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));

        /* Send queued request buffers while the broker is UP and the
         * in-flight window (rkb_max_inflight) has room. */
        while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP &&
               rd_kafka_bufq_cnt(&rkb->rkb_waitresps) < rkb->rkb_max_inflight &&
               (rkbuf = TAILQ_FIRST(&rkb->rkb_outbufs.rkbq_bufs))) {
                ssize_t r;
                size_t pre_of = rd_slice_offset(&rkbuf->rkbuf_reader);
                rd_ts_t now;

                /* Check for broker support */
                if (unlikely(!rd_kafka_broker_request_supported(rkb, rkbuf))) {
                        /* Unsupported by this broker: fail the buffer
                         * immediately without sending it. */
                        rd_kafka_bufq_deq(&rkb->rkb_outbufs, rkbuf);
                        rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL,
                                   "UNSUPPORTED",
                                   "Failing %sResponse "
                                   "(v%hd, %"PRIusz" bytes, CorrId %"PRId32"): "
                                   "request not supported by broker "
                                   "(missing api.version.request=false or "
                                   "incorrect broker.version.fallback config?)",
                                   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.
                                                       ApiKey),
                                   rkbuf->rkbuf_reqhdr.ApiVersion,
                                   rkbuf->rkbuf_totlen,
                                   rkbuf->rkbuf_reshdr.CorrId);
                        rd_kafka_buf_callback(
                                rkb->rkb_rk, rkb,
                                RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                                NULL, rkbuf);
                        continue;
                }

                /* Set CorrId header field, unless this is the latter part
                 * of a partial send in which case the corrid has already
                 * been set.
                 * Due to how SSL_write() will accept a buffer but still
                 * return 0 in some cases we can't rely on the buffer offset
                 * but need to use corrid to check this. SSL_write() expects
                 * us to send the same buffer again when 0 is returned.
                 */
                if (rkbuf->rkbuf_corrid == 0 ||
                    rkbuf->rkbuf_connid != rkb->rkb_connid) {
                        rd_assert(rd_slice_offset(&rkbuf->rkbuf_reader) == 0);
                        rkbuf->rkbuf_corrid = ++rkb->rkb_corrid;
                        /* Patch CorrId into the serialized request header
                         * at offset 4+2+2 (Size + ApiKey + ApiVersion). */
                        rd_kafka_buf_update_i32(rkbuf, 4+2+2,
                                                rkbuf->rkbuf_corrid);
                        rkbuf->rkbuf_connid = rkb->rkb_connid;
                } else if (pre_of > RD_KAFKAP_REQHDR_SIZE) {
                        /* Continuation of a partial send must be on the
                         * same connection. */
                        rd_kafka_assert(NULL,
                                        rkbuf->rkbuf_connid == rkb->rkb_connid);
                }

                if (0) {
                        /* Disabled per-send debug logging. */
                        rd_rkb_dbg(rkb, PROTOCOL, "SEND",
                                   "Send %s corrid %"PRId32" at "
                                   "offset %"PRIusz"/%"PRIusz,
                                   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.
                                                       ApiKey),
                                   rkbuf->rkbuf_corrid,
                                   pre_of, rd_slice_size(&rkbuf->rkbuf_reader));
                }

                if ((r = rd_kafka_broker_send(rkb, &rkbuf->rkbuf_reader)) == -1)
                        return -1;

                now = rd_clock();
                rd_atomic64_set(&rkb->rkb_c.ts_send, now);

                /* Partial send? Continue next time. */
                if (rd_slice_remains(&rkbuf->rkbuf_reader) > 0) {
                        rd_rkb_dbg(rkb, PROTOCOL, "SEND",
                                   "Sent partial %sRequest "
                                   "(v%hd, "
                                   "%"PRIdsz"+%"PRIdsz"/%"PRIusz" bytes, "
                                   "CorrId %"PRId32")",
                                   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.
                                                       ApiKey),
                                   rkbuf->rkbuf_reqhdr.ApiVersion,
                                   (ssize_t)pre_of, r,
                                   rd_slice_size(&rkbuf->rkbuf_reader),
                                   rkbuf->rkbuf_corrid);
                        return 0;
                }

                rd_rkb_dbg(rkb, PROTOCOL, "SEND",
                           "Sent %sRequest (v%hd, %"PRIusz" bytes @ %"PRIusz", "
                           "CorrId %"PRId32")",
                           rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey),
                           rkbuf->rkbuf_reqhdr.ApiVersion,
                           rd_slice_size(&rkbuf->rkbuf_reader),
                           pre_of, rkbuf->rkbuf_corrid);

                rd_atomic64_add(&rkb->rkb_c.reqtype[rkbuf->rkbuf_reqhdr.ApiKey],
                                1);

                /* Notify transport layer of full request sent */
                if (likely(rkb->rkb_transport != NULL))
                        rd_kafka_transport_request_sent(rkb, rkbuf);

                /* Entire buffer sent, unlink from outbuf */
                rd_kafka_bufq_deq(&rkb->rkb_outbufs, rkbuf);
                rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_SENT;

                /* Store time for RTT calculation */
                rkbuf->rkbuf_ts_sent = now;

                /* Add to outbuf_latency averager */
                rd_avg_add(&rkb->rkb_avg_outbuf_latency,
                           rkbuf->rkbuf_ts_sent - rkbuf->rkbuf_ts_enq);

                /* First blocking request in flight: wake up other threads
                 * waiting on broker state changes. */
                if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING &&
                    rd_atomic32_add(&rkb->rkb_blocking_request_cnt, 1) == 1)
                        rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);

                /* Put buffer on response wait list unless we are not
                 * expecting a response (required_acks=0). */
                if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_NO_RESPONSE))
                        rd_kafka_bufq_enq(&rkb->rkb_waitresps, rkbuf);
                else { /* Call buffer callback for delivery report. */
                        rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf);
                }

                cnt++;
        }

        return cnt;
}
2699
2700
2701 /**
2702 * Add 'rkbuf' to broker 'rkb's retry queue.
2703 */
void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {

        /* Restore original replyq since replyq.q will have been NULLed
         * by buf_callback()/replyq_enq(). */
        if (!rkbuf->rkbuf_replyq.q && rkbuf->rkbuf_orig_replyq.q) {
                rkbuf->rkbuf_replyq = rkbuf->rkbuf_orig_replyq;
                rd_kafka_replyq_clear(&rkbuf->rkbuf_orig_replyq);
        }

        /* If called from another thread than rkb's broker thread
         * enqueue the buffer on the broker's op queue. */
        if (!thrd_is_current(rkb->rkb_thread)) {
                rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_XMIT_RETRY);
                rko->rko_u.xbuf.rkbuf = rkbuf;
                rd_kafka_q_enq(rkb->rkb_ops, rko);
                return;
        }

        rd_rkb_dbg(rkb, PROTOCOL, "RETRY",
                   "Retrying %sRequest (v%hd, %"PRIusz" bytes, retry %d/%d, "
                   "prev CorrId %"PRId32") in %dms",
                   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey),
                   rkbuf->rkbuf_reqhdr.ApiVersion,
                   rd_slice_size(&rkbuf->rkbuf_reader),
                   rkbuf->rkbuf_retries, rkbuf->rkbuf_max_retries,
                   rkbuf->rkbuf_corrid,
                   rkb->rkb_rk->rk_conf.retry_backoff_ms);

        rd_atomic64_add(&rkb->rkb_c.tx_retries, 1);

        /* Schedule the retry after retry.backoff.ms (ms to us). */
        rkbuf->rkbuf_ts_retry = rd_clock() +
                (rkb->rkb_rk->rk_conf.retry_backoff_ms * 1000);
        /* Precaution: time out the request if it hasn't moved from the
         * retry queue within the retry interval (such as when the broker is
         * down). */
        // FIXME: implement this properly.
        rkbuf->rkbuf_ts_timeout = rkbuf->rkbuf_ts_retry + (5*1000*1000);

        /* Reset send offset */
        rd_slice_seek(&rkbuf->rkbuf_reader, 0);
        rkbuf->rkbuf_corrid = 0;  /* A fresh CorrId is assigned on resend. */

        rd_kafka_bufq_enq(&rkb->rkb_retrybufs, rkbuf);
}
2748
2749
2750 /**
2751 * Move buffers that have expired their retry backoff time from the
2752 * retry queue to the outbuf.
2753 */
static void rd_kafka_broker_retry_bufs_move (rd_kafka_broker_t *rkb,
                                             rd_ts_t *next_wakeup) {
        rd_ts_t now = rd_clock();
        rd_kafka_buf_t *rkbuf;
        int moved = 0;

        /* Pop buffers off the head of the retry queue until one is
         * found whose retry backoff has not yet expired. */
        for (;;) {
                rkbuf = TAILQ_FIRST(&rkb->rkb_retrybufs.rkbq_bufs);
                if (!rkbuf)
                        break;

                if (rkbuf->rkbuf_ts_retry > now) {
                        /* Not yet due: possibly tighten the caller's
                         * wakeup time and stop scanning. */
                        if (rkbuf->rkbuf_ts_retry < *next_wakeup)
                                *next_wakeup = rkbuf->rkbuf_ts_retry;
                        break;
                }

                rd_kafka_bufq_deq(&rkb->rkb_retrybufs, rkbuf);

                rd_kafka_broker_buf_enq0(rkb, rkbuf);
                moved++;
        }

        if (moved > 0)
                rd_rkb_dbg(rkb, BROKER, "RETRY",
                           "Moved %d retry buffer(s) to output queue", moved);
}
2777
2778
2779 /**
2780 * @brief Propagate delivery report for entire message queue.
2781 *
2782 * @param err The error which will be set on each message.
2783 * @param status The status which will be set on each message.
2784 *
2785 * To avoid extra iterations, the \p err and \p status are set on
2786 * the message as they are popped off the OP_DR msgq in rd_kafka_poll() et.al
2787 */
void rd_kafka_dr_msgq (rd_kafka_topic_t *rkt,
                       rd_kafka_msgq_t *rkmq,
                       rd_kafka_resp_err_t err) {
        rd_kafka_t *rk = rkt->rkt_rk;
        rd_kafka_op_t *rko;

        /* Nothing to report for an empty queue. */
        if (unlikely(rd_kafka_msgq_len(rkmq) == 0))
                return;

        /* Account failed delivery reports for transactional producers. */
        if (err && rd_kafka_is_transactional(rk))
                rd_atomic64_add(&rk->rk_eos.txn_dr_fails,
                                rd_kafka_msgq_len(rkmq));

        /* Call on_acknowledgement() interceptors */
        rd_kafka_interceptors_on_acknowledgement_queue(rk, rkmq, err);

        if (rk->rk_drmode == RD_KAFKA_DR_MODE_NONE ||
            (rk->rk_conf.dr_err_only && !err)) {
                /* No delivery report callback wanted:
                 * destroy the messages right away. */
                rd_kafka_msgq_purge(rk, rkmq);
                return;
        }

        /* Pass all messages to the application thread in a single op. */
        rko = rd_kafka_op_new(RD_KAFKA_OP_DR);
        rko->rko_err = err;
        rko->rko_u.dr.rkt = rd_kafka_topic_keep(rkt);
        rd_kafka_msgq_init(&rko->rko_u.dr.msgq);

        /* Transfer all messages to the op's own msgq. */
        rd_kafka_msgq_move(&rko->rko_u.dr.msgq, rkmq);

        rd_kafka_q_enq(rk->rk_rep, rko);
}
2825
2826
2827 /**
2828 * @brief Trigger delivery reports for implicitly acked messages.
2829 *
2830 * @locks none
2831 * @locality broker thread - either last or current leader
2832 */
void rd_kafka_dr_implicit_ack (rd_kafka_broker_t *rkb,
                               rd_kafka_toppar_t *rktp,
                               uint64_t last_msgid) {
        rd_kafka_msgq_t acked = RD_KAFKA_MSGQ_INITIALIZER(acked);
        rd_kafka_msgq_t acked2 = RD_KAFKA_MSGQ_INITIALIZER(acked2);
        rd_kafka_msg_status_t status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;

        /* With acks != 0 the broker has confirmed persistence,
         * otherwise only "possibly persisted" can be claimed. */
        if (rktp->rktp_rkt->rkt_conf.required_acks != 0)
                status = RD_KAFKA_MSG_STATUS_PERSISTED;

        /* Move acked messages (up to last_msgid) out of both the
         * transmit queue and the partition queue. */
        rd_kafka_msgq_move_acked(&acked, &rktp->rktp_xmit_msgq, last_msgid,
                                 status);
        rd_kafka_msgq_move_acked(&acked2, &rktp->rktp_msgq, last_msgid,
                                 status);

        /* Insert acked2 into acked in correct order */
        rd_kafka_msgq_insert_msgq(&acked, &acked2,
                                  rktp->rktp_rkt->rkt_conf.msg_order_cmp);

        if (!rd_kafka_msgq_len(&acked))
                return;          /* Nothing was implicitly acked. */

        rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_EOS, "IMPLICITACK",
                   "%.*s [%"PRId32"] %d message(s) implicitly acked "
                   "by subsequent batch success "
                   "(msgids %"PRIu64"..%"PRIu64", "
                   "last acked %"PRIu64")",
                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                   rktp->rktp_partition,
                   rd_kafka_msgq_len(&acked),
                   rd_kafka_msgq_first(&acked)->rkm_u.producer.msgid,
                   rd_kafka_msgq_last(&acked)->rkm_u.producer.msgid,
                   last_msgid);

        /* Trigger delivery reports */
        rd_kafka_dr_msgq(rktp->rktp_rkt, &acked, RD_KAFKA_RESP_ERR_NO_ERROR);
}
2870
2871
2872
2873
2874
2875
2876
2877
2878
2879
2880 /**
2881 * @brief Map existing partitions to this broker using the
2882 * toppar's leader_id. Only undelegated partitions
2883 * matching this broker are mapped.
2884 *
2885 * @locks none
2886 * @locality any
2887 */
static void rd_kafka_broker_map_partitions (rd_kafka_broker_t *rkb) {
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_topic_t *rkt;
        int mapped_cnt = 0;

        /* Brokers without a node id, and logical brokers, never match
         * a partition's leader id: nothing to do. */
        if (rkb->rkb_nodeid == -1 || RD_KAFKA_BROKER_IS_LOGICAL(rkb))
                return;

        rd_kafka_rdlock(rk);
        /* Scan every partition of every known topic. */
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                int partidx;

                rd_kafka_topic_wrlock(rkt);
                for (partidx = 0 ; partidx < rkt->rkt_partition_cnt ;
                     partidx++) {
                        rd_kafka_toppar_t *rktp = rkt->rkt_p[partidx];
                        rd_bool_t do_map;

                        /* Only map undelegated partitions whose leader id
                         * matches this broker. */
                        rd_kafka_toppar_lock(rktp);
                        do_map = rktp->rktp_leader_id == rkb->rkb_nodeid &&
                                !(rktp->rktp_broker && rktp->rktp_next_broker);
                        if (do_map) {
                                rd_kafka_toppar_broker_update(
                                        rktp, rktp->rktp_leader_id, rkb,
                                        "broker node information updated");
                                mapped_cnt++;
                        }
                        rd_kafka_toppar_unlock(rktp);
                }
                rd_kafka_topic_wrunlock(rkt);
        }
        rd_kafka_rdunlock(rk);

        rd_rkb_dbg(rkb, TOPIC|RD_KAFKA_DBG_BROKER, "LEADER",
                   "Mapped %d partition(s) to broker", mapped_cnt);
}
2922
2923
2924 /**
2925 * @brief Broker id comparator
2926 */
rd_kafka_broker_cmp_by_id(const void * _a,const void * _b)2927 static int rd_kafka_broker_cmp_by_id (const void *_a, const void *_b) {
2928 const rd_kafka_broker_t *a = _a, *b = _b;
2929 return RD_CMP(a->rkb_nodeid, b->rkb_nodeid);
2930 }
2931
2932
2933 /**
2934 * @brief Set the broker logname (used in logs) to a copy of \p logname.
2935 *
2936 * @locality any
2937 * @locks none
2938 */
static void rd_kafka_broker_set_logname (rd_kafka_broker_t *rkb,
                                         const char *logname) {
        /* Copy outside the lock to keep the critical section minimal. */
        char *new_name = rd_strdup(logname);

        mtx_lock(&rkb->rkb_logname_lock);
        if (rkb->rkb_logname)
                rd_free(rkb->rkb_logname);
        rkb->rkb_logname = new_name;
        mtx_unlock(&rkb->rkb_logname_lock);
}
2947
2948
2949
2950 /**
2951 * @brief Prepare destruction of the broker object.
2952 *
2953 * Since rd_kafka_broker_terminating() relies on the refcnt of the
2954 * broker to reach 1, we need to loose any self-references
2955 * to avoid a hang (waiting for refcnt decrease) on destruction.
2956 *
2957 * @locality broker thread
2958 * @locks none
2959 */
rd_kafka_broker_prepare_destroy(rd_kafka_broker_t * rkb)2960 static void rd_kafka_broker_prepare_destroy (rd_kafka_broker_t *rkb) {
2961 rd_kafka_broker_monitor_del(&rkb->rkb_coord_monitor);
2962 }
2963
2964
2965 /**
2966 * @brief Serve a broker op (an op posted by another thread to be handled by
2967 * this broker's thread).
2968 *
2969 * @returns true if calling op loop should break out, else false to continue.
2970 * @locality broker thread
2971 * @locks none
2972 */
2973 static RD_WARN_UNUSED_RESULT
rd_kafka_broker_op_serve(rd_kafka_broker_t * rkb,rd_kafka_op_t * rko)2974 rd_bool_t rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb,
2975 rd_kafka_op_t *rko) {
2976 rd_kafka_toppar_t *rktp;
2977 rd_kafka_resp_err_t topic_err;
2978 rd_bool_t wakeup = rd_false;
2979
2980 rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));
2981
2982 switch (rko->rko_type)
2983 {
2984 case RD_KAFKA_OP_NODE_UPDATE:
2985 {
2986 enum {
2987 _UPD_NAME = 0x1,
2988 _UPD_ID = 0x2
2989 } updated = 0;
2990 char brokername[RD_KAFKA_NODENAME_SIZE];
2991
2992 /* Need kafka_wrlock for updating rk_broker_by_id */
2993 rd_kafka_wrlock(rkb->rkb_rk);
2994 rd_kafka_broker_lock(rkb);
2995
2996 if (strcmp(rkb->rkb_nodename,
2997 rko->rko_u.node.nodename)) {
2998 rd_rkb_dbg(rkb, BROKER, "UPDATE",
2999 "Nodename changed from %s to %s",
3000 rkb->rkb_nodename,
3001 rko->rko_u.node.nodename);
3002 rd_strlcpy(rkb->rkb_nodename,
3003 rko->rko_u.node.nodename,
3004 sizeof(rkb->rkb_nodename));
3005 rkb->rkb_nodename_epoch++;
3006 updated |= _UPD_NAME;
3007 }
3008
3009 if (rko->rko_u.node.nodeid != -1 &&
3010 !RD_KAFKA_BROKER_IS_LOGICAL(rkb) &&
3011 rko->rko_u.node.nodeid != rkb->rkb_nodeid) {
3012 int32_t old_nodeid = rkb->rkb_nodeid;
3013 rd_rkb_dbg(rkb, BROKER, "UPDATE",
3014 "NodeId changed from %"PRId32" to %"PRId32,
3015 rkb->rkb_nodeid,
3016 rko->rko_u.node.nodeid);
3017
3018 rkb->rkb_nodeid = rko->rko_u.node.nodeid;
3019
3020 /* Update system thread name */
3021 rd_kafka_set_thread_sysname("rdk:broker%"PRId32,
3022 rkb->rkb_nodeid);
3023
3024 /* Update broker_by_id sorted list */
3025 if (old_nodeid == -1)
3026 rd_list_add(&rkb->rkb_rk->rk_broker_by_id, rkb);
3027 rd_list_sort(&rkb->rkb_rk->rk_broker_by_id,
3028 rd_kafka_broker_cmp_by_id);
3029
3030 updated |= _UPD_ID;
3031 }
3032
3033 rd_kafka_mk_brokername(brokername, sizeof(brokername),
3034 rkb->rkb_proto,
3035 rkb->rkb_nodename, rkb->rkb_nodeid,
3036 RD_KAFKA_LEARNED);
3037 if (strcmp(rkb->rkb_name, brokername)) {
3038 /* Udate the name copy used for logging. */
3039 rd_kafka_broker_set_logname(rkb, brokername);
3040
3041 rd_rkb_dbg(rkb, BROKER, "UPDATE",
3042 "Name changed from %s to %s",
3043 rkb->rkb_name, brokername);
3044 rd_strlcpy(rkb->rkb_name, brokername,
3045 sizeof(rkb->rkb_name));
3046 }
3047 rd_kafka_broker_unlock(rkb);
3048 rd_kafka_wrunlock(rkb->rkb_rk);
3049
3050 if (updated & _UPD_NAME)
3051 rd_kafka_broker_fail(rkb, LOG_DEBUG,
3052 RD_KAFKA_RESP_ERR__TRANSPORT,
3053 "Broker hostname updated");
3054 else if (updated & _UPD_ID) {
3055 /* Map existing partitions to this broker. */
3056 rd_kafka_broker_map_partitions(rkb);
3057
3058 /* If broker is currently in state up we need
3059 * to trigger a state change so it exits its
3060 * state&type based .._serve() loop. */
3061 rd_kafka_broker_lock(rkb);
3062 if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP)
3063 rd_kafka_broker_set_state(
3064 rkb, RD_KAFKA_BROKER_STATE_UPDATE);
3065 rd_kafka_broker_unlock(rkb);
3066 }
3067
3068 rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
3069 break;
3070 }
3071
3072 case RD_KAFKA_OP_XMIT_BUF:
3073 rd_kafka_broker_buf_enq2(rkb, rko->rko_u.xbuf.rkbuf);
3074 rko->rko_u.xbuf.rkbuf = NULL; /* buffer now owned by broker */
3075 if (rko->rko_replyq.q) {
3076 /* Op will be reused for forwarding response. */
3077 rko = NULL;
3078 }
3079 break;
3080
3081 case RD_KAFKA_OP_XMIT_RETRY:
3082 rd_kafka_broker_buf_retry(rkb, rko->rko_u.xbuf.rkbuf);
3083 rko->rko_u.xbuf.rkbuf = NULL;
3084 break;
3085
3086 case RD_KAFKA_OP_PARTITION_JOIN:
3087 /*
3088 * Add partition to broker toppars
3089 */
3090 rktp = rko->rko_rktp;
3091 rd_kafka_toppar_lock(rktp);
3092
3093 /* Abort join if instance is terminating */
3094 if (rd_kafka_terminating(rkb->rkb_rk) ||
3095 (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE)) {
3096 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
3097 "Topic %s [%"PRId32"]: not joining broker: "
3098 "%s",
3099 rktp->rktp_rkt->rkt_topic->str,
3100 rktp->rktp_partition,
3101 rd_kafka_terminating(rkb->rkb_rk) ?
3102 "instance is terminating" :
3103 "partition removed");
3104
3105 rd_kafka_broker_destroy(rktp->rktp_next_broker);
3106 rktp->rktp_next_broker = NULL;
3107 rd_kafka_toppar_unlock(rktp);
3108 break;
3109 }
3110
3111 /* See if we are still the next broker */
3112 if (rktp->rktp_next_broker != rkb) {
3113 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
3114 "Topic %s [%"PRId32"]: not joining broker "
3115 "(next broker %s)",
3116 rktp->rktp_rkt->rkt_topic->str,
3117 rktp->rktp_partition,
3118 rktp->rktp_next_broker ?
3119 rd_kafka_broker_name(rktp->rktp_next_broker):
3120 "(none)");
3121
3122 /* Need temporary refcount so we can safely unlock
3123 * after q_enq(). */
3124 rd_kafka_toppar_keep(rktp);
3125
3126 /* No, forward this op to the new next broker. */
3127 rd_kafka_q_enq(rktp->rktp_next_broker->rkb_ops, rko);
3128 rko = NULL;
3129
3130 rd_kafka_toppar_unlock(rktp);
3131 rd_kafka_toppar_destroy(rktp);
3132
3133 break;
3134 }
3135
3136 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
3137 "Topic %s [%"PRId32"]: joining broker "
3138 "(rktp %p, %d message(s) queued)",
3139 rktp->rktp_rkt->rkt_topic->str,
3140 rktp->rktp_partition, rktp,
3141 rd_kafka_msgq_len(&rktp->rktp_msgq));
3142
3143 rd_kafka_assert(NULL,
3144 !(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_RKB));
3145 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_ON_RKB;
3146 rd_kafka_toppar_keep(rktp);
3147 rd_kafka_broker_lock(rkb);
3148 TAILQ_INSERT_TAIL(&rkb->rkb_toppars, rktp, rktp_rkblink);
3149 rkb->rkb_toppar_cnt++;
3150 rd_kafka_broker_unlock(rkb);
3151 rktp->rktp_broker = rkb;
3152 rd_assert(!rktp->rktp_msgq_wakeup_q);
3153 rktp->rktp_msgq_wakeup_q = rd_kafka_q_keep(rkb->rkb_ops);
3154 rd_kafka_broker_keep(rkb);
3155
3156 if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER) {
3157 rd_kafka_broker_active_toppar_add(rkb, rktp, "joining");
3158
3159 if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
3160 /* Wait for all outstanding requests from
3161 * the previous leader to finish before
3162 * producing anything to this new leader. */
3163 rd_kafka_idemp_drain_toppar(
3164 rktp,
3165 "wait for outstanding requests to "
3166 "finish before producing to "
3167 "new leader");
3168 }
3169 }
3170
3171 rd_kafka_broker_destroy(rktp->rktp_next_broker);
3172 rktp->rktp_next_broker = NULL;
3173
3174 rd_kafka_toppar_unlock(rktp);
3175
3176 rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
3177 break;
3178
3179 case RD_KAFKA_OP_PARTITION_LEAVE:
3180 /*
3181 * Remove partition from broker toppars
3182 */
3183 rktp = rko->rko_rktp;
3184
3185 /* If there is a topic-wide error, use it as error code
3186 * when failing messages below. */
3187 topic_err = rd_kafka_topic_get_error(rktp->rktp_rkt);
3188
3189 rd_kafka_toppar_lock(rktp);
3190
3191 /* Multiple PARTITION_LEAVEs are possible during partition
3192 * migration, make sure we're supposed to handle this one. */
3193 if (unlikely(rktp->rktp_broker != rkb)) {
3194 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
3195 "Topic %s [%"PRId32"]: "
3196 "ignoring PARTITION_LEAVE: "
3197 "not delegated to broker (%s)",
3198 rktp->rktp_rkt->rkt_topic->str,
3199 rktp->rktp_partition,
3200 rktp->rktp_broker ?
3201 rd_kafka_broker_name(rktp->rktp_broker) :
3202 "none");
3203 rd_kafka_toppar_unlock(rktp);
3204 break;
3205 }
3206 rd_kafka_toppar_unlock(rktp);
3207
3208 /* Remove from fetcher list */
3209 rd_kafka_toppar_fetch_decide(rktp, rkb, 1/*force remove*/);
3210
3211 if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER) {
3212 /* Purge any ProduceRequests for this toppar
3213 * in the output queue. */
3214 rd_kafka_broker_bufq_purge_by_toppar(
3215 rkb,
3216 &rkb->rkb_outbufs,
3217 RD_KAFKAP_Produce, rktp,
3218 RD_KAFKA_RESP_ERR__RETRY);
3219 }
3220
3221
3222 rd_kafka_toppar_lock(rktp);
3223
3224 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
3225 "Topic %s [%"PRId32"]: leaving broker "
3226 "(%d messages in xmitq, next broker %s, rktp %p)",
3227 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
3228 rd_kafka_msgq_len(&rktp->rktp_xmit_msgq),
3229 rktp->rktp_next_broker ?
3230 rd_kafka_broker_name(rktp->rktp_next_broker) :
3231 "(none)", rktp);
3232
3233 /* Insert xmitq(broker-local) messages to the msgq(global)
3234 * at their sorted position to maintain ordering. */
3235 rd_kafka_msgq_insert_msgq(&rktp->rktp_msgq,
3236 &rktp->rktp_xmit_msgq,
3237 rktp->rktp_rkt->rkt_conf.
3238 msg_order_cmp);
3239
3240 if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER)
3241 rd_kafka_broker_active_toppar_del(rkb, rktp, "leaving");
3242
3243 rd_kafka_broker_lock(rkb);
3244 TAILQ_REMOVE(&rkb->rkb_toppars, rktp, rktp_rkblink);
3245 rkb->rkb_toppar_cnt--;
3246 rd_kafka_broker_unlock(rkb);
3247 rd_kafka_broker_destroy(rktp->rktp_broker);
3248 if (rktp->rktp_msgq_wakeup_q) {
3249 rd_kafka_q_destroy(rktp->rktp_msgq_wakeup_q);
3250 rktp->rktp_msgq_wakeup_q = NULL;
3251 }
3252 rktp->rktp_broker = NULL;
3253
3254 rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_RKB);
3255 rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_RKB;
3256
3257 if (rktp->rktp_next_broker) {
3258 /* There is a next broker we need to migrate to. */
3259 rko->rko_type = RD_KAFKA_OP_PARTITION_JOIN;
3260 rd_kafka_q_enq(rktp->rktp_next_broker->rkb_ops, rko);
3261 rko = NULL;
3262 } else {
3263 rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK",
3264 "Topic %s [%"PRId32"]: no next broker, "
3265 "failing %d message(s) in partition queue",
3266 rktp->rktp_rkt->rkt_topic->str,
3267 rktp->rktp_partition,
3268 rd_kafka_msgq_len(&rktp->rktp_msgq));
3269 rd_kafka_assert(NULL, rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) == 0);
3270 rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
3271 rd_kafka_terminating(rkb->rkb_rk) ?
3272 RD_KAFKA_RESP_ERR__DESTROY :
3273 (topic_err ? topic_err :
3274 RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION));
3275
3276 }
3277
3278 rd_kafka_toppar_unlock(rktp);
3279 rd_kafka_toppar_destroy(rktp); /* from JOIN */
3280
3281 rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
3282 break;
3283
3284 case RD_KAFKA_OP_TERMINATE:
3285 /* nop: just a wake-up. */
3286 rd_rkb_dbg(rkb, BROKER, "TERM",
3287 "Received TERMINATE op in state %s: "
3288 "%d refcnts, %d toppar(s), %d active toppar(s), "
3289 "%d outbufs, %d waitresps, %d retrybufs",
3290 rd_kafka_broker_state_names[rkb->rkb_state],
3291 rd_refcnt_get(&rkb->rkb_refcnt),
3292 rkb->rkb_toppar_cnt, rkb->rkb_active_toppar_cnt,
3293 (int)rd_kafka_bufq_cnt(&rkb->rkb_outbufs),
3294 (int)rd_kafka_bufq_cnt(&rkb->rkb_waitresps),
3295 (int)rd_kafka_bufq_cnt(&rkb->rkb_retrybufs));
3296 /* Expedite termination by bringing down the broker
3297 * and trigger a state change.
3298 * This makes sure any eonce dependent on state changes
3299 * are triggered. */
3300 rd_kafka_broker_fail(rkb, LOG_DEBUG,
3301 RD_KAFKA_RESP_ERR__DESTROY,
3302 "Client is terminating");
3303
3304 rd_kafka_broker_prepare_destroy(rkb);
3305 wakeup = rd_true;
3306 break;
3307
3308 case RD_KAFKA_OP_WAKEUP:
3309 wakeup = rd_true;
3310 break;
3311
3312 case RD_KAFKA_OP_PURGE:
3313 rd_kafka_broker_handle_purge_queues(rkb, rko);
3314 rko = NULL; /* the rko is reused for the reply */
3315 break;
3316
3317 case RD_KAFKA_OP_CONNECT:
3318 /* Sparse connections: connection requested, transition
3319 * to TRY_CONNECT state to trigger new connection. */
3320 if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_INIT) {
3321 rd_rkb_dbg(rkb, BROKER, "CONNECT",
3322 "Received CONNECT op");
3323 rkb->rkb_persistconn.internal++;
3324 rd_kafka_broker_lock(rkb);
3325 rd_kafka_broker_set_state(
3326 rkb, RD_KAFKA_BROKER_STATE_TRY_CONNECT);
3327 rd_kafka_broker_unlock(rkb);
3328
3329 } else if (rkb->rkb_state >=
3330 RD_KAFKA_BROKER_STATE_TRY_CONNECT) {
3331 rd_bool_t do_disconnect = rd_false;
3332
3333 /* If the nodename was changed since the last connect,
3334 * close the current connection. */
3335
3336 rd_kafka_broker_lock(rkb);
3337 do_disconnect = (rkb->rkb_connect_epoch !=
3338 rkb->rkb_nodename_epoch);
3339 rd_kafka_broker_unlock(rkb);
3340
3341 if (do_disconnect)
3342 rd_kafka_broker_fail(
3343 rkb, LOG_DEBUG,
3344 RD_KAFKA_RESP_ERR__TRANSPORT,
3345 "Closing connection due to "
3346 "nodename change");
3347 }
3348
3349 /* Expedite next reconnect */
3350 rkb->rkb_ts_reconnect = 0;
3351
3352 wakeup = rd_true;
3353 break;
3354
3355 default:
3356 rd_kafka_assert(rkb->rkb_rk, !*"unhandled op type");
3357 break;
3358 }
3359
3360 if (rko)
3361 rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
3362
3363 return wakeup;
3364 }
3365
3366
3367
3368 /**
3369 * @brief Serve broker ops.
3370 * @returns the number of ops served
3371 */
3372 static RD_WARN_UNUSED_RESULT
rd_kafka_broker_ops_serve(rd_kafka_broker_t * rkb,rd_ts_t timeout_us)3373 int rd_kafka_broker_ops_serve (rd_kafka_broker_t *rkb, rd_ts_t timeout_us) {
3374 rd_kafka_op_t *rko;
3375 int cnt = 0;
3376
3377 while ((rko = rd_kafka_q_pop(rkb->rkb_ops, timeout_us, 0)) &&
3378 (cnt++, !rd_kafka_broker_op_serve(rkb, rko)))
3379 timeout_us = RD_POLL_NOWAIT;
3380
3381 return cnt;
3382 }
3383
3384 /**
3385 * @brief Serve broker ops and IOs.
3386 *
3387 * If a connection exists, poll IO first based on timeout.
3388 * Use remaining timeout for ops queue poll.
3389 *
3390 * If no connection, poll ops queue using timeout.
3391 *
3392 * Sparse connections: if there's need for a connection, set
3393 * timeout to NOWAIT.
3394 *
3395 * @param abs_timeout Maximum block time (absolute time).
3396 *
3397 * @returns true on wakeup (broker state machine needs to be served),
3398 * else false.
3399 *
3400 * @locality broker thread
3401 * @locks none
3402 */
3403 static RD_WARN_UNUSED_RESULT
rd_kafka_broker_ops_io_serve(rd_kafka_broker_t * rkb,rd_ts_t abs_timeout)3404 rd_bool_t rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb,
3405 rd_ts_t abs_timeout) {
3406 rd_ts_t now;
3407 rd_bool_t wakeup;
3408
3409 if (unlikely(rd_kafka_terminating(rkb->rkb_rk)))
3410 abs_timeout = rd_clock() + 1000;
3411 else if (unlikely(rd_kafka_broker_needs_connection(rkb)))
3412 abs_timeout = RD_POLL_NOWAIT;
3413 else if (unlikely(abs_timeout == RD_POLL_INFINITE))
3414 abs_timeout = rd_clock() +
3415 ((rd_ts_t)rd_kafka_max_block_ms * 1000);
3416
3417 if (likely(rkb->rkb_transport != NULL)) {
3418 /* Serve IO events.
3419 *
3420 * If there are IO events, cut out the queue ops_serve
3421 * timeout (below) since we'll probably have to perform more
3422 * duties based on the IO.
3423 * IO polling granularity is milliseconds while
3424 * queue granularity is microseconds. */
3425 if (rd_kafka_transport_io_serve(
3426 rkb->rkb_transport,
3427 rd_timeout_remains(abs_timeout)))
3428 abs_timeout = RD_POLL_NOWAIT;
3429 }
3430
3431
3432 /* Serve broker ops */
3433 wakeup = rd_kafka_broker_ops_serve(rkb,
3434 rd_timeout_remains_us(abs_timeout));
3435
3436 /* An op might have triggered the need for a connection, if so
3437 * transition to TRY_CONNECT state. */
3438 if (unlikely(rd_kafka_broker_needs_connection(rkb) &&
3439 rkb->rkb_state == RD_KAFKA_BROKER_STATE_INIT)) {
3440 rd_kafka_broker_lock(rkb);
3441 rd_kafka_broker_set_state(
3442 rkb, RD_KAFKA_BROKER_STATE_TRY_CONNECT);
3443 rd_kafka_broker_unlock(rkb);
3444 wakeup = rd_true;
3445 }
3446
3447 /* Scan queues for timeouts. */
3448 now = rd_clock();
3449 if (rd_interval(&rkb->rkb_timeout_scan_intvl, 1000000, now) > 0)
3450 rd_kafka_broker_timeout_scan(rkb, now);
3451
3452 return wakeup;
3453 }
3454
3455
3456 /**
3457 * @brief Consumer: Serve the toppars assigned to this broker.
3458 *
3459 * @returns the minimum Fetch backoff time (abs timestamp) for the
3460 * partitions to fetch.
3461 *
3462 * @locality broker thread
3463 */
rd_kafka_broker_consumer_toppars_serve(rd_kafka_broker_t * rkb)3464 static rd_ts_t rd_kafka_broker_consumer_toppars_serve (rd_kafka_broker_t *rkb) {
3465 rd_kafka_toppar_t *rktp, *rktp_tmp;
3466 rd_ts_t min_backoff = RD_TS_MAX;
3467
3468 TAILQ_FOREACH_SAFE(rktp, &rkb->rkb_toppars, rktp_rkblink, rktp_tmp) {
3469 rd_ts_t backoff;
3470
3471 /* Serve toppar to update desired rktp state */
3472 backoff = rd_kafka_broker_consumer_toppar_serve(rkb, rktp);
3473 if (backoff < min_backoff)
3474 min_backoff = backoff;
3475 }
3476
3477 return min_backoff;
3478 }
3479
3480
3481 /**
3482 * @brief Scan toppar's xmit and producer queue for message timeouts and
3483 * enqueue delivery reports for timed out messages.
3484 *
3485 * @param abs_next_timeout will be set to the next message timeout, or 0
3486 * if no timeout.
3487 *
3488 * @returns the number of messages timed out.
3489 *
3490 * @locality toppar's broker handler thread
3491 * @locks toppar_lock MUST be held
3492 */
rd_kafka_broker_toppar_msgq_scan(rd_kafka_broker_t * rkb,rd_kafka_toppar_t * rktp,rd_ts_t now,rd_ts_t * abs_next_timeout)3493 static int rd_kafka_broker_toppar_msgq_scan (rd_kafka_broker_t *rkb,
3494 rd_kafka_toppar_t *rktp,
3495 rd_ts_t now,
3496 rd_ts_t *abs_next_timeout) {
3497 rd_kafka_msgq_t xtimedout = RD_KAFKA_MSGQ_INITIALIZER(xtimedout);
3498 rd_kafka_msgq_t qtimedout = RD_KAFKA_MSGQ_INITIALIZER(qtimedout);
3499 int xcnt, qcnt, cnt;
3500 uint64_t first, last;
3501 rd_ts_t next;
3502
3503 *abs_next_timeout = 0;
3504
3505 xcnt = rd_kafka_msgq_age_scan(rktp, &rktp->rktp_xmit_msgq,
3506 &xtimedout, now, &next);
3507 if (next && next < *abs_next_timeout)
3508 *abs_next_timeout = next;
3509
3510 qcnt = rd_kafka_msgq_age_scan(rktp, &rktp->rktp_msgq,
3511 &qtimedout, now, &next);
3512 if (next && (!*abs_next_timeout || next < *abs_next_timeout))
3513 *abs_next_timeout = next;
3514
3515 cnt = xcnt + qcnt;
3516 if (likely(cnt == 0))
3517 return 0;
3518
3519 /* Insert queue-timedout into xmitqueue-timedout in a sorted fashion */
3520 rd_kafka_msgq_insert_msgq(&xtimedout, &qtimedout,
3521 rktp->rktp_rkt->rkt_conf.msg_order_cmp);
3522
3523 first = rd_kafka_msgq_first(&xtimedout)->rkm_u.producer.msgid;
3524 last = rd_kafka_msgq_last(&xtimedout)->rkm_u.producer.msgid;
3525
3526 rd_rkb_dbg(rkb, MSG, "TIMEOUT",
3527 "%s [%"PRId32"]: timed out %d+%d message(s) "
3528 "(MsgId %"PRIu64"..%"PRIu64"): message.timeout.ms exceeded",
3529 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
3530 xcnt, qcnt, first, last);
3531
3532 /* Trigger delivery report for timed out messages */
3533 rd_kafka_dr_msgq(rktp->rktp_rkt, &xtimedout,
3534 RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
3535
3536 return cnt;
3537 }
3538
3539
3540 /**
3541 * @brief Producer: Check this broker's toppars for message timeouts.
3542 *
3543 * This is only used by the internal broker to enforce message timeouts.
3544 *
3545 * @returns the next absolute scan time.
3546 *
3547 * @locality internal broker thread.
3548 */
3549 static rd_ts_t
rd_kafka_broker_toppars_timeout_scan(rd_kafka_broker_t * rkb,rd_ts_t now)3550 rd_kafka_broker_toppars_timeout_scan (rd_kafka_broker_t *rkb, rd_ts_t now) {
3551 rd_kafka_toppar_t *rktp;
3552 rd_ts_t next = now + (1000*1000);
3553
3554 TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
3555 rd_ts_t this_next;
3556
3557 rd_kafka_toppar_lock(rktp);
3558
3559 if (unlikely(rktp->rktp_broker != rkb)) {
3560 /* Currently migrating away from this
3561 * broker. */
3562 rd_kafka_toppar_unlock(rktp);
3563 continue;
3564 }
3565
3566 /* Scan queues for msg timeouts */
3567 rd_kafka_broker_toppar_msgq_scan(rkb, rktp, now, &this_next);
3568
3569 rd_kafka_toppar_unlock(rktp);
3570
3571 if (this_next && this_next < next)
3572 next = this_next;
3573 }
3574
3575 return next;
3576 }
3577
3578
3579 /**
3580 * @brief Idle function for the internal broker handle.
3581 */
rd_kafka_broker_internal_serve(rd_kafka_broker_t * rkb,rd_ts_t abs_timeout)3582 static void rd_kafka_broker_internal_serve (rd_kafka_broker_t *rkb,
3583 rd_ts_t abs_timeout) {
3584 int initial_state = rkb->rkb_state;
3585 rd_bool_t wakeup;
3586
3587 if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) {
3588 /* Consumer */
3589 do {
3590 rd_kafka_broker_consumer_toppars_serve(rkb);
3591
3592 wakeup = rd_kafka_broker_ops_io_serve(rkb, abs_timeout);
3593
3594 } while (!rd_kafka_broker_terminating(rkb) &&
3595 (int)rkb->rkb_state == initial_state &&
3596 !wakeup &&
3597 !rd_timeout_expired(rd_timeout_remains(abs_timeout)));
3598 } else {
3599 /* Producer */
3600 rd_ts_t next_timeout_scan = 0;
3601
3602 do {
3603 rd_ts_t now = rd_clock();
3604
3605 if (now >= next_timeout_scan)
3606 next_timeout_scan =
3607 rd_kafka_broker_toppars_timeout_scan(
3608 rkb, now);
3609
3610 wakeup = rd_kafka_broker_ops_io_serve(
3611 rkb, RD_MIN(abs_timeout, next_timeout_scan));
3612
3613 } while (!rd_kafka_broker_terminating(rkb) &&
3614 (int)rkb->rkb_state == initial_state &&
3615 !wakeup &&
3616 !rd_timeout_expired(rd_timeout_remains(abs_timeout)));
3617 }
3618 }
3619
3620
3621 /**
3622 * @returns the number of requests that may be enqueued before
3623 * queue.backpressure.threshold is reached.
3624 */
3625
3626 static RD_INLINE unsigned int
rd_kafka_broker_outbufs_space(rd_kafka_broker_t * rkb)3627 rd_kafka_broker_outbufs_space (rd_kafka_broker_t *rkb) {
3628 int r = rkb->rkb_rk->rk_conf.queue_backpressure_thres -
3629 rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt);
3630 return r < 0 ? 0 : (unsigned int)r;
3631 }
3632
3633
3634 /**
3635 * @brief Serve a toppar for producing.
3636 *
3637 * @param next_wakeup will be updated to when the next wake-up/attempt is
3638 * desired, only lower (sooner) values will be set.
3639 * @param do_timeout_scan perform msg timeout scan
3640 * @param may_send if set to false there is something on the global level
3641 * that prohibits sending messages, such as a transactional
3642 * state.
3643 *
3644 * @returns the number of messages produced.
3645 *
3646 * @locks none
3647 * @locality broker thread
3648 */
rd_kafka_toppar_producer_serve(rd_kafka_broker_t * rkb,rd_kafka_toppar_t * rktp,const rd_kafka_pid_t pid,rd_ts_t now,rd_ts_t * next_wakeup,rd_bool_t do_timeout_scan,rd_bool_t may_send)3649 static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb,
3650 rd_kafka_toppar_t *rktp,
3651 const rd_kafka_pid_t pid,
3652 rd_ts_t now,
3653 rd_ts_t *next_wakeup,
3654 rd_bool_t do_timeout_scan,
3655 rd_bool_t may_send) {
3656 int cnt = 0;
3657 int r;
3658 rd_kafka_msg_t *rkm;
3659 int move_cnt = 0;
3660 int max_requests;
3661 int reqcnt;
3662 int inflight = 0;
3663 uint64_t epoch_base_msgid = 0;
3664
3665 /* By limiting the number of not-yet-sent buffers (rkb_outbufs) we
3666 * provide a backpressure mechanism to the producer loop
3667 * which allows larger message batches to accumulate and thus
3668 * increase throughput.
3669 * This comes at no latency cost since there are already
3670 * buffers enqueued waiting for transmission. */
3671 max_requests = rd_kafka_broker_outbufs_space(rkb);
3672
3673 rd_kafka_toppar_lock(rktp);
3674
3675 if (unlikely(rktp->rktp_broker != rkb)) {
3676 /* Currently migrating away from this
3677 * broker. */
3678 rd_kafka_toppar_unlock(rktp);
3679 return 0;
3680 }
3681
3682 if (unlikely(do_timeout_scan)) {
3683 int timeoutcnt;
3684 rd_ts_t next;
3685
3686 /* Scan queues for msg timeouts */
3687 timeoutcnt = rd_kafka_broker_toppar_msgq_scan(rkb, rktp, now,
3688 &next);
3689
3690 if (next && next < *next_wakeup)
3691 *next_wakeup = next;
3692
3693 if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
3694 if (!rd_kafka_pid_valid(pid)) {
3695 /* If we don't have a PID, we can't transmit
3696 * any messages. */
3697 rd_kafka_toppar_unlock(rktp);
3698 return 0;
3699
3700 } else if (timeoutcnt > 0) {
3701 /* Message timeouts will lead to gaps the in
3702 * the message sequence and thus trigger
3703 * OutOfOrderSequence errors from the broker.
3704 * Bump the epoch to reset the base msgid after
3705 * draining all partitions. */
3706
3707 /* Must not hold toppar lock */
3708 rd_kafka_toppar_unlock(rktp);
3709
3710 rd_kafka_idemp_drain_epoch_bump(
3711 rkb->rkb_rk,
3712 "%d message(s) timed out "
3713 "on %s [%"PRId32"]",
3714 timeoutcnt,
3715 rktp->rktp_rkt->rkt_topic->str,
3716 rktp->rktp_partition);
3717 return 0;
3718 }
3719 }
3720 }
3721
3722 if (unlikely(!may_send)) {
3723 /* Sends prohibited on the broker or instance level */
3724 max_requests = 0;
3725 } else if (unlikely(rd_kafka_fatal_error_code(rkb->rkb_rk))) {
3726 /* Fatal error has been raised, don't produce. */
3727 max_requests = 0;
3728 } else if (unlikely(RD_KAFKA_TOPPAR_IS_PAUSED(rktp))) {
3729 /* Partition is paused */
3730 max_requests = 0;
3731 } else if (unlikely(rd_kafka_is_transactional(rkb->rkb_rk) &&
3732 !rd_kafka_txn_toppar_may_send_msg(rktp))) {
3733 /* Partition not registered in transaction yet */
3734 max_requests = 0;
3735 } else if (max_requests > 0) {
3736 /* Move messages from locked partition produce queue
3737 * to broker-local xmit queue. */
3738 if ((move_cnt = rktp->rktp_msgq.rkmq_msg_cnt) > 0)
3739 rd_kafka_msgq_insert_msgq(&rktp->rktp_xmit_msgq,
3740 &rktp->rktp_msgq,
3741 rktp->rktp_rkt->rkt_conf.
3742 msg_order_cmp);
3743 }
3744
3745 rd_kafka_toppar_unlock(rktp);
3746
3747
3748 if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
3749 /* Update the partition's cached PID, and reset the
3750 * base msg sequence if necessary */
3751 rd_bool_t did_purge = rd_false;
3752
3753 if (unlikely(!rd_kafka_pid_eq(pid, rktp->rktp_eos.pid))) {
3754 /* Flush any ProduceRequests for this partition in the
3755 * output buffer queue to speed up recovery. */
3756 rd_kafka_broker_bufq_purge_by_toppar(
3757 rkb,
3758 &rkb->rkb_outbufs,
3759 RD_KAFKAP_Produce, rktp,
3760 RD_KAFKA_RESP_ERR__RETRY);
3761 did_purge = rd_true;
3762
3763 if (rd_kafka_pid_valid(rktp->rktp_eos.pid))
3764 rd_rkb_dbg(rkb, QUEUE, "TOPPAR",
3765 "%.*s [%"PRId32"] PID has changed: "
3766 "must drain requests for all "
3767 "partitions before resuming reset "
3768 "of PID",
3769 RD_KAFKAP_STR_PR(rktp->rktp_rkt->
3770 rkt_topic),
3771 rktp->rktp_partition);
3772 }
3773
3774 inflight = rd_atomic32_get(&rktp->rktp_msgs_inflight);
3775
3776 if (unlikely(rktp->rktp_eos.wait_drain)) {
3777 if (inflight) {
3778 /* Waiting for in-flight requests to
3779 * drain/finish before producing anything more.
3780 * This is used to recover to a consistent
3781 * state when the partition leader
3782 * has changed, or timed out messages
3783 * have been removed from the queue. */
3784
3785 rd_rkb_dbg(rkb, QUEUE, "TOPPAR",
3786 "%.*s [%"PRId32"] waiting for "
3787 "%d in-flight request(s) to drain "
3788 "from queue before continuing "
3789 "to produce",
3790 RD_KAFKAP_STR_PR(rktp->rktp_rkt->
3791 rkt_topic),
3792 rktp->rktp_partition,
3793 inflight);
3794
3795 /* Flush any ProduceRequests for this
3796 * partition in the output buffer queue to
3797 * speed up draining. */
3798 if (!did_purge)
3799 rd_kafka_broker_bufq_purge_by_toppar(
3800 rkb,
3801 &rkb->rkb_outbufs,
3802 RD_KAFKAP_Produce, rktp,
3803 RD_KAFKA_RESP_ERR__RETRY);
3804
3805 return 0;
3806 }
3807
3808 rd_rkb_dbg(rkb, QUEUE, "TOPPAR",
3809 "%.*s [%"PRId32"] all in-flight requests "
3810 "drained from queue",
3811 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
3812 rktp->rktp_partition);
3813
3814 rktp->rktp_eos.wait_drain = rd_false;
3815 }
3816
3817 /* Limit the number of in-flight requests (per partition)
3818 * to the broker's sequence de-duplication window. */
3819 max_requests = RD_MIN(max_requests,
3820 RD_KAFKA_IDEMP_MAX_INFLIGHT - inflight);
3821 }
3822
3823
3824 /* Check if allowed to create and enqueue a ProduceRequest */
3825 if (max_requests <= 0)
3826 return 0;
3827
3828 r = rktp->rktp_xmit_msgq.rkmq_msg_cnt;
3829 if (r == 0)
3830 return 0;
3831
3832 rd_kafka_msgq_verify_order(rktp, &rktp->rktp_xmit_msgq, 0, rd_false);
3833
3834 rd_rkb_dbg(rkb, QUEUE, "TOPPAR",
3835 "%.*s [%"PRId32"] %d message(s) in "
3836 "xmit queue (%d added from partition queue)",
3837 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
3838 rktp->rktp_partition,
3839 r, move_cnt);
3840
3841 rkm = TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs);
3842 rd_dassert(rkm != NULL);
3843
3844 if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
3845 /* Update the partition's cached PID, and reset the
3846 * base msg sequence if necessary */
3847 if (unlikely(!rd_kafka_pid_eq(pid, rktp->rktp_eos.pid))) {
3848 /* Attempt to change the pid, it will fail if there
3849 * are outstanding messages in-flight, in which case
3850 * we eventually come back here to retry. */
3851 if (!rd_kafka_toppar_pid_change(
3852 rktp, pid, rkm->rkm_u.producer.msgid))
3853 return 0;
3854 }
3855
3856 rd_kafka_toppar_lock(rktp);
3857 /* Idempotent producer epoch base msgid, this is passed to the
3858 * ProduceRequest and msgset writer to adjust the protocol-level
3859 * per-message sequence number. */
3860 epoch_base_msgid = rktp->rktp_eos.epoch_base_msgid;
3861 rd_kafka_toppar_unlock(rktp);
3862 }
3863
3864 if (unlikely(rkb->rkb_state != RD_KAFKA_BROKER_STATE_UP)) {
3865 /* There are messages to send but connection is not up. */
3866 rd_rkb_dbg(rkb, BROKER, "TOPPAR",
3867 "%.*s [%"PRId32"] "
3868 "%d message(s) queued but broker not up",
3869 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
3870 rktp->rktp_partition,
3871 r);
3872 rkb->rkb_persistconn.internal++;
3873 return 0;
3874 }
3875
3876 /* Attempt to fill the batch size, but limit our waiting
3877 * to queue.buffering.max.ms, batch.num.messages, and batch.size. */
3878 if (r < rkb->rkb_rk->rk_conf.batch_num_messages &&
3879 rktp->rktp_xmit_msgq.rkmq_msg_bytes <
3880 (int64_t)rkb->rkb_rk->rk_conf.batch_size) {
3881 rd_ts_t wait_max;
3882
3883 /* Calculate maximum wait-time to honour
3884 * queue.buffering.max.ms contract. */
3885 wait_max = rd_kafka_msg_enq_time(rkm) +
3886 rkb->rkb_rk->rk_conf.buffering_max_us;
3887
3888 if (wait_max > now) {
3889 /* Wait for more messages or queue.buffering.max.ms
3890 * to expire. */
3891 if (wait_max < *next_wakeup)
3892 *next_wakeup = wait_max;
3893 return 0;
3894 }
3895 }
3896
3897 /* Honour retry.backoff.ms. */
3898 if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) {
3899 if (rkm->rkm_u.producer.ts_backoff < *next_wakeup)
3900 *next_wakeup = rkm->rkm_u.producer.ts_backoff;
3901 /* Wait for backoff to expire */
3902 return 0;
3903 }
3904
3905 /* Send Produce requests for this toppar, honouring the
3906 * queue backpressure threshold. */
3907 for (reqcnt = 0 ; reqcnt < max_requests ; reqcnt++) {
3908 r = rd_kafka_ProduceRequest(rkb, rktp, pid, epoch_base_msgid);
3909 if (likely(r > 0))
3910 cnt += r;
3911 else
3912 break;
3913 }
3914
3915 /* If there are messages still in the queue, make the next
3916 * wakeup immediate. */
3917 if (rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) > 0)
3918 *next_wakeup = now;
3919
3920 return cnt;
3921 }
3922
3923
3924
3925 /**
3926 * @brief Produce from all toppars assigned to this broker.
3927 *
3928 * @param next_wakeup is updated if the next IO/ops timeout should be
3929 * less than the input value.
3930 *
3931 * @returns the total number of messages produced.
3932 */
rd_kafka_broker_produce_toppars(rd_kafka_broker_t * rkb,rd_ts_t now,rd_ts_t * next_wakeup,rd_bool_t do_timeout_scan)3933 static int rd_kafka_broker_produce_toppars (rd_kafka_broker_t *rkb,
3934 rd_ts_t now,
3935 rd_ts_t *next_wakeup,
3936 rd_bool_t do_timeout_scan) {
3937 rd_kafka_toppar_t *rktp;
3938 int cnt = 0;
3939 rd_ts_t ret_next_wakeup = *next_wakeup;
3940 rd_kafka_pid_t pid = RD_KAFKA_PID_INITIALIZER;
3941 rd_bool_t may_send = rd_true;
3942
3943 /* Round-robin serve each toppar. */
3944 rktp = rkb->rkb_active_toppar_next;
3945 if (unlikely(!rktp))
3946 return 0;
3947
3948 if (rd_kafka_is_idempotent(rkb->rkb_rk)) {
3949 /* Idempotent producer: get a copy of the current pid. */
3950 pid = rd_kafka_idemp_get_pid(rkb->rkb_rk);
3951
3952 /* If we don't have a valid pid, or the transaction state
3953 * prohibits sending messages, return immedatiely,
3954 * unless the per-partition timeout scan needs to run.
3955 * The broker threads are woken up when a PID is acquired
3956 * or the transaction state changes. */
3957 if (!rd_kafka_pid_valid(pid))
3958 may_send = rd_false;
3959 else if (rd_kafka_is_transactional(rkb->rkb_rk) &&
3960 !rd_kafka_txn_may_send_msg(rkb->rkb_rk))
3961 may_send = rd_false;
3962
3963 if (!may_send && !do_timeout_scan)
3964 return 0;
3965 }
3966
3967 do {
3968 rd_ts_t this_next_wakeup = ret_next_wakeup;
3969
3970 /* Try producing toppar */
3971 cnt += rd_kafka_toppar_producer_serve(
3972 rkb, rktp, pid, now, &this_next_wakeup,
3973 do_timeout_scan, may_send);
3974
3975 if (this_next_wakeup < ret_next_wakeup)
3976 ret_next_wakeup = this_next_wakeup;
3977
3978 } while ((rktp = CIRCLEQ_LOOP_NEXT(&rkb->
3979 rkb_active_toppars,
3980 rktp, rktp_activelink)) !=
3981 rkb->rkb_active_toppar_next);
3982
3983 /* Update next starting toppar to produce in round-robin list. */
3984 rd_kafka_broker_active_toppar_next(
3985 rkb,
3986 CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars,
3987 rktp, rktp_activelink));
3988
3989 *next_wakeup = ret_next_wakeup;
3990
3991 return cnt;
3992 }
3993
/**
 * @brief Producer serving loop for a broker thread.
 *
 * Repeatedly serves Produce requests for all assigned toppars, moves
 * retry buffers, and serves ops/IO, until the broker terminates,
 * changes state, or \p abs_timeout is reached.
 *
 * @param abs_timeout absolute time (micros) at which to stop serving.
 *
 * @locality broker thread (asserted below).
 * @locks broker lock is held while evaluating the loop condition and
 *        released for the duration of each iteration's work.
 */
static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb,
                                            rd_ts_t abs_timeout) {
        rd_interval_t timeout_scan;
        /* Snapshot of the broker state: a state change breaks the loop
         * so the caller can re-dispatch to the proper serve function. */
        unsigned int initial_state = rkb->rkb_state;
        rd_ts_t now;
        int cnt = 0;

        rd_interval_init(&timeout_scan);

        rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));

        rd_kafka_broker_lock(rkb);

        while (!rd_kafka_broker_terminating(rkb) &&
               rkb->rkb_state == initial_state &&
               (abs_timeout > (now = rd_clock()))) {
                rd_bool_t do_timeout_scan;
                rd_ts_t next_wakeup = abs_timeout;

                rd_kafka_broker_unlock(rkb);

                /* Perform timeout scan on first iteration, thus
                 * on each state change, to make sure messages in
                 * partition rktp_xmit_msgq are timed out before
                 * being attempted to re-transmit. */
                do_timeout_scan = cnt++ == 0 ||
                        rd_interval(&timeout_scan, 1000*1000, now) >= 0;

                /* Serve all assigned partitions; may lower next_wakeup. */
                rd_kafka_broker_produce_toppars(rkb, now, &next_wakeup,
                                                do_timeout_scan);

                /* Check and move retry buffers */
                if (unlikely(rd_atomic32_get(&rkb->rkb_retrybufs.rkbq_cnt) > 0))
                        rd_kafka_broker_retry_bufs_move(rkb, &next_wakeup);

                /* Serve ops and IO until next_wakeup; a true return
                 * means an op requested a wakeup, in which case we
                 * return with the broker lock NOT held (it was released
                 * above and not re-acquired). */
                if (rd_kafka_broker_ops_io_serve(rkb, next_wakeup))
                        return; /* Wakeup */

                rd_kafka_broker_lock(rkb);
        }

        rd_kafka_broker_unlock(rkb);
}
4040
4041
4042
4043
4044
4045
4046
4047 /**
4048 * Backoff the next Fetch request (due to error).
4049 */
rd_kafka_broker_fetch_backoff(rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err)4050 static void rd_kafka_broker_fetch_backoff (rd_kafka_broker_t *rkb,
4051 rd_kafka_resp_err_t err) {
4052 int backoff_ms = rkb->rkb_rk->rk_conf.fetch_error_backoff_ms;
4053 rkb->rkb_ts_fetch_backoff = rd_clock() + (backoff_ms * 1000);
4054 rd_rkb_dbg(rkb, FETCH, "BACKOFF",
4055 "Fetch backoff for %dms: %s",
4056 backoff_ms, rd_kafka_err2str(err));
4057 }
4058
4059 /**
4060 * @brief Backoff the next Fetch for specific partition
4061 */
rd_kafka_toppar_fetch_backoff(rd_kafka_broker_t * rkb,rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err)4062 static void rd_kafka_toppar_fetch_backoff (rd_kafka_broker_t *rkb,
4063 rd_kafka_toppar_t *rktp,
4064 rd_kafka_resp_err_t err) {
4065 int backoff_ms = rkb->rkb_rk->rk_conf.fetch_error_backoff_ms;
4066
4067 /* Don't back off on reaching end of partition */
4068 if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
4069 return;
4070
4071 /* Certain errors that may require manual intervention should have
4072 * a longer backoff time. */
4073 if (err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED)
4074 backoff_ms = RD_MAX(1000, backoff_ms * 10);
4075
4076 rktp->rktp_ts_fetch_backoff = rd_clock() + (backoff_ms * 1000);
4077
4078 rd_rkb_dbg(rkb, FETCH, "BACKOFF",
4079 "%s [%"PRId32"]: Fetch backoff for %dms%s%s",
4080 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
4081 backoff_ms,
4082 err ? ": " : "",
4083 err ? rd_kafka_err2str(err) : "");
4084 }
4085
4086
/**
 * @brief Handle preferred replica in fetch response.
 *
 * If the lease for a preferred replica changes too quickly (more than
 * once a minute) the partition fetcher is backed off to dampen
 * leader/replica ping-pong. The toppar is then delegated to the
 * preferred replica if its broker object is known, else a broker
 * metadata refresh is (rate-limited) requested.
 *
 * @param rkbuf the Fetch response buffer (currently not read here;
 *              kept for interface symmetry — NOTE(review): confirm).
 * @param preferred_id broker id of the preferred read replica.
 *
 * @locks rd_kafka_toppar_lock(rktp) and
 *        rd_kafka_rdlock(rk) must NOT be held.
 *
 * @locality broker thread
 */
static void
rd_kafka_fetch_preferred_replica_handle (rd_kafka_toppar_t *rktp,
                                         rd_kafka_buf_t *rkbuf,
                                         rd_kafka_broker_t *rkb,
                                         int32_t preferred_id) {
        const rd_ts_t one_minute = 60*1000*1000;
        const rd_ts_t five_seconds = 5*1000*1000;
        rd_kafka_broker_t *preferred_rkb;
        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
        /* Negative if the previous new-lease event was less than
         * one minute ago. */
        rd_ts_t new_intvl = rd_interval_immediate(&rktp->rktp_new_lease_intvl,
                                                  one_minute, 0);

        if (new_intvl < 0) {
                /* In lieu of KIP-320, the toppar is delegated back to
                 * the leader in the event of an offset out-of-range
                 * error (KIP-392 error case #4) because this scenario
                 * implies the preferred replica is out-of-sync.
                 *
                 * If program execution reaches here, the leader has
                 * relatively quickly instructed the client back to
                 * a preferred replica, quite possibly the same one
                 * as before (possibly resulting from stale metadata),
                 * so we back off the toppar to slow down potential
                 * back-and-forth.
                 */

                /* Rate-limit the log to once a minute. */
                if (rd_interval_immediate(&rktp->rktp_new_lease_log_intvl,
                                          one_minute, 0) > 0)
                        rd_rkb_log(rkb, LOG_NOTICE, "FETCH",
                                   "%.*s [%"PRId32"]: preferred replica "
                                   "(%"PRId32") lease changing too quickly "
                                   "(%"PRId64"s < 60s): possibly due to "
                                   "unavailable replica or stale cluster "
                                   "state: backing off next fetch",
                                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                   rktp->rktp_partition,
                                   preferred_id,
                                   (one_minute - -new_intvl)/(1000*1000));

                rd_kafka_toppar_fetch_backoff(rkb,
                                              rktp, RD_KAFKA_RESP_ERR_NO_ERROR);
        }

        /* Look up the preferred replica's broker object (refcounted). */
        rd_kafka_rdlock(rk);
        preferred_rkb = rd_kafka_broker_find_by_nodeid(rk,
                                                       preferred_id);
        rd_kafka_rdunlock(rk);

        if (preferred_rkb) {
                /* Known broker: start a new lease and move the toppar
                 * over to the preferred replica. */
                rd_interval_reset_to_now(&rktp->rktp_lease_intvl, 0);
                rd_kafka_toppar_lock(rktp);
                rd_kafka_toppar_broker_update(rktp, preferred_id,
                                              preferred_rkb,
                                              "preferred replica updated");
                rd_kafka_toppar_unlock(rktp);
                rd_kafka_broker_destroy(preferred_rkb); /* from find_by_nodeid */
                return;
        }

        /* Unknown replica: refresh broker metadata, at most once
         * every five seconds. */
        if (rd_interval_immediate(&rktp->rktp_metadata_intvl,
                                  five_seconds, 0) > 0) {
                rd_rkb_log(rkb, LOG_NOTICE, "FETCH",
                           "%.*s [%"PRId32"]: preferred replica (%"PRId32") "
                           "is unknown: refreshing metadata",
                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition,
                           preferred_id);

                rd_kafka_metadata_refresh_brokers(
                        rktp->rktp_rkt->rkt_rk, NULL,
                        "preferred replica unavailable");
        }

        /* Back off fetching until the replica becomes known. */
        rd_kafka_toppar_fetch_backoff(
                rkb, rktp, RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE);
}
4171
4172
/**
 * @brief Handle partition-specific Fetch error.
 *
 * Dispatches on \p err: leader/metadata-related errors trigger a
 * metadata refresh, out-of-range errors either revert to the leader
 * (when fetching from a follower) or propagate an offset reset, and
 * remaining errors are forwarded to the application via the fetch
 * queue. Always backs off the next fetch for this partition.
 *
 * @param tver toppar+version mapping from the originating request,
 *             used to version-stamp application-facing errors.
 * @param HighwaterMarkOffset the partition HWM from the response.
 */
static void rd_kafka_fetch_reply_handle_partition_error (
        rd_kafka_broker_t *rkb,
        rd_kafka_toppar_t *rktp,
        const struct rd_kafka_toppar_ver *tver,
        rd_kafka_resp_err_t err,
        int64_t HighwaterMarkOffset) {

        /* Some errors should be passed to the
         * application while some handled by rdkafka */
        switch (err)
        {
                /* Errors handled by rdkafka */
        case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
        case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION:
        case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR:
        case RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH:
                /* Request metadata information update*/
                rd_kafka_toppar_leader_unavailable(rktp, "fetch", err);
                break;

        case RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE:
                /* Occurs when:
                 *   - Msg exists on broker but
                 *     offset > HWM, or:
                 *   - HWM is >= offset, but msg not
                 *     yet available at that offset
                 *     (replica is out of sync).
                 *
                 * Handle by retrying FETCH (with backoff).
                 */
                rd_rkb_dbg(rkb, MSG, "FETCH",
                           "Topic %s [%"PRId32"]: Offset %"PRId64" not "
                           "available on broker %"PRId32" (leader %"PRId32"): "
                           "retrying",
                           rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
                           rktp->rktp_offsets.fetch_offset,
                           rktp->rktp_broker_id,
                           rktp->rktp_leader_id);
                break;

        case RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE:
        {
                int64_t err_offset;

                /* When fetching from a follower (broker != leader) an
                 * offset past the follower's HWM is not authoritative:
                 * the follower may simply be lagging. */
                if (rktp->rktp_broker_id != rktp->rktp_leader_id &&
                    rktp->rktp_offsets.fetch_offset > HighwaterMarkOffset) {
                        rd_kafka_log(rkb->rkb_rk,
                                     LOG_WARNING, "FETCH",
                                     "Topic %s [%"PRId32"]: Offset %"PRId64
                                     " out of range (HighwaterMark %"PRId64
                                     " fetching from "
                                     "broker %"PRId32" (leader %"PRId32"): "
                                     "reverting to leader",
                                     rktp->rktp_rkt->rkt_topic->str,
                                     rktp->rktp_partition,
                                     rktp->rktp_offsets.fetch_offset,
                                     HighwaterMarkOffset,
                                     rktp->rktp_broker_id,
                                     rktp->rktp_leader_id);

                        /* Out of range error cannot be taken as definitive
                         * when fetching from follower.
                         * Revert back to the leader in lieu of KIP-320.
                         */
                        rd_kafka_toppar_delegate_to_leader(rktp);
                        break;
                }

                /* Application error: invalidate the fetch offset and
                 * trigger the configured offset reset policy. */
                err_offset = rktp->rktp_offsets.fetch_offset;
                rktp->rktp_offsets.fetch_offset = RD_KAFKA_OFFSET_INVALID;
                rd_kafka_offset_reset(rktp, err_offset, err,
                                      "fetch failed due to requested offset "
                                      "not available on the broker");
        }
        break;

        case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED:
                /* If we're not authorized to access the
                 * topic mark it as errored to deny
                 * further Fetch requests.
                 * Only emit the error once per distinct error
                 * (deduplicated via rktp_last_error). */
                if (rktp->rktp_last_error != err) {
                        rd_kafka_consumer_err(
                                rktp->rktp_fetchq,
                                rd_kafka_broker_id(rkb),
                                err,
                                tver->version,
                                NULL, rktp,
                                rktp->rktp_offsets.fetch_offset,
                                "Fetch from broker %"PRId32" failed: %s",
                                rd_kafka_broker_id(rkb),
                                rd_kafka_err2str(err));
                        rktp->rktp_last_error = err;
                }
                break;


                /* Application errors */
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                /* EOF is only surfaced if enable.partition.eof is set. */
                if (rkb->rkb_rk->rk_conf.enable_partition_eof)
                        rd_kafka_consumer_err(
                                rktp->rktp_fetchq,
                                rd_kafka_broker_id(rkb),
                                err, tver->version,
                                NULL, rktp,
                                rktp->rktp_offsets.fetch_offset,
                                "Fetch from broker %"PRId32" reached end of "
                                "partition at offset %"PRId64
                                " (HighwaterMark %"PRId64")",
                                rd_kafka_broker_id(rkb),
                                rktp->rktp_offsets.fetch_offset,
                                HighwaterMarkOffset);
                break;

        case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE:
        default: /* and all other errors */
                rd_dassert(tver->version > 0);
                rd_kafka_consumer_err(
                        rktp->rktp_fetchq,
                        rd_kafka_broker_id(rkb),
                        err, tver->version,
                        NULL, rktp,
                        rktp->rktp_offsets.fetch_offset,
                        "Fetch from broker %"PRId32" failed: %s",
                        rd_kafka_broker_id(rkb),
                        rd_kafka_err2str(err));
                break;
        }

        /* Back off the next fetch for this partition */
        rd_kafka_toppar_fetch_backoff(rkb, rktp, err);
}
4312
4313
4314
/**
 * @brief Parses and handles a Fetch reply.
 *
 * Walks the per-topic, per-partition response entries, applying
 * partition errors, preferred-replica redirects and message-set
 * parsing. Parse failures in the rd_kafka_buf_read_*() macros jump to
 * the \c err_parse label below.
 *
 * @returns 0 on success or an error code on failure.
 */
static rd_kafka_resp_err_t
rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb,
                             rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request) {
        int32_t TopicArrayCnt;
        int i;
        /* Log level used by the rd_kafka_buf_read_*() macros on
         * decode failure. */
        const int log_decode_errors = LOG_ERR;
        rd_kafka_topic_t *rkt = NULL;
        /* Top-level (session) error code, v7+; overrides per-partition
         * ErrorCode when set. */
        int16_t ErrorCode = RD_KAFKA_RESP_ERR_NO_ERROR;

        if (rd_kafka_buf_ApiVersion(request) >= 1) {
                int32_t Throttle_Time;
                rd_kafka_buf_read_i32(rkbuf, &Throttle_Time);

                rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep,
                                          Throttle_Time);
        }

        if (rd_kafka_buf_ApiVersion(request) >= 7) {
                /* Fetch sessions (KIP-227): SessionId is read but
                 * currently unused. */
                int32_t SessionId;
                rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
                rd_kafka_buf_read_i32(rkbuf, &SessionId);
        }

        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
        /* Verify that TopicArrayCnt seems to be in line with remaining size */
        rd_kafka_buf_check_len(rkbuf,
                               TopicArrayCnt * (3/*topic min size*/ +
                                                4/*PartitionArrayCnt*/ +
                                                4+2+8+4/*inner header*/));

        for (i = 0 ; i < TopicArrayCnt ; i++) {
                rd_kafkap_str_t topic;
                int32_t fetch_version;
                int32_t PartitionArrayCnt;
                int j;

                rd_kafka_buf_read_str(rkbuf, &topic);
                rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt);

                /* May be NULL for unknown topics; checked below.
                 * Refcounted: released at the bottom of this loop. */
                rkt = rd_kafka_topic_find0(rkb->rkb_rk, &topic);

                for (j = 0 ; j < PartitionArrayCnt ; j++) {
                        struct rd_kafka_toppar_ver *tver, tver_skel;
                        rd_kafka_toppar_t *rktp = NULL;
                        rd_kafka_aborted_txns_t *aborted_txns = NULL;
                        rd_slice_t save_slice;
                        /* Wire-format partition header (fields beyond
                         * the base version are filled in conditionally
                         * based on the request's ApiVersion). */
                        struct {
                                int32_t Partition;
                                int16_t ErrorCode;
                                int64_t HighwaterMarkOffset;
                                int64_t LastStableOffset;       /* v4 */
                                int64_t LogStartOffset;         /* v5 */
                                int32_t MessageSetSize;
                                int32_t PreferredReadReplica;   /* v11 */
                        } hdr;
                        rd_kafka_resp_err_t err;
                        int64_t end_offset;

                        rd_kafka_buf_read_i32(rkbuf, &hdr.Partition);
                        rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode);
                        /* A session-level error applies to all
                         * partitions in the response. */
                        if (ErrorCode)
                                hdr.ErrorCode = ErrorCode;
                        rd_kafka_buf_read_i64(rkbuf, &hdr.HighwaterMarkOffset);

                        end_offset = hdr.HighwaterMarkOffset;

                        hdr.LastStableOffset = RD_KAFKA_OFFSET_INVALID;
                        hdr.LogStartOffset = RD_KAFKA_OFFSET_INVALID;
                        if (rd_kafka_buf_ApiVersion(request) >= 4) {
                                int32_t AbortedTxnCnt;
                                rd_kafka_buf_read_i64(rkbuf,
                                                      &hdr.LastStableOffset);
                                if (rd_kafka_buf_ApiVersion(request) >= 5)
                                        rd_kafka_buf_read_i64(rkbuf,
                                                              &hdr.LogStartOffset);

                                rd_kafka_buf_read_i32(rkbuf,
                                                      &AbortedTxnCnt);

                                if (rkb->rkb_rk->rk_conf.isolation_level ==
                                    RD_KAFKA_READ_UNCOMMITTED) {

                                        /* READ_UNCOMMITTED should never
                                         * receive aborted-transaction
                                         * info; skip it if it does. */
                                        if (unlikely(AbortedTxnCnt > 0)) {
                                                rd_rkb_log(rkb, LOG_ERR,
                                                           "FETCH",
                                                           "%.*s [%"PRId32"]: "
                                                           "%"PRId32" aborted "
                                                           "transaction(s) "
                                                           "encountered in "
                                                           "READ_UNCOMMITTED "
                                                           "fetch response: "
                                                           "ignoring.",
                                                           RD_KAFKAP_STR_PR(
                                                                   &topic),
                                                           hdr.Partition,
                                                           AbortedTxnCnt);

                                                rd_kafka_buf_skip(rkbuf,
                                                                  AbortedTxnCnt
                                                                  * (8+8));
                                        }
                                } else {
                                        /* Older brokers may return LSO -1,
                                         * in which case we use the HWM. */
                                        if (hdr.LastStableOffset >= 0)
                                                end_offset = hdr.LastStableOffset;

                                        if (AbortedTxnCnt > 0) {
                                                int k;

                                                /* Sanity cap to avoid huge
                                                 * allocations on corrupt
                                                 * responses. */
                                                if (unlikely(AbortedTxnCnt > 1000000))
                                                        rd_kafka_buf_parse_fail(
                                                                rkbuf,
                                                                "%.*s [%"PRId32"]: "
                                                                "invalid AbortedTxnCnt %"PRId32,
                                                                RD_KAFKAP_STR_PR(&topic),
                                                                hdr.Partition,
                                                                AbortedTxnCnt);

                                                aborted_txns = rd_kafka_aborted_txns_new(AbortedTxnCnt);
                                                for (k = 0 ; k < AbortedTxnCnt; k++) {
                                                        int64_t PID;
                                                        int64_t FirstOffset;
                                                        rd_kafka_buf_read_i64(rkbuf, &PID);
                                                        rd_kafka_buf_read_i64(rkbuf, &FirstOffset);
                                                        rd_kafka_aborted_txns_add(aborted_txns, PID, FirstOffset);
                                                }
                                                rd_kafka_aborted_txns_sort(aborted_txns);
                                        }
                                }
                        }

                        if (rd_kafka_buf_ApiVersion(request) >= 11)
                                rd_kafka_buf_read_i32(rkbuf,
                                                      &hdr.PreferredReadReplica);
                        else
                                hdr.PreferredReadReplica = -1;

                        rd_kafka_buf_read_i32(rkbuf, &hdr.MessageSetSize);

                        if (unlikely(hdr.MessageSetSize < 0))
                                rd_kafka_buf_parse_fail(
                                        rkbuf,
                                        "%.*s [%"PRId32"]: "
                                        "invalid MessageSetSize %"PRId32,
                                        RD_KAFKAP_STR_PR(&topic),
                                        hdr.Partition,
                                        hdr.MessageSetSize);

                        /* Look up topic+partition */
                        if (likely(rkt != NULL)) {
                                rd_kafka_topic_rdlock(rkt);
                                rktp = rd_kafka_toppar_get(
                                        rkt, hdr.Partition, 0/*no ua-on-miss*/);
                                rd_kafka_topic_rdunlock(rkt);
                        }

                        if (unlikely(!rkt || !rktp)) {
                                /* Unknown topic or partition: skip its
                                 * message set and continue. */
                                rd_rkb_dbg(rkb, TOPIC, "UNKTOPIC",
                                           "Received Fetch response "
                                           "(error %hu) for unknown topic "
                                           "%.*s [%"PRId32"]: ignoring",
                                           hdr.ErrorCode,
                                           RD_KAFKAP_STR_PR(&topic),
                                           hdr.Partition);
                                rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize);
                                if (aborted_txns)
                                        rd_kafka_aborted_txns_destroy(
                                                aborted_txns);
                                continue;
                        }

                        rd_kafka_toppar_lock(rktp);
                        rktp->rktp_lo_offset = hdr.LogStartOffset;
                        rktp->rktp_hi_offset = hdr.HighwaterMarkOffset;
                        /* Let the LastStable offset be the effective
                         * end_offset based on protocol version, that is:
                         * if connected to a broker that does not support
                         * LastStableOffset we use the HighwaterMarkOffset. */
                        rktp->rktp_ls_offset = end_offset;
                        rd_kafka_toppar_unlock(rktp);

                        if (hdr.PreferredReadReplica != -1) {

                                /* Broker redirects us to a preferred
                                 * read replica (KIP-392). */
                                rd_kafka_fetch_preferred_replica_handle(
                                        rktp, rkbuf, rkb,
                                        hdr.PreferredReadReplica);

                                if (unlikely(hdr.MessageSetSize != 0)) {
                                        rd_rkb_log(rkb, LOG_WARNING, "FETCH",
                                                   "%.*s [%"PRId32"]: Fetch "
                                                   "response has both "
                                                   "preferred read replica "
                                                   "and non-zero message set "
                                                   "size: %"PRId32": "
                                                   "skipping messages",
                                                   RD_KAFKAP_STR_PR(rktp->
                                                                    rktp_rkt->rkt_topic),
                                                   rktp->rktp_partition,
                                                   hdr.MessageSetSize);
                                        rd_kafka_buf_skip(rkbuf,
                                                          hdr.MessageSetSize);
                                }

                                if (aborted_txns)
                                        rd_kafka_aborted_txns_destroy(
                                                aborted_txns);
                                rd_kafka_toppar_destroy(rktp); /* from get */
                                continue;
                        }

                        rd_kafka_toppar_lock(rktp);

                        /* Make sure toppar hasn't moved to another broker
                         * during the lifetime of the request. */
                        if (unlikely(rktp->rktp_broker != rkb)) {
                                rd_kafka_toppar_unlock(rktp);
                                rd_rkb_dbg(rkb, MSG, "FETCH",
                                           "%.*s [%"PRId32"]: "
                                           "partition broker has changed: "
                                           "discarding fetch response",
                                           RD_KAFKAP_STR_PR(&topic),
                                           hdr.Partition);
                                rd_kafka_toppar_destroy(rktp); /* from get */
                                rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize);
                                if (aborted_txns)
                                        rd_kafka_aborted_txns_destroy(
                                                aborted_txns);
                                continue;
                        }
                        fetch_version = rktp->rktp_fetch_version;
                        rd_kafka_toppar_unlock(rktp);

                        /* Check if this Fetch is for an outdated fetch version,
                         * or the original rktp was removed and a new one
                         * created (due to partition count decreasing and
                         * then increasing again, which can happen in
                         * desynchronized clusters): if so ignore it. */
                        tver_skel.rktp = rktp;
                        tver = rd_list_find(request->rkbuf_rktp_vers,
                                            &tver_skel,
                                            rd_kafka_toppar_ver_cmp);
                        rd_kafka_assert(NULL, tver);
                        if (tver->rktp != rktp ||
                            tver->version < fetch_version) {
                                rd_rkb_dbg(rkb, MSG, "DROP",
                                           "%s [%"PRId32"]: "
                                           "dropping outdated fetch response "
                                           "(v%d < %d or old rktp)",
                                           rktp->rktp_rkt->rkt_topic->str,
                                           rktp->rktp_partition,
                                           tver->version, fetch_version);
                                rd_atomic64_add(&rktp->rktp_c.rx_ver_drops, 1);
                                rd_kafka_toppar_destroy(rktp); /* from get */
                                rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize);
                                if (aborted_txns)
                                        rd_kafka_aborted_txns_destroy(
                                                aborted_txns);
                                continue;
                        }

                        rd_rkb_dbg(rkb, MSG, "FETCH",
                                   "Topic %.*s [%"PRId32"] MessageSet "
                                   "size %"PRId32", error \"%s\", "
                                   "MaxOffset %"PRId64", "
                                   "LSO %"PRId64", "
                                   "Ver %"PRId32"/%"PRId32,
                                   RD_KAFKAP_STR_PR(&topic), hdr.Partition,
                                   hdr.MessageSetSize,
                                   rd_kafka_err2str(hdr.ErrorCode),
                                   hdr.HighwaterMarkOffset,
                                   hdr.LastStableOffset,
                                   tver->version, fetch_version);

                        /* If this is the last message of the queue,
                         * signal EOF back to the application. */
                        if (end_offset ==
                            rktp->rktp_offsets.fetch_offset
                            &&
                            rktp->rktp_offsets.eof_offset !=
                            rktp->rktp_offsets.fetch_offset) {
                                hdr.ErrorCode =
                                        RD_KAFKA_RESP_ERR__PARTITION_EOF;
                                rktp->rktp_offsets.eof_offset =
                                        rktp->rktp_offsets.fetch_offset;
                        }

                        if (unlikely(hdr.ErrorCode !=
                                     RD_KAFKA_RESP_ERR_NO_ERROR)) {
                                /* Handle partition-level errors. */
                                rd_kafka_fetch_reply_handle_partition_error(
                                        rkb, rktp, tver, hdr.ErrorCode,
                                        hdr.HighwaterMarkOffset);

                                rd_kafka_toppar_destroy(rktp); /* from get()*/

                                rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize);

                                if (aborted_txns)
                                        rd_kafka_aborted_txns_destroy(
                                                aborted_txns);
                                continue;
                        }

                        /* No error, clear any previous fetch error. */
                        rktp->rktp_last_error = RD_KAFKA_RESP_ERR_NO_ERROR;

                        if (unlikely(hdr.MessageSetSize <= 0)) {
                                rd_kafka_toppar_destroy(rktp); /*from get()*/
                                if (aborted_txns)
                                        rd_kafka_aborted_txns_destroy(
                                                aborted_txns);
                                continue;
                        }

                        /**
                         * Parse MessageSet.
                         * The reader slice is narrowed to exactly
                         * MessageSetSize bytes so the msgset parser
                         * cannot read past this partition's data;
                         * it is widened back afterwards.
                         */
                        if (!rd_slice_narrow_relative(
                                    &rkbuf->rkbuf_reader,
                                    &save_slice,
                                    (size_t)hdr.MessageSetSize))
                                rd_kafka_buf_check_len(rkbuf,
                                                       hdr.MessageSetSize);

                        /* Parse messages */
                        err = rd_kafka_msgset_parse(
                                rkbuf, request, rktp, aborted_txns, tver);

                        if (aborted_txns)
                                rd_kafka_aborted_txns_destroy(
                                        aborted_txns);

                        rd_slice_widen(&rkbuf->rkbuf_reader, &save_slice);
                        /* Continue with next partition regardless of
                         * parse errors (which are partition-specific) */

                        /* On error: back off the fetcher for this partition */
                        if (unlikely(err))
                                rd_kafka_toppar_fetch_backoff(rkb, rktp, err);

                        rd_kafka_toppar_destroy(rktp); /* from get */
                }

                if (rkt) {
                        rd_kafka_topic_destroy0(rkt);
                        rkt = NULL;
                }
        }

        /* The entire response must have been consumed. */
        if (rd_kafka_buf_read_remain(rkbuf) != 0) {
                rd_kafka_buf_parse_fail(rkbuf,
                                        "Remaining data after message set "
                                        "parse: %"PRIusz" bytes",
                                        rd_kafka_buf_read_remain(rkbuf));
                RD_NOTREACHED();
        }

        return 0;

err_parse:
        /* Reached via the rd_kafka_buf_read_*()/parse_fail macros. */
        if (rkt)
                rd_kafka_topic_destroy0(rkt);
        rd_rkb_dbg(rkb, MSG, "BADMSG", "Bad message (Fetch v%d): "
                   "is broker.version.fallback incorrectly set?",
                   (int)request->rkbuf_reqhdr.ApiVersion);
        return rkbuf->rkbuf_err;
}
4687
4688
4689
/**
 * @brief Fetch request response handler: parses the reply and, on
 *        error, refreshes metadata (for leader-related errors) and
 *        backs off the broker's next fetch.
 */
static void rd_kafka_broker_fetch_reply (rd_kafka_t *rk,
                                         rd_kafka_broker_t *rkb,
                                         rd_kafka_resp_err_t err,
                                         rd_kafka_buf_t *reply,
                                         rd_kafka_buf_t *request,
                                         void *opaque) {

        if (err == RD_KAFKA_RESP_ERR__DESTROY)
                return; /* Terminating */

        rd_kafka_assert(rkb->rkb_rk, rkb->rkb_fetching > 0);
        rkb->rkb_fetching = 0;

        /* Parse and handle the messages (unless the request errored) */
        if (!err && reply)
                err = rd_kafka_fetch_reply_handle(rkb, reply, request);

        if (likely(!err))
                return;

        rd_rkb_dbg(rkb, MSG, "FETCH", "Fetch reply: %s",
                   rd_kafka_err2str(err));

        /* Leader/metadata-related errors: force a metadata refresh for
         * all known topics. */
        if (err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART ||
            err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE ||
            err == RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION ||
            err == RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE ||
            err == RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE) {
                char reason[128];

                rd_snprintf(reason, sizeof(reason),
                            "FetchRequest failed: %s",
                            rd_kafka_err2str(err));
                rd_kafka_metadata_refresh_known_topics(
                        rkb->rkb_rk, NULL, rd_true/*force*/, reason);
        }

        /* No explicit retry for transport/timeout (or any other) errors:
         * the fetch is already intervalled from consumer_serve().
         * Just back off the next fetch. */
        rd_kafka_broker_fetch_backoff(rkb, err);
}
4742
4743
4744
4745
4746
4747
4748
4749
4750
4751
4752
/**
 * Build and send a Fetch request message for all underflowed toppars
 * for a specific broker.
 *
 * Iterates the broker's active toppar list round-robin, writing one
 * partition entry per toppar, back-patching the topic and partition
 * array counts afterwards.
 *
 * @returns the number of partitions included in the request
 *          (0 if nothing was sent).
 */
static int rd_kafka_broker_fetch_toppars (rd_kafka_broker_t *rkb, rd_ts_t now) {
        rd_kafka_toppar_t *rktp;
        rd_kafka_buf_t *rkbuf;
        int cnt = 0;
        /* Buffer offsets of the count fields written as 0 and
         * back-patched once the real counts are known. */
        size_t of_TopicArrayCnt = 0;
        int TopicArrayCnt = 0;
        size_t of_PartitionArrayCnt = 0;
        int PartitionArrayCnt = 0;
        rd_kafka_topic_t *rkt_last = NULL;
        int16_t ApiVersion = 0;

        /* Create buffer and segments:
         *   1 x ReplicaId MaxWaitTime MinBytes TopicArrayCnt
         *   N x topic name
         *   N x PartitionArrayCnt Partition FetchOffset MaxBytes
         * where N = number of toppars.
         * Since we dont keep track of the number of topics served by
         * this broker, only the partition count, we do a worst-case calc
         * when allocating and assume each partition is on its own topic
         */

        if (unlikely(rkb->rkb_active_toppar_cnt == 0))
                return 0;

        rkbuf = rd_kafka_buf_new_request(
                rkb, RD_KAFKAP_Fetch, 1,
                /* ReplicaId+MaxWaitTime+MinBytes+MaxBytes+IsolationLevel+
                 *   SessionId+Epoch+TopicCnt */
                4+4+4+4+1+4+4+4+
                /* N x PartCnt+Partition+CurrentLeaderEpoch+FetchOffset+
                 *   LogStartOffset+MaxBytes+?TopicNameLen?*/
                (rkb->rkb_active_toppar_cnt * (4+4+4+8+8+4+40)) +
                /* ForgottenTopicsCnt */
                4+
                /* N x ForgottenTopicsData */
                0);

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
                rkb, RD_KAFKAP_Fetch, 0, 11, NULL);

        /* Required message format feature determines which flag is
         * attached to the request alongside the ApiVersion. */
        if (rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER2)
                rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion,
                                            RD_KAFKA_FEATURE_MSGVER2);
        else if (rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER1)
                rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion,
                                            RD_KAFKA_FEATURE_MSGVER1);
        else if (rkb->rkb_features & RD_KAFKA_FEATURE_THROTTLETIME)
                rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion,
                                            RD_KAFKA_FEATURE_THROTTLETIME);


        /* FetchRequest header */
        /* ReplicaId */
        rd_kafka_buf_write_i32(rkbuf, -1);
        /* MaxWaitTime */
        rd_kafka_buf_write_i32(rkbuf, rkb->rkb_rk->rk_conf.fetch_wait_max_ms);
        /* MinBytes */
        rd_kafka_buf_write_i32(rkbuf, rkb->rkb_rk->rk_conf.fetch_min_bytes);

        if (rd_kafka_buf_ApiVersion(rkbuf) >= 3)
                /* MaxBytes */
                rd_kafka_buf_write_i32(rkbuf,
                                       rkb->rkb_rk->rk_conf.fetch_max_bytes);

        if (rd_kafka_buf_ApiVersion(rkbuf) >= 4)
                /* IsolationLevel */
                rd_kafka_buf_write_i8(rkbuf,
                                      rkb->rkb_rk->rk_conf.isolation_level);

        if (rd_kafka_buf_ApiVersion(rkbuf) >= 7) {
                /* Fetch sessions not used: SessionId 0, Epoch -1. */
                /* SessionId */
                rd_kafka_buf_write_i32(rkbuf, 0);
                /* Epoch */
                rd_kafka_buf_write_i32(rkbuf, -1);
        }

        /* Write zero TopicArrayCnt but store pointer for later update */
        of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0);

        /* Prepare map for storing the fetch version for each partition,
         * this will later be checked in Fetch response to purge outdated
         * responses (e.g., after a seek). */
        rkbuf->rkbuf_rktp_vers = rd_list_new(
                0, (void *)rd_kafka_toppar_ver_destroy);
        rd_list_prealloc_elems(rkbuf->rkbuf_rktp_vers,
                               sizeof(struct rd_kafka_toppar_ver),
                               rkb->rkb_active_toppar_cnt, 0);

        /* Round-robin start of the list. */
        rktp = rkb->rkb_active_toppar_next;
        do {
                struct rd_kafka_toppar_ver *tver;

                /* Toppars are grouped by topic in the active list:
                 * a topic change starts a new topic entry. */
                if (rkt_last != rktp->rktp_rkt) {
                        if (rkt_last != NULL) {
                                /* Update PartitionArrayCnt */
                                rd_kafka_buf_update_i32(rkbuf,
                                                        of_PartitionArrayCnt,
                                                        PartitionArrayCnt);
                        }

                        /* Topic name */
                        rd_kafka_buf_write_kstr(rkbuf,
                                                rktp->rktp_rkt->rkt_topic);
                        TopicArrayCnt++;
                        rkt_last = rktp->rktp_rkt;
                        /* Partition count */
                        of_PartitionArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0);
                        PartitionArrayCnt = 0;
                }

                PartitionArrayCnt++;

                /* Partition */
                rd_kafka_buf_write_i32(rkbuf, rktp->rktp_partition);

                if (rd_kafka_buf_ApiVersion(rkbuf) >= 9)
                        /* CurrentLeaderEpoch */
                        rd_kafka_buf_write_i32(rkbuf, -1);

                /* FetchOffset */
                rd_kafka_buf_write_i64(rkbuf, rktp->rktp_offsets.fetch_offset);

                if (rd_kafka_buf_ApiVersion(rkbuf) >= 5)
                        /* LogStartOffset - only used by follower replica */
                        rd_kafka_buf_write_i64(rkbuf, -1);

                /* MaxBytes */
                rd_kafka_buf_write_i32(rkbuf, rktp->rktp_fetch_msg_max_bytes);

                rd_rkb_dbg(rkb, FETCH, "FETCH",
                           "Fetch topic %.*s [%"PRId32"] at offset %"PRId64
                           " (v%d)",
                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition,
                           rktp->rktp_offsets.fetch_offset,
                           rktp->rktp_fetch_version);

                /* We must have a valid fetch offset when we get here */
                rd_dassert(rktp->rktp_offsets.fetch_offset >= 0);

                /* Add toppar + op version mapping. */
                tver = rd_list_add(rkbuf->rkbuf_rktp_vers, NULL);
                tver->rktp = rd_kafka_toppar_keep(rktp);
                tver->version = rktp->rktp_fetch_version;

                cnt++;
        } while ((rktp = CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars,
                                           rktp, rktp_activelink)) !=
                 rkb->rkb_active_toppar_next);

        /* Update next toppar to fetch in round-robin list. */
        rd_kafka_broker_active_toppar_next(
                rkb,
                rktp ?
                CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars,
                                  rktp, rktp_activelink) : NULL);

        rd_rkb_dbg(rkb, FETCH, "FETCH", "Fetch %i/%i/%i toppar(s)",
                   cnt, rkb->rkb_active_toppar_cnt, rkb->rkb_toppar_cnt);
        if (!cnt) {
                rd_kafka_buf_destroy(rkbuf);
                return cnt;
        }

        if (rkt_last != NULL) {
                /* Update last topic's PartitionArrayCnt */
                rd_kafka_buf_update_i32(rkbuf,
                                        of_PartitionArrayCnt,
                                        PartitionArrayCnt);
        }

        /* Update TopicArrayCnt */
        rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, TopicArrayCnt);


        if (rd_kafka_buf_ApiVersion(rkbuf) >= 7)
                /* Length of the ForgottenTopics list (KIP-227). Broker
                 * use only - not used by the consumer. */
                rd_kafka_buf_write_i32(rkbuf, 0);

        if (rd_kafka_buf_ApiVersion(rkbuf) >= 11)
                /* RackId */
                rd_kafka_buf_write_kstr(rkbuf,
                                        rkb->rkb_rk->rk_conf.client_rack);

        /* Consider Fetch requests blocking if fetch.wait.max.ms >= 1s */
        if (rkb->rkb_rk->rk_conf.fetch_wait_max_ms >= 1000)
                rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;

        /* Use configured timeout */
        rd_kafka_buf_set_timeout(rkbuf,
                                 rkb->rkb_rk->rk_conf.socket_timeout_ms +
                                 rkb->rkb_rk->rk_conf.fetch_wait_max_ms,
                                 now);

        /* Sort toppar versions for quicker lookups in Fetch response. */
        rd_list_sort(rkbuf->rkbuf_rktp_vers, rd_kafka_toppar_ver_cmp);

        rkb->rkb_fetching = 1;
        rd_kafka_broker_buf_enq1(rkb, rkbuf, rd_kafka_broker_fetch_reply, NULL);

        return cnt;
}
4961
4962
4963
4964
4965 /**
4966 * Consumer serving
4967 */
/**
 * @brief Consumer serving loop: serves partitions and issues Fetch requests
 *        until the handle terminates, the broker changes state, or
 *        \p abs_timeout is reached.
 *
 * @param rkb         Broker to serve (must be the current thread's broker).
 * @param abs_timeout Absolute deadline (rd_clock() units) for this loop.
 *
 * @locality broker thread
 * @locks none (broker lock is acquired and released internally)
 */
static void rd_kafka_broker_consumer_serve (rd_kafka_broker_t *rkb,
                                            rd_ts_t abs_timeout) {
        /* NOTE(review): initial_state is read before acquiring the broker
         * lock below; presumably benign since only this thread transitions
         * the state in the common case — confirm. */
        unsigned int initial_state = rkb->rkb_state;
        rd_ts_t now;

        rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));

        rd_kafka_broker_lock(rkb);

        /* Loop until termination, a broker state change (which returns
         * control to the state machine in the broker main thread), or
         * the deadline passes. */
        while (!rd_kafka_broker_terminating(rkb) &&
               rkb->rkb_state == initial_state &&
               abs_timeout > (now = rd_clock())) {
                rd_ts_t min_backoff;

                rd_kafka_broker_unlock(rkb);

                /* Serve toppars; returns the earliest per-partition
                 * fetch backoff (absolute time). */
                min_backoff = rd_kafka_broker_consumer_toppars_serve(rkb);
                /* A broker-level fetch backoff may delay fetching further. */
                if (rkb->rkb_ts_fetch_backoff > now &&
                    rkb->rkb_ts_fetch_backoff < min_backoff)
                        min_backoff = rkb->rkb_ts_fetch_backoff;

                if (min_backoff < RD_TS_MAX &&
                    rkb->rkb_state != RD_KAFKA_BROKER_STATE_UP) {
                        /* There are partitions to fetch but the
                         * connection is not up. */
                        rkb->rkb_persistconn.internal++;
                }

                /* Send Fetch request message for all underflowed toppars
                 * if the connection is up and there are no outstanding
                 * fetch requests for this connection. */
                if (!rkb->rkb_fetching &&
                    rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) {
                        if (min_backoff < now) {
                                rd_kafka_broker_fetch_toppars(rkb, now);
                                min_backoff = abs_timeout;
                        } else if (min_backoff < RD_TS_MAX)
                                rd_rkb_dbg(rkb, FETCH, "FETCH",
                                           "Fetch backoff for %"PRId64
                                           "ms",
                                           (min_backoff-now)/1000);
                } else {
                        /* Nothing needs to be done, next wakeup
                         * is from ops, state change, IO, or this timeout */
                        min_backoff = abs_timeout;
                }

                /* Check and move retry buffers */
                if (unlikely(rd_atomic32_get(&rkb->rkb_retrybufs.rkbq_cnt) > 0))
                        rd_kafka_broker_retry_bufs_move(rkb, &min_backoff);

                /* Never block past the caller's deadline. */
                if (min_backoff > abs_timeout)
                        min_backoff = abs_timeout;

                /* Blocking ops/IO poll until min_backoff; a non-zero
                 * return indicates an explicit wakeup request. */
                if (rd_kafka_broker_ops_io_serve(rkb, min_backoff))
                        return; /* Wakeup */

                rd_kafka_broker_lock(rkb);
        }

        rd_kafka_broker_unlock(rkb);
}
5031
5032
5033
5034 /**
5035 * @brief Check if connections.max.idle.ms has been exceeded and if so
5036 * close the connection.
5037 *
5038 * @remark Must only be called if connections.max.idle.ms > 0 and
5039 * the current broker state is UP (or UPDATE).
5040 *
5041 * @locality broker thread
5042 */
rd_kafka_broker_idle_check(rd_kafka_broker_t * rkb)5043 static RD_INLINE void rd_kafka_broker_idle_check (rd_kafka_broker_t *rkb) {
5044 rd_ts_t ts_send = rd_atomic64_get(&rkb->rkb_c.ts_send);
5045 rd_ts_t ts_recv = rd_atomic64_get(&rkb->rkb_c.ts_recv);
5046 rd_ts_t ts_last_activity = RD_MAX(ts_send, ts_recv);
5047 int idle_ms;
5048
5049 /* If nothing has been sent yet, use the connection time as
5050 * last activity. */
5051 if (unlikely(!ts_last_activity))
5052 ts_last_activity = rkb->rkb_ts_state;
5053
5054 idle_ms = (int)((rd_clock() - ts_last_activity) / 1000);
5055
5056 if (likely(idle_ms < rkb->rkb_rk->rk_conf.connections_max_idle_ms))
5057 return;
5058
5059 rd_kafka_broker_fail(rkb, LOG_DEBUG,
5060 RD_KAFKA_RESP_ERR__TRANSPORT,
5061 "Connection max idle time exceeded "
5062 "(%dms since last activity)",
5063 idle_ms);
5064 }
5065
5066
5067 /**
5068 * @brief Serve broker thread according to client type.
5069 * May be called in any broker state.
5070 *
5071 * This function is to be called from the state machine in
5072 * rd_kafka_broker_thread_main, and will return when
5073 * there was a state change, or the handle is terminating.
5074 *
5075 * Broker threads are triggered by three things:
5076 * - Ops from other parts of librdkafka / app.
5077 * This is the rkb_ops queue which is served from
5078 * rd_kafka_broker_ops_io_serve().
5079 * - IO from broker socket.
5080 * The ops queue is also IO-triggered to provide
5081 * quick wakeup when thread is blocking on IO.
 *    Also served from rd_kafka_broker_ops_io_serve().
5083 * When there is no broker socket only the ops
5084 * queue is served.
5085 * - Ops/IO timeout when there were no ops or
5086 * IO events within a variable timeout.
5087 *
5088 * For each iteration of the loops in producer_serve(), consumer_serve(),
5089 * etc, the Ops and IO are polled, and the client type specific
5090 * logic is executed. For the consumer this logic checks which partitions
5091 * to fetch or backoff, and sends Fetch requests.
5092 * The producer checks for messages to batch and transmit.
5093 * All types check for request timeouts, etc.
5094 *
5095 * Wakeups
5096 * =======
5097 * The logic returns a next wakeup time which controls how long the
5098 * next Ops/IO poll may block before the logic wants to run again;
5099 * this is typically controlled by `linger.ms` on the Producer
5100 * and fetch backoffs on the consumer.
5101 *
5102 * Remote threads may also want to wake up the Ops/IO poll so that
5103 * the logic is run more quickly. For example when a new message
5104 * is enqueued by produce() it is important that it is batched
5105 * and transmitted within the configured `linger.ms`.
5106 *
5107 * Any op enqueued on the broker ops queue (rkb_ops) will automatically
5108 * trigger a wakeup of the broker thread (either by wakeup_fd IO event
5109 * or by the conditional variable of rkb_ops being triggered - or both).
5110 *
5111 * Produced messages are not enqueued on the rkb_ops queue but on
5112 * the partition's rktp_msgq message queue. To provide quick wakeups
5113 * the partition has a reference to the partition's current leader broker
5114 * thread's rkb_ops queue, rktp_msgq_wakeup_q.
5115 * When enqueuing a message on the partition queue and the queue was
5116 * previously empty, the rktp_msgq_wakeup_q (which is rkb_ops) is woken up
5117 * by rd_kafka_q_yield(), which sets a YIELD flag and triggers the cond var
5118 * to wake up the broker thread (without allocating and enqueuing an rko).
5119 * This also triggers the wakeup_fd of rkb_ops, if necessary.
5120 *
5121 * When sparse connections is enabled the broker will linger in the
5122 * INIT state until there's a need for a connection, in which case
5123 * it will set its state to DOWN to trigger the connection.
5124 * This is controlled both by the shared rkb_persistconn atomic counters
5125 * that may be updated from other parts of the code, as well as the
5126 * temporary per broker_serve() rkb_persistconn.internal counter which
5127 * is used by the broker handler code to detect if a connection is needed,
5128 * such as when a partition is being produced to.
5129 *
5130 *
5131 * @param timeout_ms The maximum timeout for blocking Ops/IO.
5132 *
5133 * @locality broker thread
5134 * @locks none
5135 */
rd_kafka_broker_serve(rd_kafka_broker_t * rkb,int timeout_ms)5136 static void rd_kafka_broker_serve (rd_kafka_broker_t *rkb, int timeout_ms) {
5137 rd_ts_t abs_timeout;
5138
5139 if (unlikely(rd_kafka_terminating(rkb->rkb_rk) ||
5140 timeout_ms == RD_POLL_NOWAIT))
5141 timeout_ms = 1;
5142 else if (timeout_ms == RD_POLL_INFINITE)
5143 timeout_ms = rd_kafka_max_block_ms;
5144
5145 abs_timeout = rd_timeout_init(timeout_ms);
5146 /* Must be a valid absolute time from here on. */
5147 rd_assert(abs_timeout > 0);
5148
5149 /* rkb_persistconn.internal is the per broker_serve()
5150 * automatic counter that keeps track of anything
5151 * in the producer/consumer logic needs this broker connection
5152 * to be up.
5153 * The value is reset here on each serve(). If there are queued
5154 * requests we know right away that a connection is needed. */
5155 rkb->rkb_persistconn.internal =
5156 rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt) > 0;
5157
5158 if (rkb->rkb_source == RD_KAFKA_INTERNAL) {
5159 rd_kafka_broker_internal_serve(rkb, abs_timeout);
5160 return;
5161 }
5162
5163 if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER)
5164 rd_kafka_broker_producer_serve(rkb, abs_timeout);
5165 else if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER)
5166 rd_kafka_broker_consumer_serve(rkb, abs_timeout);
5167
5168 if (rkb->rkb_rk->rk_conf.connections_max_idle_ms &&
5169 rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP)
5170 rd_kafka_broker_idle_check(rkb);
5171 }
5172
5173
5174 /**
5175 * @returns true if all broker addresses have been tried.
5176 *
5177 * @locality broker thread
5178 * @locks_required none
5179 * @locks_acquired none
5180 */
5181 static rd_bool_t
rd_kafka_broker_addresses_exhausted(const rd_kafka_broker_t * rkb)5182 rd_kafka_broker_addresses_exhausted (const rd_kafka_broker_t *rkb) {
5183 return !rkb->rkb_rsal ||
5184 rkb->rkb_rsal->rsal_cnt == 0 ||
5185 rkb->rkb_rsal->rsal_curr + 1 == rkb->rkb_rsal->rsal_cnt;
5186 }
5187
5188
/**
 * @brief Broker thread main loop: drives the per-broker connection
 *        state machine until the handle terminates, then tears down
 *        the broker's queues and per-thread state.
 *
 * @param arg The rd_kafka_broker_t *, with a refcount held for this thread.
 *
 * @returns 0 (thread exit code).
 *
 * @locality broker thread (this function IS the broker thread)
 */
static int rd_kafka_broker_thread_main (void *arg) {
        rd_kafka_broker_t *rkb = arg;
        rd_kafka_t *rk = rkb->rkb_rk;

        rd_kafka_set_thread_name("%s", rkb->rkb_name);
        rd_kafka_set_thread_sysname("rdk:broker%"PRId32, rkb->rkb_nodeid);

        rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_BROKER);

        (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);

        /* Our own refcount was increased just prior to thread creation,
         * when refcount drops to 1 it is just us left and the broker
         * thread should terminate. */

        /* Acquire lock (which was held by thread creator during creation)
         * to synchronise state. */
        rd_kafka_broker_lock(rkb);
        rd_kafka_broker_unlock(rkb);

        rd_rkb_dbg(rkb, BROKER, "BRKMAIN", "Enter main broker thread");

        while (!rd_kafka_broker_terminating(rkb)) {
                int backoff;
                int r;

        redo:
                switch (rkb->rkb_state)
                {
                case RD_KAFKA_BROKER_STATE_INIT:
                        /* Check if there is demand for a connection
                         * to this broker, if so jump to TRY_CONNECT state. */
                        if (!rd_kafka_broker_needs_connection(rkb)) {
                                rd_kafka_broker_serve(rkb,
                                                      rd_kafka_max_block_ms);
                                break;
                        }

                        /* The INIT state also exists so that an initial
                         * connection failure triggers a state transition
                         * which might trigger a ALL_BROKERS_DOWN error. */
                        rd_kafka_broker_lock(rkb);
                        rd_kafka_broker_set_state(
                                rkb, RD_KAFKA_BROKER_STATE_TRY_CONNECT);
                        rd_kafka_broker_unlock(rkb);
                        goto redo; /* effectively a fallthru to TRY_CONNECT */

                case RD_KAFKA_BROKER_STATE_DOWN:
                        /* With sparse connections, fall back to INIT and
                         * wait for demand; otherwise reconnect eagerly. */
                        rd_kafka_broker_lock(rkb);
                        if (rkb->rkb_rk->rk_conf.sparse_connections)
                                rd_kafka_broker_set_state(
                                        rkb, RD_KAFKA_BROKER_STATE_INIT);
                        else
                                rd_kafka_broker_set_state(
                                        rkb, RD_KAFKA_BROKER_STATE_TRY_CONNECT);
                        rd_kafka_broker_unlock(rkb);
                        goto redo; /* effectively a fallthru to TRY_CONNECT */

                case RD_KAFKA_BROKER_STATE_TRY_CONNECT:
                        /* The internal broker never connects; go straight
                         * to UP so it can serve op queues. */
                        if (rkb->rkb_source == RD_KAFKA_INTERNAL) {
                                rd_kafka_broker_lock(rkb);
                                rd_kafka_broker_set_state(rkb,
                                                          RD_KAFKA_BROKER_STATE_UP);
                                rd_kafka_broker_unlock(rkb);
                                break;
                        }

                        if (unlikely(rd_kafka_terminating(rkb->rkb_rk)))
                                rd_kafka_broker_serve(rkb, 1000);

                        if (!rd_kafka_sasl_ready(rkb->rkb_rk)) {
                                /* SASL provider not yet ready. */
                                rd_kafka_broker_serve(rkb,
                                                      rd_kafka_max_block_ms);
                                /* Continue while loop to try again (as long as
                                 * we are not terminating). */
                                continue;
                        }

                        /* Throttle & jitter reconnects to avoid
                         * thundering horde of reconnecting clients after
                         * a broker / network outage. Issue #403 */
                        backoff = rd_kafka_broker_reconnect_backoff(rkb,
                                                                    rd_clock());
                        if (backoff > 0) {
                                rd_rkb_dbg(rkb, BROKER, "RECONNECT",
                                           "Delaying next reconnect by %dms",
                                           backoff);
                                rd_kafka_broker_serve(rkb, (int)backoff);
                                continue;
                        }

                        /* Initiate asynchronous connection attempt.
                         * Only the host lookup is blocking here. */
                        r = rd_kafka_broker_connect(rkb);
                        if (r == -1) {
                                /* Immediate failure, most likely host
                                 * resolving failed.
                                 * Try the next resolve result until we've
                                 * tried them all, in which case we sleep a
                                 * short while to avoid busy looping. */
                                if (rd_kafka_broker_addresses_exhausted(rkb))
                                        rd_kafka_broker_serve(
                                                rkb, rd_kafka_max_block_ms);
                        } else if (r == 0) {
                                /* Broker has no hostname yet, wait
                                 * for hostname to be set and connection
                                 * triggered by received OP_CONNECT. */
                                rd_kafka_broker_serve(rkb,
                                                      rd_kafka_max_block_ms);
                        } else {
                                /* Connection in progress, state will
                                 * have changed to STATE_CONNECT. */
                        }

                        break;

                case RD_KAFKA_BROKER_STATE_CONNECT:
                case RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE:
                case RD_KAFKA_BROKER_STATE_AUTH_LEGACY:
                case RD_KAFKA_BROKER_STATE_AUTH_REQ:
                case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:
                case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:
                        /* Asynchronous connect in progress. */
                        rd_kafka_broker_serve(rkb, rd_kafka_max_block_ms);

                        /* Connect failure.
                         * Try the next resolve result until we've
                         * tried them all, in which case we back off the next
                         * connection attempt to avoid busy looping. */
                        if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_DOWN &&
                            rd_kafka_broker_addresses_exhausted(rkb))
                                rd_kafka_broker_update_reconnect_backoff(
                                        rkb, &rkb->rkb_rk->rk_conf, rd_clock());
                        break;

                case RD_KAFKA_BROKER_STATE_UPDATE:
                        /* FALLTHRU */
                case RD_KAFKA_BROKER_STATE_UP:
                        rd_kafka_broker_serve(rkb, rd_kafka_max_block_ms);

                        /* UPDATE is a transient state; return to UP after
                         * the update has been served. */
                        if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_UPDATE) {
                                rd_kafka_broker_lock(rkb);
                                rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
                                rd_kafka_broker_unlock(rkb);
                        }
                        break;
                }

                if (rd_kafka_terminating(rkb->rkb_rk)) {
                        /* Handle is terminating: fail the send+retry queue
                         * to speed up termination, otherwise we'll
                         * need to wait for request timeouts. */
                        r = rd_kafka_broker_bufq_timeout_scan(
                                rkb, 0, &rkb->rkb_outbufs, NULL, -1,
                                RD_KAFKA_RESP_ERR__DESTROY, 0, NULL, 0);
                        r += rd_kafka_broker_bufq_timeout_scan(
                                rkb, 0, &rkb->rkb_retrybufs, NULL, -1,
                                RD_KAFKA_RESP_ERR__DESTROY, 0, NULL, 0);
                        rd_rkb_dbg(rkb, BROKER, "TERMINATE",
                                   "Handle is terminating in state %s: "
                                   "%d refcnts (%p), %d toppar(s), "
                                   "%d active toppar(s), "
                                   "%d outbufs, %d waitresps, %d retrybufs: "
                                   "failed %d request(s) in retry+outbuf",
                                   rd_kafka_broker_state_names[rkb->rkb_state],
                                   rd_refcnt_get(&rkb->rkb_refcnt),
                                   &rkb->rkb_refcnt,
                                   rkb->rkb_toppar_cnt,
                                   rkb->rkb_active_toppar_cnt,
                                   (int)rd_kafka_bufq_cnt(&rkb->rkb_outbufs),
                                   (int)rd_kafka_bufq_cnt(&rkb->rkb_waitresps),
                                   (int)rd_kafka_bufq_cnt(&rkb->rkb_retrybufs),
                                   r);
                }
        }

        /* Remove ourselves from the instance's broker list (the internal
         * broker is not on that list). */
        if (rkb->rkb_source != RD_KAFKA_INTERNAL) {
                rd_kafka_wrlock(rkb->rkb_rk);
                TAILQ_REMOVE(&rkb->rkb_rk->rk_brokers, rkb, rkb_link);
                if (rkb->rkb_nodeid != -1 && !RD_KAFKA_BROKER_IS_LOGICAL(rkb))
                        rd_list_remove(&rkb->rkb_rk->rk_broker_by_id, rkb);
                (void)rd_atomic32_sub(&rkb->rkb_rk->rk_broker_cnt, 1);
                rd_kafka_wrunlock(rkb->rkb_rk);
        }

        rd_kafka_broker_fail(rkb, LOG_DEBUG, RD_KAFKA_RESP_ERR__DESTROY,
                             "Broker handle is terminating");

        /* Disable and drain ops queue.
         * Simply purging the ops queue risks leaving dangling references
         * for ops such as PARTITION_JOIN/PARTITION_LEAVE where the broker
         * reference is not maintained in the rko (but in rktp_next_leader).
         * #1596 */
        rd_kafka_q_disable(rkb->rkb_ops);
        while (rd_kafka_broker_ops_serve(rkb, RD_POLL_NOWAIT))
                ;

        /* Drop this thread's refcount; final destruction happens when
         * the last reference is released. */
        rd_kafka_broker_destroy(rkb);

#if WITH_SSL
        /* Remove OpenSSL per-thread error state to avoid memory leaks */
#if OPENSSL_VERSION_NUMBER >= 0x10100000L && !defined(LIBRESSL_VERSION_NUMBER)
        /*(OpenSSL libraries handle thread init and deinit)
         * https://github.com/openssl/openssl/pull/1048 */
#elif OPENSSL_VERSION_NUMBER >= 0x10000000L
        ERR_remove_thread_state(NULL);
#endif
#endif

        rd_kafka_interceptors_on_thread_exit(rk, RD_KAFKA_THREAD_BROKER);

        rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);

        return 0;
}
5405
5406
5407 /**
5408 * Final destructor. Refcnt must be 0.
5409 */
/**
 * Final destructor. Refcnt must be 0.
 *
 * Frees all resources owned by the broker handle. The teardown order
 * matters: SASL state and wakeup fds first, then buffers and queues,
 * and finally the locks that guard fields being freed.
 *
 * @locality broker thread (asserted below)
 */
void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb) {

        rd_assert(thrd_is_current(rkb->rkb_thread));
        /* All monitors, buffers and toppars must already be gone. */
        rd_assert(TAILQ_EMPTY(&rkb->rkb_monitors));
        rd_assert(TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs));
        rd_assert(TAILQ_EMPTY(&rkb->rkb_waitresps.rkbq_bufs));
        rd_assert(TAILQ_EMPTY(&rkb->rkb_retrybufs.rkbq_bufs));
        rd_assert(TAILQ_EMPTY(&rkb->rkb_toppars));

        /* SASL state is only initialized for real (non-internal) brokers
         * using a SASL-based security protocol (see rd_kafka_broker_add). */
        if (rkb->rkb_source != RD_KAFKA_INTERNAL &&
            (rkb->rkb_rk->rk_conf.security_protocol ==
             RD_KAFKA_PROTO_SASL_PLAINTEXT ||
             rkb->rkb_rk->rk_conf.security_protocol ==
             RD_KAFKA_PROTO_SASL_SSL))
                rd_kafka_sasl_broker_term(rkb);

        /* Close the wakeup pipe ends, if they were created. */
        if (rkb->rkb_wakeup_fd[0] != -1)
                rd_close(rkb->rkb_wakeup_fd[0]);
        if (rkb->rkb_wakeup_fd[1] != -1)
                rd_close(rkb->rkb_wakeup_fd[1]);

        if (rkb->rkb_recv_buf)
                rd_kafka_buf_destroy(rkb->rkb_recv_buf);

        if (rkb->rkb_rsal)
                rd_sockaddr_list_destroy(rkb->rkb_rsal);

        if (rkb->rkb_ApiVersions)
                rd_free(rkb->rkb_ApiVersions);
        rd_free(rkb->rkb_origname);

        rd_kafka_q_purge(rkb->rkb_ops);
        rd_kafka_q_destroy_owner(rkb->rkb_ops);

        rd_avg_destroy(&rkb->rkb_avg_int_latency);
        rd_avg_destroy(&rkb->rkb_avg_outbuf_latency);
        rd_avg_destroy(&rkb->rkb_avg_rtt);
        rd_avg_destroy(&rkb->rkb_avg_throttle);

        /* Free the logname under its lock (other threads read it through
         * the same lock), then destroy the lock itself. */
        mtx_lock(&rkb->rkb_logname_lock);
        rd_free(rkb->rkb_logname);
        rkb->rkb_logname = NULL;
        mtx_unlock(&rkb->rkb_logname_lock);
        mtx_destroy(&rkb->rkb_logname_lock);

        mtx_destroy(&rkb->rkb_lock);

        rd_refcnt_destroy(&rkb->rkb_refcnt);

        rd_free(rkb);
}
5461
5462
5463 /**
5464 * Returns the internal broker with refcnt increased.
5465 */
rd_kafka_broker_internal(rd_kafka_t * rk)5466 rd_kafka_broker_t *rd_kafka_broker_internal (rd_kafka_t *rk) {
5467 rd_kafka_broker_t *rkb;
5468
5469 mtx_lock(&rk->rk_internal_rkb_lock);
5470 rkb = rk->rk_internal_rkb;
5471 if (rkb)
5472 rd_kafka_broker_keep(rkb);
5473 mtx_unlock(&rk->rk_internal_rkb_lock);
5474
5475 return rkb;
5476 }
5477
5478
5479 /**
5480 * Adds a broker with refcount set to 1.
5481 * If 'source' is RD_KAFKA_INTERNAL an internal broker is added
5482 * that does not actually represent or connect to a real broker, it is used
5483 * for serving unassigned toppar's op queues.
5484 *
5485 * Locks: rd_kafka_wrlock(rk) must be held
5486 */
/**
 * Adds a broker with refcount set to 1.
 * If 'source' is RD_KAFKA_INTERNAL an internal broker is added
 * that does not actually represent or connect to a real broker, it is used
 * for serving unassigned toppar's op queues.
 *
 * @param rk     Client instance.
 * @param source Broker origin (CONFIGURED, LEARNED, LOGICAL or INTERNAL).
 * @param proto  Security protocol to use.
 * @param name   Broker hostname, or display name for LOGICAL brokers.
 * @param port   Broker port (0 for LOGICAL brokers).
 * @param nodeid Broker id, or -1 if not yet known.
 *
 * @returns the new broker handle, or NULL if the broker thread could not
 *          be created (an ERR__CRIT_SYS_RESOURCE op is raised in that case).
 *
 * Locks: rd_kafka_wrlock(rk) must be held
 */
rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
                                        rd_kafka_confsource_t source,
                                        rd_kafka_secproto_t proto,
                                        const char *name, uint16_t port,
                                        int32_t nodeid) {
        rd_kafka_broker_t *rkb;
        int r;
#ifndef _WIN32
        sigset_t newset, oldset;
#endif

        rkb = rd_calloc(1, sizeof(*rkb));

        if (source != RD_KAFKA_LOGICAL) {
                /* Derive the "host:port" nodename and the full broker
                 * display name from the given address. */
                rd_kafka_mk_nodename(rkb->rkb_nodename,
                                     sizeof(rkb->rkb_nodename),
                                     name, port);
                rd_kafka_mk_brokername(rkb->rkb_name, sizeof(rkb->rkb_name),
                                       proto, rkb->rkb_nodename,
                                       nodeid, source);
        } else {
                /* Logical broker does not have a nodename (address) or port
                 * at initialization. */
                rd_snprintf(rkb->rkb_name, sizeof(rkb->rkb_name), "%s", name);
        }

        rkb->rkb_source = source;
        rkb->rkb_rk = rk;
        rkb->rkb_ts_state = rd_clock();
        rkb->rkb_nodeid = nodeid;
        rkb->rkb_proto = proto;
        rkb->rkb_port = port;
        rkb->rkb_origname = rd_strdup(name);

        /* Initialize locks, lists, queues and statistics gauges. */
        mtx_init(&rkb->rkb_lock, mtx_plain);
        mtx_init(&rkb->rkb_logname_lock, mtx_plain);
        rkb->rkb_logname = rd_strdup(rkb->rkb_name);
        TAILQ_INIT(&rkb->rkb_toppars);
        CIRCLEQ_INIT(&rkb->rkb_active_toppars);
        TAILQ_INIT(&rkb->rkb_monitors);
        rd_kafka_bufq_init(&rkb->rkb_outbufs);
        rd_kafka_bufq_init(&rkb->rkb_waitresps);
        rd_kafka_bufq_init(&rkb->rkb_retrybufs);
        rkb->rkb_ops = rd_kafka_q_new(rk);
        /* Gauges are only enabled when statistics emission is configured. */
        rd_avg_init(&rkb->rkb_avg_int_latency, RD_AVG_GAUGE, 0, 100*1000, 2,
                    rk->rk_conf.stats_interval_ms ? 1 : 0);
        rd_avg_init(&rkb->rkb_avg_outbuf_latency, RD_AVG_GAUGE, 0, 100*1000, 2,
                    rk->rk_conf.stats_interval_ms ? 1 : 0);
        rd_avg_init(&rkb->rkb_avg_rtt, RD_AVG_GAUGE, 0, 500*1000, 2,
                    rk->rk_conf.stats_interval_ms ? 1 : 0);
        rd_avg_init(&rkb->rkb_avg_throttle, RD_AVG_GAUGE, 0, 5000*1000, 2,
                    rk->rk_conf.stats_interval_ms ? 1 : 0);
        rd_refcnt_init(&rkb->rkb_refcnt, 0);
        rd_kafka_broker_keep(rkb); /* rk_broker's refcount */

        rkb->rkb_reconnect_backoff_ms = rk->rk_conf.reconnect_backoff_ms;
        rd_atomic32_init(&rkb->rkb_persistconn.coord, 0);

        rd_atomic64_init(&rkb->rkb_c.ts_send, 0);
        rd_atomic64_init(&rkb->rkb_c.ts_recv, 0);

        /* ApiVersion fallback interval */
        if (rkb->rkb_rk->rk_conf.api_version_request) {
                rd_interval_init(&rkb->rkb_ApiVersion_fail_intvl);
                rd_interval_fixed(
                        &rkb->rkb_ApiVersion_fail_intvl,
                        (rd_ts_t)rkb->rkb_rk->rk_conf.api_version_fallback_ms *
                        1000);
        }

        /* Rate-limiters for repeated log/error suppression. */
        rd_interval_init(&rkb->rkb_suppress.unsupported_compression);
        rd_interval_init(&rkb->rkb_suppress.unsupported_kip62);
        rd_interval_init(&rkb->rkb_suppress.fail_error);

#ifndef _WIN32
        /* Block all signals in newly created thread.
         * To avoid race condition we block all signals in the calling
         * thread, which the new thread will inherit its sigmask from,
         * and then restore the original sigmask of the calling thread when
         * we're done creating the thread.
         * NOTE: term_sig remains unblocked since we use it on termination
         * to quickly interrupt system calls. */
        sigemptyset(&oldset);
        sigfillset(&newset);
        if (rkb->rkb_rk->rk_conf.term_sig)
                sigdelset(&newset, rkb->rkb_rk->rk_conf.term_sig);
        pthread_sigmask(SIG_SETMASK, &newset, &oldset);
#endif

        /*
         * Fd-based queue wake-ups using a non-blocking pipe.
         * Writes are best effort, if the socket queue is full
         * the write fails (silently) but this has no effect on latency
         * since the POLLIN flag will already have been raised for fd.
         */
        rkb->rkb_wakeup_fd[0] = -1;
        rkb->rkb_wakeup_fd[1] = -1;
        rkb->rkb_toppar_wakeup_fd = -1;

        if ((r = rd_pipe_nonblocking(rkb->rkb_wakeup_fd)) == -1) {
                rd_rkb_log(rkb, LOG_ERR, "WAKEUPFD",
                           "Failed to setup broker queue wake-up fds: "
                           "%s: disabling low-latency mode",
                           rd_strerror(r));

        } else if (source == RD_KAFKA_INTERNAL) {
                /* nop: internal broker has no IO transport. */

        } else {
                char onebyte = 1;

                rd_rkb_dbg(rkb, QUEUE, "WAKEUPFD",
                           "Enabled low-latency ops queue wake-ups");
                rd_kafka_q_io_event_enable(rkb->rkb_ops, rkb->rkb_wakeup_fd[1],
                                           &onebyte, sizeof(onebyte));
        }

        /* Lock broker's lock here to synchronise state, i.e., hold off
         * the broker thread until we've finalized the rkb. */
        rd_kafka_broker_lock(rkb);
        rd_kafka_broker_keep(rkb); /* broker thread's refcnt */
        if (thrd_create(&rkb->rkb_thread,
                        rd_kafka_broker_thread_main, rkb) != thrd_success) {
                rd_kafka_broker_unlock(rkb);

                rd_kafka_log(rk, LOG_CRIT, "THREAD",
                             "Unable to create broker thread");

                /* Send ERR op back to application for processing. */
                rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
                                "Unable to create broker thread");

                /* NOTE(review): this frees only the rkb struct; the ops
                 * queue, mutexes, avg gauges and strdup'ed names allocated
                 * above are leaked. Presumably acceptable in this
                 * near-fatal path — confirm intent. */
                rd_free(rkb);

#ifndef _WIN32
                /* Restore sigmask of caller */
                pthread_sigmask(SIG_SETMASK, &oldset, NULL);
#endif

                return NULL;
        }

        if (rkb->rkb_source != RD_KAFKA_INTERNAL) {
                if (rk->rk_conf.security_protocol ==
                    RD_KAFKA_PROTO_SASL_PLAINTEXT ||
                    rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL)
                        rd_kafka_sasl_broker_init(rkb);

                /* Insert broker at head of list, idea is that
                 * newer brokers are more relevant than old ones,
                 * and in particular LEARNED brokers are more relevant
                 * than CONFIGURED (bootstrap) and LOGICAL brokers. */
                TAILQ_INSERT_HEAD(&rkb->rkb_rk->rk_brokers, rkb, rkb_link);
                (void)rd_atomic32_add(&rkb->rkb_rk->rk_broker_cnt, 1);

                /* Maintain the sorted broker-by-id lookup list. */
                if (rkb->rkb_nodeid != -1 && !RD_KAFKA_BROKER_IS_LOGICAL(rkb)) {
                        rd_list_add(&rkb->rkb_rk->rk_broker_by_id, rkb);
                        rd_list_sort(&rkb->rkb_rk->rk_broker_by_id,
                                     rd_kafka_broker_cmp_by_id);
                }

                rd_rkb_dbg(rkb, BROKER, "BROKER",
                           "Added new broker with NodeId %"PRId32,
                           rkb->rkb_nodeid);
        }

        rd_kafka_broker_unlock(rkb);

        /* Add broker state monitor for the coordinator request to use.
         * This is needed by the transactions implementation and DeleteGroups. */
        rd_kafka_broker_monitor_add(&rkb->rkb_coord_monitor, rkb,
                                    rk->rk_ops,
                                    rd_kafka_coord_rkb_monitor_cb);


#ifndef _WIN32
        /* Restore sigmask of caller */
        pthread_sigmask(SIG_SETMASK, &oldset, NULL);
#endif

        return rkb;
}
5669
5670
5671 /**
5672 * @brief Adds a logical broker.
5673 *
5674 * Logical brokers act just like any broker handle, but will not have
 * an initial address set. The address (or nodename as it is called
5676 * internally) can be set from another broker handle
5677 * by calling rd_kafka_broker_set_nodename().
5678 *
5679 * This allows maintaining a logical group coordinator broker
5680 * handle that can ambulate between real broker addresses.
5681 *
5682 * Logical broker constraints:
5683 * - will not have a broker-id set (-1).
5684 * - will not have a port set (0).
5685 * - the address for the broker may change.
5686 * - the name of broker will not correspond to the address,
5687 * but the \p name given here.
5688 *
5689 * @returns a new broker, holding a refcount for the caller.
5690 *
5691 * @locality any rdkafka thread
5692 * @locks none
5693 */
rd_kafka_broker_add_logical(rd_kafka_t * rk,const char * name)5694 rd_kafka_broker_t *rd_kafka_broker_add_logical (rd_kafka_t *rk,
5695 const char *name) {
5696 rd_kafka_broker_t *rkb;
5697
5698 rd_kafka_wrlock(rk);
5699 rkb = rd_kafka_broker_add(rk, RD_KAFKA_LOGICAL,
5700 rk->rk_conf.security_protocol,
5701 name, 0/*port*/, -1/*brokerid*/);
5702 rd_assert(rkb && *"failed to create broker thread");
5703 rd_kafka_wrunlock(rk);
5704
5705 rd_atomic32_add(&rk->rk_broker_addrless_cnt, 1);
5706
5707 rd_dassert(RD_KAFKA_BROKER_IS_LOGICAL(rkb));
5708 rd_kafka_broker_keep(rkb);
5709 return rkb;
5710 }
5711
5712
5713 /**
5714 * @brief Update the nodename (address) of broker \p rkb
5715 * with the nodename from broker \p from_rkb (may be NULL).
5716 *
5717 * If \p rkb is connected, the connection will be torn down.
5718 * A new connection may be attempted to the new address
5719 * if a persistent connection is needed (standard connection rules).
5720 *
5721 * The broker's logname is also updated to include \p from_rkb's
5722 * broker id.
5723 *
5724 * @param from_rkb Use the nodename from this broker. If NULL, clear
5725 * the \p rkb nodename.
5726 *
5727 * @remark Must only be called for logical brokers.
5728 *
5729 * @locks none
5730 */
/**
 * @brief Update the nodename (address) of broker \p rkb
 *        with the nodename from broker \p from_rkb (may be NULL).
 *
 * If the nodename changed, the nodename epoch is bumped (which tears down
 * any current connection) and a reconnect is scheduled.
 * The broker's logname is also updated to include \p from_rkb's broker id.
 *
 * @param from_rkb Use the nodename from this broker. If NULL, clear
 *                 the \p rkb nodename.
 *
 * @remark Must only be called for logical brokers.
 *
 * @locks none
 */
void rd_kafka_broker_set_nodename (rd_kafka_broker_t *rkb,
                                   rd_kafka_broker_t *from_rkb) {
        char nodename[RD_KAFKA_NODENAME_SIZE];
        char brokername[RD_KAFKA_NODENAME_SIZE];
        int32_t nodeid;
        rd_bool_t changed = rd_false;

        rd_assert(RD_KAFKA_BROKER_IS_LOGICAL(rkb));

        rd_assert(rkb != from_rkb);

        /* Get nodename from from_rkb (under its lock) */
        if (from_rkb) {
                rd_kafka_broker_lock(from_rkb);
                rd_strlcpy(nodename, from_rkb->rkb_nodename, sizeof(nodename));
                nodeid = from_rkb->rkb_nodeid;
                rd_kafka_broker_unlock(from_rkb);
        } else {
                *nodename = '\0';
                nodeid = -1;
        }

        /* Set nodename on rkb (under its lock) */
        rd_kafka_broker_lock(rkb);
        if (strcmp(rkb->rkb_nodename, nodename)) {
                rd_rkb_dbg(rkb, BROKER, "NODENAME",
                           "Broker nodename changed from \"%s\" to \"%s\"",
                           rkb->rkb_nodename, nodename);
                rd_strlcpy(rkb->rkb_nodename, nodename,
                           sizeof(rkb->rkb_nodename));
                /* Bumping the epoch invalidates the current connection. */
                rkb->rkb_nodename_epoch++;
                changed = rd_true;
        }

        if (rkb->rkb_nodeid != nodeid) {
                rd_rkb_dbg(rkb, BROKER, "NODEID",
                           "Broker nodeid changed from %"PRId32" to %"PRId32,
                           rkb->rkb_nodeid, nodeid);
                rkb->rkb_nodeid = nodeid;
        }

        rd_kafka_broker_unlock(rkb);

        /* Update the log name to include (or exclude) the nodeid.
         * The nodeid is appended as "..logname../nodeid" */
        rd_kafka_mk_brokername(brokername, sizeof(brokername),
                               rkb->rkb_proto,
                               rkb->rkb_name, nodeid,
                               rkb->rkb_source);

        rd_kafka_broker_set_logname(rkb, brokername);

        if (!changed)
                return;

        /* Maintain the addrless-broker counter.
         * NOTE(review): rkb_nodename is read here after the broker lock
         * was released above — presumably safe because only this code
         * path writes a logical broker's nodename; confirm. */
        if (*rkb->rkb_nodename)
                rd_atomic32_sub(&rkb->rkb_rk->rk_broker_addrless_cnt, 1);
        else
                rd_atomic32_add(&rkb->rkb_rk->rk_broker_addrless_cnt, 1);

        /* Trigger a disconnect & reconnect */
        rd_kafka_broker_schedule_connection(rkb);
}
5794
5795
5796 /**
5797 * @brief Find broker by nodeid (not -1) and
5798 * possibly filtered by state (unless -1).
5799 *
5800 * @param do_connect If sparse connections are enabled and the broker is found
5801 * but not up, a connection will be triggered.
5802 *
5803 * @locks: rd_kafka_*lock() MUST be held
5804 * @remark caller must release rkb reference by rd_kafka_broker_destroy()
5805 */
5806 rd_kafka_broker_t *
rd_kafka_broker_find_by_nodeid0_fl(const char * func,int line,rd_kafka_t * rk,int32_t nodeid,int state,rd_bool_t do_connect)5807 rd_kafka_broker_find_by_nodeid0_fl (const char *func, int line,
5808 rd_kafka_t *rk,
5809 int32_t nodeid,
5810 int state,
5811 rd_bool_t do_connect) {
5812 rd_kafka_broker_t *rkb;
5813 rd_kafka_broker_t skel = { .rkb_nodeid = nodeid };
5814
5815 if (rd_kafka_terminating(rk))
5816 return NULL;
5817
5818 rkb = rd_list_find(&rk->rk_broker_by_id, &skel,
5819 rd_kafka_broker_cmp_by_id);
5820
5821 if (!rkb)
5822 return NULL;
5823
5824 if (state != -1) {
5825 int broker_state;
5826 rd_kafka_broker_lock(rkb);
5827 broker_state = (int)rkb->rkb_state;
5828 rd_kafka_broker_unlock(rkb);
5829
5830 if (broker_state != state) {
5831 if (do_connect &&
5832 broker_state == RD_KAFKA_BROKER_STATE_INIT &&
5833 rk->rk_conf.sparse_connections)
5834 rd_kafka_broker_schedule_connection(rkb);
5835 return NULL;
5836 }
5837 }
5838
5839 rd_kafka_broker_keep_fl(func, line, rkb);
5840 return rkb;
5841 }
5842
5843 /**
5844 * Locks: rd_kafka_rdlock(rk) must be held
5845 * NOTE: caller must release rkb reference by rd_kafka_broker_destroy()
5846 */
rd_kafka_broker_find(rd_kafka_t * rk,rd_kafka_secproto_t proto,const char * name,uint16_t port)5847 static rd_kafka_broker_t *rd_kafka_broker_find (rd_kafka_t *rk,
5848 rd_kafka_secproto_t proto,
5849 const char *name,
5850 uint16_t port) {
5851 rd_kafka_broker_t *rkb;
5852 char nodename[RD_KAFKA_NODENAME_SIZE];
5853
5854 rd_kafka_mk_nodename(nodename, sizeof(nodename), name, port);
5855
5856 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
5857 if (RD_KAFKA_BROKER_IS_LOGICAL(rkb))
5858 continue;
5859
5860 rd_kafka_broker_lock(rkb);
5861 if (!rd_kafka_terminating(rk) &&
5862 rkb->rkb_proto == proto &&
5863 !strcmp(rkb->rkb_nodename, nodename)) {
5864 rd_kafka_broker_keep(rkb);
5865 rd_kafka_broker_unlock(rkb);
5866 return rkb;
5867 }
5868 rd_kafka_broker_unlock(rkb);
5869 }
5870
5871 return NULL;
5872 }
5873
5874
5875 /**
5876 * Parse a broker host name.
5877 * The string 'name' is modified and null-terminated portions of it
5878 * are returned in 'proto', 'host', and 'port'.
5879 *
5880 * Returns 0 on success or -1 on parse error.
5881 */
rd_kafka_broker_name_parse(rd_kafka_t * rk,char ** name,rd_kafka_secproto_t * proto,const char ** host,uint16_t * port)5882 static int rd_kafka_broker_name_parse (rd_kafka_t *rk,
5883 char **name,
5884 rd_kafka_secproto_t *proto,
5885 const char **host,
5886 uint16_t *port) {
5887 char *s = *name;
5888 char *orig;
5889 char *n, *t, *t2;
5890
5891 /* Save a temporary copy of the original name for logging purposes */
5892 rd_strdupa(&orig, *name);
5893
5894 /* Find end of this name (either by delimiter or end of string */
5895 if ((n = strchr(s, ',')))
5896 *n = '\0';
5897 else
5898 n = s + strlen(s)-1;
5899
5900
5901 /* Check if this looks like an url. */
5902 if ((t = strstr(s, "://"))) {
5903 int i;
5904 /* "proto://host[:port]" */
5905
5906 if (t == s) {
5907 rd_kafka_log(rk, LOG_WARNING, "BROKER",
5908 "Broker name \"%s\" parse error: "
5909 "empty protocol name", orig);
5910 return -1;
5911 }
5912
5913 /* Make protocol uppercase */
5914 for (t2 = s ; t2 < t ; t2++)
5915 *t2 = toupper(*t2);
5916
5917 *t = '\0';
5918
5919 /* Find matching protocol by name. */
5920 for (i = 0 ; i < RD_KAFKA_PROTO_NUM ; i++)
5921 if (!rd_strcasecmp(s, rd_kafka_secproto_names[i]))
5922 break;
5923
5924 /* Unsupported protocol */
5925 if (i == RD_KAFKA_PROTO_NUM) {
5926 rd_kafka_log(rk, LOG_WARNING, "BROKER",
5927 "Broker name \"%s\" parse error: "
5928 "unsupported protocol \"%s\"", orig, s);
5929
5930 return -1;
5931 }
5932
5933 *proto = i;
5934
5935 /* Enforce protocol */
5936 if (rk->rk_conf.security_protocol != *proto) {
5937 rd_kafka_log(rk, LOG_WARNING, "BROKER",
5938 "Broker name \"%s\" parse error: "
5939 "protocol \"%s\" does not match "
5940 "security.protocol setting \"%s\"",
5941 orig, s,
5942 rd_kafka_secproto_names[
5943 rk->rk_conf.security_protocol]);
5944 return -1;
5945 }
5946
5947 /* Hostname starts here */
5948 s = t+3;
5949
5950 /* Ignore anything that looks like the path part of an URL */
5951 if ((t = strchr(s, '/')))
5952 *t = '\0';
5953
5954 } else
5955 *proto = rk->rk_conf.security_protocol; /* Default protocol */
5956
5957
5958 *port = RD_KAFKA_PORT;
5959 /* Check if port has been specified, but try to identify IPv6
5960 * addresses first:
5961 * t = last ':' in string
5962 * t2 = first ':' in string
5963 * If t and t2 are equal then only one ":" exists in name
5964 * and thus an IPv4 address with port specified.
5965 * Else if not equal and t is prefixed with "]" then it's an
5966 * IPv6 address with port specified.
5967 * Else no port specified. */
5968 if ((t = strrchr(s, ':')) &&
5969 ((t2 = strchr(s, ':')) == t || *(t-1) == ']')) {
5970 *t = '\0';
5971 *port = atoi(t+1);
5972 }
5973
5974 /* Empty host name -> localhost */
5975 if (!*s)
5976 s = "localhost";
5977
5978 *host = s;
5979 *name = n+1; /* past this name. e.g., next name/delimiter to parse */
5980
5981 return 0;
5982 }
5983
/**
 * @brief Adds a (csv list of) broker(s).
 *        Returns the number of brokers successfully added.
 *
 * @locality any thread
 * @locks none
 */
int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist) {
        char *s_copy = rd_strdup(brokerlist); /* parse mutates the string */
        char *s = s_copy;
        int cnt = 0;
        rd_kafka_broker_t *rkb;
        /* Broker count prior to this call: used to detect whether this
         * is the very first set of brokers added. */
        int pre_cnt = rd_atomic32_get(&rk->rk_broker_cnt);

        /* Parse comma-separated list of brokers. */
        while (*s) {
                uint16_t port;
                const char *host;
                rd_kafka_secproto_t proto;

                /* Skip leading separators/whitespace between entries. */
                if (*s == ',' || *s == ' ') {
                        s++;
                        continue;
                }

                /* Parse one entry; advances s past it.
                 * A parse error aborts the remainder of the list. */
                if (rd_kafka_broker_name_parse(rk, &s, &proto,
                                               &host, &port) == -1)
                        break;

                rd_kafka_wrlock(rk);

                /* An existing CONFIGURED broker counts as "added";
                 * otherwise add a new configured broker. */
                if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) &&
                    rkb->rkb_source == RD_KAFKA_CONFIGURED) {
                        cnt++;
                } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED,
                                               proto, host, port,
                                               RD_KAFKA_NODEID_UA) != NULL)
                        cnt++;

                /* If rd_kafka_broker_find returned a broker its
                 * reference needs to be released
                 * See issue #193 */
                if (rkb)
                        rd_kafka_broker_destroy(rkb);

                rd_kafka_wrunlock(rk);
        }

        rd_free(s_copy);

        if (rk->rk_conf.sparse_connections && cnt > 0 && pre_cnt == 0) {
                /* Sparse connections:
                 * If this was the first set of brokers added,
                 * select a random one to trigger the initial cluster
                 * connection. */
                rd_kafka_rdlock(rk);
                rd_kafka_connect_any(rk, "bootstrap servers added");
                rd_kafka_rdunlock(rk);
        }

        return cnt;
}
6046
6047
/**
 * @brief Public API wrapper: add a CSV list of bootstrap brokers.
 *
 * @returns the number of brokers successfully added.
 */
int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist) {
        /* Delegate straight to the internal implementation. */
        return rd_kafka_brokers_add0(rk, brokerlist);
}
6051
6052
/**
 * @brief Adds a new broker or updates an existing one.
 *
 * Matching is attempted in order: by nodeid, then by hostname; if
 * neither matches a new RD_KAFKA_LEARNED broker is added.
 *
 * @param rkbp if non-NULL, will be set to the broker object with
 *             refcount increased, or NULL on error.
 *
 * @locks none
 * @locality any
 */
void
rd_kafka_broker_update (rd_kafka_t *rk, rd_kafka_secproto_t proto,
                        const struct rd_kafka_metadata_broker *mdb,
                        rd_kafka_broker_t **rkbp) {
        rd_kafka_broker_t *rkb;
        char nodename[RD_KAFKA_NODENAME_SIZE];
        int needs_update = 0;

        /* Canonical "host:port" from the metadata broker entry. */
        rd_kafka_mk_nodename(nodename, sizeof(nodename), mdb->host, mdb->port);

        rd_kafka_wrlock(rk);
        if (unlikely(rd_kafka_terminating(rk))) {
                /* Dont update metadata while terminating, do this
                 * after acquiring lock for proper synchronisation */
                rd_kafka_wrunlock(rk);
                if (rkbp)
                        *rkbp = NULL;
                return;
        }

        if ((rkb = rd_kafka_broker_find_by_nodeid(rk, mdb->id))) {
                /* Broker matched by nodeid, see if we need to update
                 * the hostname. */
                if (strcmp(rkb->rkb_nodename, nodename))
                        needs_update = 1;
        } else if ((rkb = rd_kafka_broker_find(rk, proto,
                                               mdb->host, mdb->port))) {
                /* Broker matched by hostname (but not by nodeid),
                 * update the nodeid. */
                needs_update = 1;

        } else if ((rkb = rd_kafka_broker_add(rk, RD_KAFKA_LEARNED, proto,
                                              mdb->host, mdb->port, mdb->id))){
                /* Newly added learned broker: take an extra reference
                 * to match the find paths above. */
                rd_kafka_broker_keep(rkb);
        }

        rd_kafka_wrunlock(rk);

        if (rkb) {
                /* Existing broker */
                if (needs_update) {
                        rd_kafka_op_t *rko;
                        rko = rd_kafka_op_new(RD_KAFKA_OP_NODE_UPDATE);
                        rd_strlcpy(rko->rko_u.node.nodename, nodename,
                                   sizeof(rko->rko_u.node.nodename));
                        rko->rko_u.node.nodeid = mdb->id;
                        /* Perform a blocking op request so that all
                         * broker-related state, such as the rk broker list,
                         * is up to date by the time this call returns.
                         * Ignore&destroy the response. */
                        rd_kafka_op_err_destroy(
                                rd_kafka_op_req(rkb->rkb_ops, rko, -1));
                }
        }

        /* Either hand the reference to the caller or release it. */
        if (rkbp)
                *rkbp = rkb;
        else if (rkb)
                rd_kafka_broker_destroy(rkb);
}
6122
6123
6124 /**
6125 * @returns the broker id, or RD_KAFKA_NODEID_UA if \p rkb is NULL.
6126 *
6127 * @locality any
6128 * @locks_required none
6129 * @locks_acquired rkb_lock
6130 */
rd_kafka_broker_id(rd_kafka_broker_t * rkb)6131 int32_t rd_kafka_broker_id (rd_kafka_broker_t *rkb) {
6132 int32_t broker_id;
6133
6134 if (unlikely(!rkb))
6135 return RD_KAFKA_NODEID_UA;
6136
6137 /* Avoid locking if already on the broker thread */
6138 if (thrd_is_current(rkb->rkb_thread))
6139 return rkb->rkb_nodeid;
6140
6141 rd_kafka_broker_lock(rkb);
6142 broker_id = rkb->rkb_nodeid;
6143 rd_kafka_broker_unlock(rkb);
6144
6145 return broker_id;
6146 }
6147
6148
/**
 * Returns a thread-safe temporary copy of the broker name.
 * Must not be called more than 4 times from the same expression
 * (the 4 thread-local slots are reused round-robin, so a 5th call
 * would overwrite the first return value).
 *
 * Locks: none
 * Locality: any thread
 */
const char *rd_kafka_broker_name (rd_kafka_broker_t *rkb) {
        /* Per-thread rotation of 4 static buffers. */
        static RD_TLS char ret[4][RD_KAFKA_NODENAME_SIZE];
        static RD_TLS int  reti = 0;

        reti = (reti + 1) % 4;
        /* rkb_logname may be updated concurrently (e.g., nodeid learned),
         * so copy it under its dedicated lock. */
        mtx_lock(&rkb->rkb_logname_lock);
        rd_snprintf(ret[reti], sizeof(ret[reti]), "%s", rkb->rkb_logname);
        mtx_unlock(&rkb->rkb_logname_lock);

        return ret[reti];
}
6167
6168
6169
6170 /**
6171 * @brief Send dummy OP to broker thread to wake it up from IO sleep.
6172 *
6173 * @locality any
6174 * @locks any
6175 */
rd_kafka_broker_wakeup(rd_kafka_broker_t * rkb)6176 void rd_kafka_broker_wakeup (rd_kafka_broker_t *rkb) {
6177 rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_WAKEUP);
6178 rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_FLASH);
6179 rd_kafka_q_enq(rkb->rkb_ops, rko);
6180 rd_rkb_dbg(rkb, QUEUE, "WAKEUP", "Wake-up");
6181 }
6182
6183 /**
6184 * @brief Wake up all broker threads that are in at least state \p min_state
6185 *
6186 * @locality any
6187 * @locks none: rd_kafka_*lock() MUST NOT be held
6188 *
6189 * @returns the number of broker threads woken up
6190 */
rd_kafka_all_brokers_wakeup(rd_kafka_t * rk,int min_state)6191 int rd_kafka_all_brokers_wakeup (rd_kafka_t *rk, int min_state) {
6192 int cnt = 0;
6193 rd_kafka_broker_t *rkb;
6194
6195 rd_kafka_rdlock(rk);
6196 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
6197 int do_wakeup;
6198
6199 rd_kafka_broker_lock(rkb);
6200 do_wakeup = (int)rkb->rkb_state >= min_state;
6201 rd_kafka_broker_unlock(rkb);
6202
6203 if (do_wakeup) {
6204 rd_kafka_broker_wakeup(rkb);
6205 cnt += 1;
6206 }
6207 }
6208 rd_kafka_rdunlock(rk);
6209
6210 return cnt;
6211 }
6212
/**
 * @brief Filter out brokers that have at least one connection attempt.
 *
 * @returns the broker's connection-attempt count: non-zero (filter out)
 *          if the broker has ever attempted a connection, 0 otherwise.
 */
static int rd_kafka_broker_filter_never_connected (rd_kafka_broker_t *rkb,
                                                   void *opaque) {
        return rd_atomic32_get(&rkb->rkb_c.connects);
}
6220
6221
/**
 * @brief Sparse connections:
 *        Select a random broker to connect to if no brokers are up.
 *
 *        This is a non-blocking call, the connection is
 *        performed by the selected broker thread.
 *
 * @param reason human-readable reason, for debug logging only.
 *
 * @locality any
 * @locks rd_kafka_rdlock() MUST be held
 */
void rd_kafka_connect_any (rd_kafka_t *rk, const char *reason) {
        rd_kafka_broker_t *rkb;
        rd_ts_t suppr;

        /* Don't count connections to logical brokers since they serve
         * a specific purpose (group coordinator) and their connections
         * should not be reused for other purposes.
         * rd_kafka_broker_random() will not return LOGICAL brokers. */
        if (rd_atomic32_get(&rk->rk_broker_up_cnt) -
            rd_atomic32_get(&rk->rk_logical_broker_up_cnt) > 0 ||
            rd_atomic32_get(&rk->rk_broker_cnt) -
            rd_atomic32_get(&rk->rk_broker_addrless_cnt) == 0)
                return;

        /* Rate-limit random connection attempts to
         * sparse.connections.... interval (ms -> us). */
        mtx_lock(&rk->rk_suppress.sparse_connect_lock);
        suppr = rd_interval(&rk->rk_suppress.sparse_connect_random,
                            rk->rk_conf.sparse_connect_intvl*1000, 0);
        mtx_unlock(&rk->rk_suppress.sparse_connect_lock);

        /* suppr <= 0: still within the suppression interval
         * (-suppr is the remaining time in us). */
        if (suppr <= 0) {
                rd_kafka_dbg(rk, BROKER|RD_KAFKA_DBG_GENERIC, "CONNECT",
                             "Not selecting any broker for cluster connection: "
                             "still suppressed for %"PRId64"ms: %s",
                             -suppr/1000, reason);
                return;
        }

        /* First pass: only match brokers never connected to,
         * to try to exhaust the available brokers
         * so that an ERR_ALL_BROKERS_DOWN error can be raised. */
        rkb = rd_kafka_broker_random(rk, RD_KAFKA_BROKER_STATE_INIT,
                                     rd_kafka_broker_filter_never_connected,
                                     NULL);
        /* Second pass: match any non-connected/non-connecting broker. */
        if (!rkb)
                rkb = rd_kafka_broker_random(rk, RD_KAFKA_BROKER_STATE_INIT,
                                             NULL, NULL);

        if (!rkb) {
                /* No brokers matched:
                 * this happens if there are brokers in > INIT state,
                 * in which case they're already connecting. */

                rd_kafka_dbg(rk, BROKER|RD_KAFKA_DBG_GENERIC, "CONNECT",
                             "Cluster connection already in progress: %s",
                             reason);
                return;
        }

        rd_rkb_dbg(rkb, BROKER|RD_KAFKA_DBG_GENERIC, "CONNECT",
                   "Selected for cluster connection: "
                   "%s (broker has %d connection attempt(s))",
                   reason, rd_atomic32_get(&rkb->rkb_c.connects));

        rd_kafka_broker_schedule_connection(rkb);

        rd_kafka_broker_destroy(rkb); /* refcnt from ..broker_random() */
}
6290
6291
6292
6293 /**
6294 * @brief Send PURGE queue request to broker.
6295 *
6296 * @locality any
6297 * @locks none
6298 */
rd_kafka_broker_purge_queues(rd_kafka_broker_t * rkb,int purge_flags,rd_kafka_replyq_t replyq)6299 void rd_kafka_broker_purge_queues (rd_kafka_broker_t *rkb, int purge_flags,
6300 rd_kafka_replyq_t replyq) {
6301 rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_PURGE);
6302 rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_FLASH);
6303 rko->rko_replyq = replyq;
6304 rko->rko_u.purge.flags = purge_flags;
6305 rd_kafka_q_enq(rkb->rkb_ops, rko);
6306 }
6307
6308
/**
 * @brief Handle purge queues request
 *
 * Scans (in order) the in-flight, retry and output buffer queues for
 * ProduceRequests and fails them with __PURGE_* errors, then purges
 * the per-partition message queues.
 *
 * @locality broker thread
 * @locks none
 */
static void rd_kafka_broker_handle_purge_queues (rd_kafka_broker_t *rkb,
                                                 rd_kafka_op_t *rko) {
        int purge_flags = rko->rko_u.purge.flags;
        int inflight_cnt = 0, retry_cnt = 0, outq_cnt = 0, partial_cnt = 0;

        rd_rkb_dbg(rkb, QUEUE|RD_KAFKA_DBG_TOPIC, "PURGE",
                   "Purging queues with flags %s",
                   rd_kafka_purge_flags2str(purge_flags));


        /**
         * First purge any Produce requests to move the
         * messages from the request's message queue to delivery reports.
         */

        /* Purge in-flight ProduceRequests */
        if (purge_flags & RD_KAFKA_PURGE_F_INFLIGHT)
                inflight_cnt = rd_kafka_broker_bufq_timeout_scan(
                        rkb, 1, &rkb->rkb_waitresps, NULL, RD_KAFKAP_Produce,
                        RD_KAFKA_RESP_ERR__PURGE_INFLIGHT, 0, NULL, 0);

        if (purge_flags & RD_KAFKA_PURGE_F_QUEUE) {
                /* Requests in retry queue */
                retry_cnt = rd_kafka_broker_bufq_timeout_scan(
                        rkb, 0, &rkb->rkb_retrybufs, NULL, RD_KAFKAP_Produce,
                        RD_KAFKA_RESP_ERR__PURGE_QUEUE, 0, NULL, 0);

                /* Requests in transmit queue not completely sent yet.
                 * partial_cnt is included in outq_cnt and denotes a request
                 * that has been partially transmitted. */
                outq_cnt = rd_kafka_broker_bufq_timeout_scan(
                        rkb, 0, &rkb->rkb_outbufs, &partial_cnt,
                        RD_KAFKAP_Produce, RD_KAFKA_RESP_ERR__PURGE_QUEUE, 0,
                        NULL, 0);

                /* Purging a partially transmitted request will mess up
                 * the protocol stream, so we need to disconnect from the broker
                 * to get a clean protocol socket. */
                if (partial_cnt)
                        rd_kafka_broker_fail(
                                rkb,
                                LOG_DEBUG,
                                RD_KAFKA_RESP_ERR__PURGE_QUEUE,
                                "Purged %d partially sent request: "
                                "forcing disconnect", partial_cnt);
        }

        rd_rkb_dbg(rkb, QUEUE|RD_KAFKA_DBG_TOPIC, "PURGEQ",
                   "Purged %i in-flight, %i retry-queued, "
                   "%i out-queue, %i partially-sent requests",
                   inflight_cnt, retry_cnt, outq_cnt, partial_cnt);

        /* Purge partition queues */
        if (purge_flags & RD_KAFKA_PURGE_F_QUEUE) {
                rd_kafka_toppar_t *rktp;
                int msg_cnt = 0;
                int part_cnt = 0;

                TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
                        int r;

                        r = rd_kafka_toppar_purge_queues(
                                rktp, purge_flags,
                                rd_true/*include xmit msgq*/);
                        if (r > 0) {
                                msg_cnt += r;
                                part_cnt++;
                        }
                }

                rd_rkb_dbg(rkb, QUEUE|RD_KAFKA_DBG_TOPIC, "PURGEQ",
                           "Purged %i message(s) from %d partition(s)",
                           msg_cnt, part_cnt);
        }

        /* Reply to the original rd_kafka_broker_purge_queues() requester. */
        rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
}
6392
6393
/**
 * @brief Add toppar to broker's active list.
 *
 * For consumer this means the fetch list.
 * For producers this is all partitions assigned to this broker.
 *
 * @locality broker thread
 * @locks rktp_lock MUST be held
 */
void rd_kafka_broker_active_toppar_add (rd_kafka_broker_t *rkb,
                                        rd_kafka_toppar_t *rktp,
                                        const char *reason) {
        int is_consumer = rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER;

        /* rktp_fetch doubles as the "already on fetch list" flag
         * for consumers. */
        if (is_consumer && rktp->rktp_fetch)
                return; /* Already added */

        CIRCLEQ_INSERT_TAIL(&rkb->rkb_active_toppars, rktp, rktp_activelink);
        rkb->rkb_active_toppar_cnt++;

        if (is_consumer)
                rktp->rktp_fetch = 1;

        /* First entry in the (circular) list: initialize the round-robin
         * "next" cursor to it. */
        if (unlikely(rkb->rkb_active_toppar_cnt == 1))
                rd_kafka_broker_active_toppar_next(rkb, rktp);

        rd_rkb_dbg(rkb, TOPIC, "FETCHADD",
                   "Added %.*s [%"PRId32"] to %s list (%d entries, opv %d, "
                   "%d messages queued): %s",
                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                   rktp->rktp_partition,
                   is_consumer ? "fetch" : "active",
                   rkb->rkb_active_toppar_cnt, rktp->rktp_fetch_version,
                   rd_kafka_msgq_len(&rktp->rktp_msgq),
                   reason);
}
6430
6431
/**
 * @brief Remove toppar from active list.
 *
 * Locality: broker thread
 * Locks: none
 */
void rd_kafka_broker_active_toppar_del (rd_kafka_broker_t *rkb,
                                        rd_kafka_toppar_t *rktp,
                                        const char *reason) {
        int is_consumer = rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER;

        if (is_consumer && !rktp->rktp_fetch)
                return; /* Not added */

        CIRCLEQ_REMOVE(&rkb->rkb_active_toppars, rktp, rktp_activelink);
        rd_kafka_assert(NULL, rkb->rkb_active_toppar_cnt > 0);
        rkb->rkb_active_toppar_cnt--;

        if (is_consumer)
                rktp->rktp_fetch = 0;

        if (rkb->rkb_active_toppar_next == rktp) {
                /* Update next pointer: advance the round-robin cursor
                 * past the removed entry (wrapping around the
                 * circular list). */
                rd_kafka_broker_active_toppar_next(
                        rkb, CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars,
                                               rktp, rktp_activelink));
        }

        /* NOTE(review): debug tag is "FETCHADD" even though this logs a
         * removal — presumably historical; kept for log compatibility. */
        rd_rkb_dbg(rkb, TOPIC, "FETCHADD",
                   "Removed %.*s [%"PRId32"] from %s list "
                   "(%d entries, opv %d): %s",
                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                   rktp->rktp_partition,
                   is_consumer ? "fetch" : "active",
                   rkb->rkb_active_toppar_cnt, rktp->rktp_fetch_version,
                   reason);

}
6470
6471
6472 /**
6473 * @brief Schedule connection for \p rkb.
6474 * Will trigger disconnection for logical brokers whose nodename
6475 * was changed.
6476 *
6477 * @locality any
6478 * @locks none
6479 */
rd_kafka_broker_schedule_connection(rd_kafka_broker_t * rkb)6480 void rd_kafka_broker_schedule_connection (rd_kafka_broker_t *rkb) {
6481 rd_kafka_op_t *rko;
6482
6483 rko = rd_kafka_op_new(RD_KAFKA_OP_CONNECT);
6484 rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_FLASH);
6485 rd_kafka_q_enq(rkb->rkb_ops, rko);
6486 }
6487
6488
6489 /**
6490 * @brief Add need for persistent connection to \p rkb
6491 * with rkb_persistconn atomic counter \p acntp
6492 *
6493 * @locality any
6494 * @locks none
6495 */
6496 void
rd_kafka_broker_persistent_connection_add(rd_kafka_broker_t * rkb,rd_atomic32_t * acntp)6497 rd_kafka_broker_persistent_connection_add (rd_kafka_broker_t *rkb,
6498 rd_atomic32_t *acntp) {
6499
6500 if (rd_atomic32_add(acntp, 1) == 1) {
6501 /* First one, trigger event. */
6502 rd_kafka_broker_schedule_connection(rkb);
6503 }
6504 }
6505
6506
/**
 * @brief Remove need for persistent connection to \p rkb
 *        with rkb_persistconn atomic counter \p acntp
 *
 * No disconnect is performed here; the counter reaching zero simply
 * means no subsystem currently requires the connection.
 *
 * @locality any
 * @locks none
 */
void
rd_kafka_broker_persistent_connection_del (rd_kafka_broker_t *rkb,
                                           rd_atomic32_t *acntp) {
        /* r is only used by the assert (guards against underflow
         * from unbalanced add/del calls). */
        int32_t r = rd_atomic32_sub(acntp, 1);
        rd_assert(r >= 0);
}
6520
6521
6522
6523 /**
6524 * @brief OP_BROKER_MONITOR callback trampoline which
6525 * calls the rkbmon's callback.
6526 *
6527 * @locality monitoree's op handler thread
6528 * @locks none
6529 */
rd_kafka_broker_monitor_op_cb(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)6530 static rd_kafka_op_res_t rd_kafka_broker_monitor_op_cb (rd_kafka_t *rk,
6531 rd_kafka_q_t *rkq,
6532 rd_kafka_op_t *rko) {
6533 if (rko->rko_err != RD_KAFKA_RESP_ERR__DESTROY)
6534 rko->rko_u.broker_monitor.cb(rko->rko_u.broker_monitor.rkb);
6535 return RD_KAFKA_OP_RES_HANDLED;
6536 }
6537
/**
 * @brief Trigger ops for registered monitors when the broker
 *        state goes from or to UP.
 *
 * @locality broker thread
 * @locks rkb_lock MUST be held
 */
static void rd_kafka_broker_trigger_monitors (rd_kafka_broker_t *rkb) {
        rd_kafka_broker_monitor_t *rkbmon;

        /* Enqueue one OP_BROKER_MONITOR op per registered monitor on
         * the monitor's own queue; the op carries a broker reference
         * (released when the op is handled/destroyed). */
        TAILQ_FOREACH(rkbmon, &rkb->rkb_monitors, rkbmon_link) {
                rd_kafka_op_t *rko = rd_kafka_op_new_cb(
                        rkb->rkb_rk,
                        RD_KAFKA_OP_BROKER_MONITOR,
                        rd_kafka_broker_monitor_op_cb);
                rd_kafka_broker_keep(rkb);
                rko->rko_u.broker_monitor.rkb = rkb;
                rko->rko_u.broker_monitor.cb = rkbmon->rkbmon_cb;
                rd_kafka_q_enq(rkbmon->rkbmon_q, rko);
        }
}
6559
6560
/**
 * @brief Adds a monitor for when the broker goes up or down.
 *
 * The callback will be triggered on the caller's op queue handler thread.
 *
 * Use rd_kafka_broker_is_up() in your callback to get the current
 * state of the broker, since it might have changed since the event
 * was enqueued.
 *
 * @param rkbmon monitoree's monitor.
 * @param rkb broker to monitor.
 * @param rkq queue for event op.
 * @param callback callback to be triggered from \p rkq's op handler.
 *
 * @locks none
 * @locality any
 */
void rd_kafka_broker_monitor_add (rd_kafka_broker_monitor_t *rkbmon,
                                  rd_kafka_broker_t *rkb,
                                  rd_kafka_q_t *rkq,
                                  void (*callback) (rd_kafka_broker_t *rkb)) {
        /* A monitor instance may only be registered once. */
        rd_assert(!rkbmon->rkbmon_rkb);
        rkbmon->rkbmon_rkb = rkb;
        rkbmon->rkbmon_q = rkq;
        rd_kafka_q_keep(rkbmon->rkbmon_q); /* queue ref held by monitor */
        rkbmon->rkbmon_cb = callback;

        /* Broker ref held by monitor until _monitor_del(). */
        rd_kafka_broker_keep(rkb);

        rd_kafka_broker_lock(rkb);
        TAILQ_INSERT_TAIL(&rkb->rkb_monitors, rkbmon, rkbmon_link);
        rd_kafka_broker_unlock(rkb);
}
6595
6596
/**
 * @brief Removes a monitor previously added with
 *        rd_kafka_broker_monitor_add().
 *
 * Idempotent: calling it on a never-added (or already removed)
 * monitor is a no-op.
 *
 * @warning The rkbmon's callback may still be called after
 *          _del() has been called due to the buffering nature
 *          of op queues.
 *
 * @locks none
 * @locality any
 */
void rd_kafka_broker_monitor_del (rd_kafka_broker_monitor_t *rkbmon) {
        rd_kafka_broker_t *rkb = rkbmon->rkbmon_rkb;

        if (!rkb)
                return;

        rd_kafka_broker_lock(rkb);
        rkbmon->rkbmon_rkb = NULL;
        rd_kafka_q_destroy(rkbmon->rkbmon_q); /* release queue ref from _add() */
        TAILQ_REMOVE(&rkb->rkb_monitors, rkbmon, rkbmon_link);
        rd_kafka_broker_unlock(rkb);

        /* Release broker ref from _add(). */
        rd_kafka_broker_destroy(rkb);
}
6622
6623
6624
6625 /**
6626 * @name Unit tests
6627 * @{
6628 *
6629 */
unittest_broker(void)6630 int unittest_broker (void) {
6631 int fails = 0;
6632
6633 fails += rd_ut_reconnect_backoff();
6634
6635 return fails;
6636 }
6637
6638 /**@}*/
6639