/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2013, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */


#define _GNU_SOURCE
#include <errno.h>
#include <string.h>
#include <stdarg.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/stat.h>
#if !_WIN32
#include <sys/types.h>
#include <dirent.h>
#endif

#include "rdkafka_int.h"
#include "rdkafka_msg.h"
#include "rdkafka_broker.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_offset.h"
#include "rdkafka_transport.h"
#include "rdkafka_cgrp.h"
#include "rdkafka_assignor.h"
#include "rdkafka_request.h"
#include "rdkafka_event.h"
#include "rdkafka_sasl.h"
#include "rdkafka_interceptor.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_sasl_oauthbearer.h"
#if WITH_SSL
#include "rdkafka_ssl.h"
#endif

#include "rdtime.h"
#include "crc32c.h"
#include "rdunittest.h"

#ifdef _WIN32
#include <sys/types.h>
#include <sys/timeb.h>
#endif



static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;
static once_flag rd_kafka_global_srand_once = ONCE_FLAG_INIT;

/**
 * @brief Global counter+lock for all active librdkafka instances
 */
mtx_t rd_kafka_global_lock;
int rd_kafka_global_cnt;


/**
 * Last API error code, per thread.
 * Shared among all rd_kafka_t instances.
 */
rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;


/**
 * Current number of threads created by rdkafka.
 * This is used in regression tests.
 */
rd_atomic32_t rd_kafka_thread_cnt_curr;
int rd_kafka_thread_cnt (void) {
        return rd_atomic32_get(&rd_kafka_thread_cnt_curr);
}

/**
 * Current thread's log name (TLS)
 */
char RD_TLS rd_kafka_thread_name[64] = "app";

void rd_kafka_set_thread_name (const char *fmt, ...) {
        va_list ap;

        va_start(ap, fmt);
        rd_vsnprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name),
                     fmt, ap);
        va_end(ap);
}

/**
 * @brief Current thread's system name (TLS)
 *
 * Note the name must be 15 characters or less, because it is passed to
 * pthread_setname_np on Linux which imposes this limit.
 */
static char RD_TLS rd_kafka_thread_sysname[16] = "app";

void rd_kafka_set_thread_sysname (const char *fmt, ...) {
        va_list ap;

        va_start(ap, fmt);
        rd_vsnprintf(rd_kafka_thread_sysname, sizeof(rd_kafka_thread_sysname),
                     fmt, ap);
        va_end(ap);

        thrd_setname(rd_kafka_thread_sysname);
}

static void rd_kafka_global_init0 (void) {
        mtx_init(&rd_kafka_global_lock, mtx_plain);
#if ENABLE_DEVEL
        rd_atomic32_init(&rd_kafka_op_cnt, 0);
#endif
        rd_crc32c_global_init();
#if WITH_SSL
        /* The configuration interface might need to use
         * OpenSSL to parse keys, before any rd_kafka_t
         * object has been created. */
        rd_kafka_ssl_init();
#endif
}

/**
 * @brief Initialize once per process
 */
void rd_kafka_global_init (void) {
        call_once(&rd_kafka_global_init_once, rd_kafka_global_init0);
}


/**
 * @brief Seed the PRNG with the current time's millisecond component.
 */
static void rd_kafka_global_srand (void) {
        struct timeval tv;

        rd_gettimeofday(&tv, NULL);

        srand((unsigned int)(tv.tv_usec / 1000));
}


/**
 * @returns the current number of active librdkafka instances
 */
static int rd_kafka_global_cnt_get (void) {
        int r;
        mtx_lock(&rd_kafka_global_lock);
        r = rd_kafka_global_cnt;
        mtx_unlock(&rd_kafka_global_lock);
        return r;
}


/**
 * @brief Increase counter for active librdkafka instances.
 * If this is the first instance the global constructors will be called, if any.
 */
static void rd_kafka_global_cnt_incr (void) {
        mtx_lock(&rd_kafka_global_lock);
        rd_kafka_global_cnt++;
        if (rd_kafka_global_cnt == 1) {
                rd_kafka_transport_init();
#if WITH_SSL
                rd_kafka_ssl_init();
#endif
                rd_kafka_sasl_global_init();
        }
        mtx_unlock(&rd_kafka_global_lock);
}

/**
 * @brief Decrease counter for active librdkafka instances.
 * If this counter reaches 0 the global destructors will be called, if any.
 */
static void rd_kafka_global_cnt_decr (void) {
        mtx_lock(&rd_kafka_global_lock);
        rd_kafka_assert(NULL, rd_kafka_global_cnt > 0);
        rd_kafka_global_cnt--;
        if (rd_kafka_global_cnt == 0) {
                rd_kafka_sasl_global_term();
#if WITH_SSL
                rd_kafka_ssl_term();
#endif
        }
        mtx_unlock(&rd_kafka_global_lock);
}


/**
 * Wait for all rd_kafka_t objects to be destroyed.
 * Returns 0 if all kafka objects are now destroyed, or -1 if the
 * timeout was reached.
 */
int rd_kafka_wait_destroyed (int timeout_ms) {
        rd_ts_t timeout = rd_clock() + (timeout_ms * 1000);

        while (rd_kafka_thread_cnt() > 0 ||
               rd_kafka_global_cnt_get() > 0) {
                if (rd_clock() >= timeout) {
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
                                                ETIMEDOUT);
                        return -1;
                }
                rd_usleep(25000, NULL); /* 25ms */
        }

        return 0;
}

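/**
 * Example (illustrative sketch, not part of the library): an application
 * draining all instances at shutdown. Assumes `rk` is a handle the
 * application created earlier with rd_kafka_new().
 *
 * @code
 *   rd_kafka_destroy(rk);
 *
 *   // Wait up to 5s for all rdkafka threads and instances to finish.
 *   if (rd_kafka_wait_destroyed(5000) == -1)
 *       fprintf(stderr, "librdkafka instances still alive at exit\n");
 * @endcode
 */
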
static void rd_kafka_log_buf (const rd_kafka_conf_t *conf,
                              const rd_kafka_t *rk, int level, int ctx,
                              const char *fac, const char *buf) {
        if (level > conf->log_level)
                return;
        else if (rk && conf->log_queue) {
                rd_kafka_op_t *rko;

                if (!rk->rk_logq)
                        return; /* Terminating */

                rko = rd_kafka_op_new(RD_KAFKA_OP_LOG);
                rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM);
                rko->rko_u.log.level = level;
                rd_strlcpy(rko->rko_u.log.fac, fac, sizeof(rko->rko_u.log.fac));
                rko->rko_u.log.str = rd_strdup(buf);
                rko->rko_u.log.ctx = ctx;
                rd_kafka_q_enq(rk->rk_logq, rko);

        } else if (conf->log_cb) {
                conf->log_cb(rk, level, fac, buf);
        }
}

/**
 * @brief Logger
 *
 * @remark conf must be set, but rk may be NULL
 */
void rd_kafka_log0 (const rd_kafka_conf_t *conf,
                    const rd_kafka_t *rk,
                    const char *extra, int level, int ctx,
                    const char *fac, const char *fmt, ...) {
        char buf[2048];
        va_list ap;
        unsigned int elen = 0;
        unsigned int of = 0;

        if (level > conf->log_level)
                return;

        if (conf->log_thread_name) {
                elen = rd_snprintf(buf, sizeof(buf), "[thrd:%s]: ",
                                   rd_kafka_thread_name);
                if (unlikely(elen >= sizeof(buf)))
                        elen = sizeof(buf);
                of = elen;
        }

        if (extra) {
                elen = rd_snprintf(buf+of, sizeof(buf)-of, "%s: ", extra);
                if (unlikely(elen >= sizeof(buf)-of))
                        elen = sizeof(buf)-of;
                of += elen;
        }

        va_start(ap, fmt);
        rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap);
        va_end(ap);

        rd_kafka_log_buf(conf, rk, level, ctx, fac, buf);
}

rd_kafka_resp_err_t
rd_kafka_oauthbearer_set_token (rd_kafka_t *rk,
                                const char *token_value,
                                int64_t md_lifetime_ms,
                                const char *md_principal_name,
                                const char **extensions, size_t extension_size,
                                char *errstr, size_t errstr_size) {
#if WITH_SASL_OAUTHBEARER
        return rd_kafka_oauthbearer_set_token0(
                rk, token_value,
                md_lifetime_ms, md_principal_name, extensions, extension_size,
                errstr, errstr_size);
#else
        rd_snprintf(errstr, errstr_size,
                    "librdkafka not built with SASL OAUTHBEARER support");
        return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
#endif
}

rd_kafka_resp_err_t
rd_kafka_oauthbearer_set_token_failure (rd_kafka_t *rk, const char *errstr) {
#if WITH_SASL_OAUTHBEARER
        return rd_kafka_oauthbearer_set_token_failure0(rk, errstr);
#else
        return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
#endif
}

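/**
 * Example (illustrative sketch): a minimal oauthbearer_token_refresh_cb
 * forwarding an externally acquired token to the functions above.
 * `acquire_token()` and `token_lifetime_ms()` are hypothetical application
 * helpers; error handling is abbreviated.
 *
 * @code
 *   static void refresh_cb (rd_kafka_t *rk, const char *oauthbearer_config,
 *                           void *opaque) {
 *       char errstr[512];
 *       const char *token = acquire_token();  // hypothetical helper
 *
 *       if (!token) {
 *           rd_kafka_oauthbearer_set_token_failure(
 *                   rk, "token acquisition failed");
 *           return;
 *       }
 *
 *       if (rd_kafka_oauthbearer_set_token(rk, token,
 *                                          token_lifetime_ms(), // hypothetical
 *                                          "principal", NULL, 0,
 *                                          errstr, sizeof(errstr)))
 *           rd_kafka_oauthbearer_set_token_failure(rk, errstr);
 *   }
 * @endcode
 */
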
void rd_kafka_log_print(const rd_kafka_t *rk, int level,
                        const char *fac, const char *buf) {
        int secs, msecs;
        struct timeval tv;
        rd_gettimeofday(&tv, NULL);
        secs = (int)tv.tv_sec;
        msecs = (int)(tv.tv_usec / 1000);
        fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n",
                level, secs, msecs,
                fac, rk ? rk->rk_name : "", buf);
}

void rd_kafka_log_syslog (const rd_kafka_t *rk, int level,
                          const char *fac, const char *buf) {
#if WITH_SYSLOG
        static int initialized = 0;

        if (!initialized) {
                openlog("rdkafka", LOG_PID|LOG_CONS, LOG_USER);
                initialized = 1; /* Only open the syslog connection once */
        }

        syslog(level, "%s: %s: %s", fac, rk ? rk->rk_name : "", buf);
#else
        rd_assert(!*"syslog support not enabled in this build");
#endif
}

void rd_kafka_set_logger (rd_kafka_t *rk,
                          void (*func) (const rd_kafka_t *rk, int level,
                                        const char *fac, const char *buf)) {
#if !WITH_SYSLOG
        if (func == rd_kafka_log_syslog)
                rd_assert(!*"syslog support not enabled in this build");
#endif
        rk->rk_conf.log_cb = func;
}

void rd_kafka_set_log_level (rd_kafka_t *rk, int level) {
        rk->rk_conf.log_level = level;
}

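/**
 * Example (illustrative sketch): routing librdkafka logs through a custom
 * callback set on the configuration object, the pre-instantiation
 * alternative to rd_kafka_set_logger().
 *
 * @code
 *   static void my_log_cb (const rd_kafka_t *rk, int level,
 *                          const char *fac, const char *buf) {
 *       fprintf(stderr, "RDKAFKA-%i-%s: %s: %s\n",
 *               level, fac, rk ? rd_kafka_name(rk) : "", buf);
 *   }
 *
 *   rd_kafka_conf_t *conf = rd_kafka_conf_new();
 *   rd_kafka_conf_set_log_cb(conf, my_log_cb);
 * @endcode
 */
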




static const char *rd_kafka_type2str (rd_kafka_type_t type) {
        static const char *types[] = {
                [RD_KAFKA_PRODUCER] = "producer",
                [RD_KAFKA_CONSUMER] = "consumer",
        };
        return types[type];
}

#define _ERR_DESC(ENUM,DESC) \
        [ENUM - RD_KAFKA_RESP_ERR__BEGIN] = { ENUM, &(# ENUM)[18]/*pfx*/, DESC }

static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
        _ERR_DESC(RD_KAFKA_RESP_ERR__BEGIN, NULL),
        _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_MSG,
                  "Local: Bad message format"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_COMPRESSION,
                  "Local: Invalid compressed data"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY,
                  "Local: Broker handle destroyed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FAIL,
                  "Local: Communication failure with broker"), //FIXME: too specific
        _ERR_DESC(RD_KAFKA_RESP_ERR__TRANSPORT,
                  "Local: Broker transport failure"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
                  "Local: Critical system resource failure"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__RESOLVE,
                  "Local: Host resolution failure"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
                  "Local: Message timed out"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PARTITION_EOF,
                  "Broker: No more messages"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                  "Local: Unknown partition"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FS,
                  "Local: File or filesystem error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC,
                  "Local: Unknown topic"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
                  "Local: All broker connections are down"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_ARG,
                  "Local: Invalid argument or configuration"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT,
                  "Local: Timed out"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__QUEUE_FULL,
                  "Local: Queue full"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ISR_INSUFF,
                  "Local: ISR count insufficient"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NODE_UPDATE,
                  "Local: Broker node update"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__SSL,
                  "Local: SSL error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_COORD,
                  "Local: Waiting for coordinator"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
                  "Local: Unknown group"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__IN_PROGRESS,
                  "Local: Operation in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
                  "Local: Previous operation in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION,
                  "Local: Existing subscription"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                  "Local: Assign partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                  "Local: Revoke partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__CONFLICT,
                  "Local: Conflicting use"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__STATE,
                  "Local: Erroneous state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL,
                  "Local: Unknown protocol"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
                  "Local: Not implemented"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__AUTHENTICATION,
                  "Local: Authentication failure"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NO_OFFSET,
                  "Local: No offset stored"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__OUTDATED,
                  "Local: Outdated"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
                  "Local: Timed out in queue"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                  "Local: Required feature not supported by broker"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_CACHE,
                  "Local: Awaiting cache update"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INTR,
                  "Local: Operation interrupted"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION,
                  "Local: Key serialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION,
                  "Local: Value serialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION,
                  "Local: Key deserialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION,
                  "Local: Value deserialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PARTIAL,
                  "Local: Partial response"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__READ_ONLY,
                  "Local: Read-only object"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOENT,
                  "Local: No such entry"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNDERFLOW,
                  "Local: Read underflow"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_TYPE,
                  "Local: Invalid type"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__RETRY,
                  "Local: Retry operation"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_QUEUE,
                  "Local: Purged in queue"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_INFLIGHT,
                  "Local: Purged in flight"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FATAL,
                  "Local: Fatal error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INCONSISTENT,
                  "Local: Inconsistent state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE,
                  "Local: Gap-less ordering would not be guaranteed "
                  "if proceeding"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED,
                  "Local: Maximum application poll interval "
                  "(max.poll.interval.ms) exceeded"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_BROKER,
                  "Local: Unknown broker"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_CONFIGURED,
                  "Local: Functionality not configured"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FENCED,
                  "Local: This instance has been fenced by a newer instance"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__APPLICATION,
                  "Local: Application generated error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST,
                  "Local: Group partition assignment lost"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOOP,
                  "Local: No operation performed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET,
                  "Local: No offset to automatically reset to"),

        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN,
                  "Unknown broker error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR,
                  "Success"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE,
                  "Broker: Offset out of range"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG,
                  "Broker: Invalid message"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
                  "Broker: Unknown topic or partition"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
                  "Broker: Invalid message size"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,
                  "Broker: Leader not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
                  "Broker: Not leader for partition"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
                  "Broker: Request timed out"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE,
                  "Broker: Broker not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,
                  "Broker: Replica not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
                  "Broker: Message size too large"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH,
                  "Broker: StaleControllerEpochCode"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
                  "Broker: Offset metadata string too large"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION,
                  "Broker: Broker disconnected before response received"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
                  "Broker: Coordinator load in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
                  "Broker: Coordinator not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                  "Broker: Not coordinator"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION,
                  "Broker: Invalid topic"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE,
                  "Broker: Message batch larger than configured server "
                  "segment size"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
                  "Broker: Not enough in-sync replicas"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
                  "Broker: Message(s) written to insufficient number of "
                  "in-sync replicas"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS,
                  "Broker: Invalid required acks value"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
                  "Broker: Specified group generation id is not valid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL,
                  "Broker: Inconsistent group protocol"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID,
                  "Broker: Invalid group.id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
                  "Broker: Unknown member"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT,
                  "Broker: Invalid session timeout"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
                  "Broker: Group rebalance in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
                  "Broker: Commit offset data size is not valid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
                  "Broker: Topic authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
                  "Broker: Group authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
                  "Broker: Cluster authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP,
                  "Broker: Invalid timestamp"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM,
                  "Broker: Unsupported SASL mechanism"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE,
                  "Broker: Request not valid in current SASL state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION,
                  "Broker: API version not supported"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
                  "Broker: Topic already exists"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS,
                  "Broker: Invalid number of partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR,
                  "Broker: Invalid replication factor"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT,
                  "Broker: Invalid replica assignment"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_CONFIG,
                  "Broker: Configuration is invalid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_CONTROLLER,
                  "Broker: Not controller for cluster"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUEST,
                  "Broker: Invalid request"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT,
                  "Broker: Message format on broker does not support request"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_POLICY_VIOLATION,
                  "Broker: Policy violation"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
                  "Broker: Broker received an out of order sequence number"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
                  "Broker: Broker received a duplicate sequence number"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
                  "Broker: Producer attempted an operation with an old epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
                  "Broker: Producer attempted a transactional operation in "
                  "an invalid state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
                  "Broker: Producer attempted to use a producer id which is "
                  "not currently assigned to its transactional id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
                  "Broker: Transaction timeout is larger than the maximum "
                  "value allowed by the broker's max.transaction.timeout.ms"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                  "Broker: Producer attempted to update a transaction while "
                  "another concurrent operation on the same transaction was "
                  "ongoing"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED,
                  "Broker: Indicates that the transaction coordinator sending "
                  "a WriteTxnMarker is no longer the current coordinator for "
                  "a given producer"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
                  "Broker: Transactional Id authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_SECURITY_DISABLED,
                  "Broker: Security features are disabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED,
                  "Broker: Operation not attempted"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
                  "Broker: Disk error when trying to access log file on disk"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND,
                  "Broker: The user-specified log directory is not found "
                  "in the broker config"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED,
                  "Broker: SASL Authentication failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
                  "Broker: Unknown Producer Id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS,
                  "Broker: Partition reassignment is in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED,
                  "Broker: Delegation Token feature is not enabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND,
                  "Broker: Delegation Token is not found on server"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH,
                  "Broker: Specified Principal is not valid Owner/Renewer"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED,
                  "Broker: Delegation Token requests are not allowed on "
                  "this connection"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED,
                  "Broker: Delegation Token authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED,
                  "Broker: Delegation Token is expired"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE,
                  "Broker: Supplied principalType is not supported"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP,
                  "Broker: The group is not empty"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND,
                  "Broker: The group id does not exist"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND,
                  "Broker: The fetch session ID was not found"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH,
                  "Broker: The fetch session epoch is invalid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND,
                  "Broker: No matching listener"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED,
                  "Broker: Topic deletion is disabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH,
                  "Broker: Leader epoch is older than broker epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH,
                  "Broker: Leader epoch is newer than broker epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE,
                  "Broker: Unsupported compression type"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH,
                  "Broker: Broker epoch has changed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE,
                  "Broker: Leader high watermark is not caught up"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED,
                  "Broker: Group member needs a valid member ID"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE,
                  "Broker: Preferred leader was not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED,
                  "Broker: Consumer group has reached maximum size"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,
                  "Broker: Static consumer fenced by other consumer with same "
                  "group.instance.id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE,
                  "Broker: Eligible partition leaders are not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ELECTION_NOT_NEEDED,
                  "Broker: Leader election not needed for topic partition"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NO_REASSIGNMENT_IN_PROGRESS,
                  "Broker: No partition reassignment is in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC,
                  "Broker: Deleting offsets of a topic while the consumer "
                  "group is subscribed to it"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_RECORD,
                  "Broker: Broker failed to validate record"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                  "Broker: There are unstable offsets that need to be cleared"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_THROTTLING_QUOTA_EXCEEDED,
                  "Broker: Throttling quota has been exceeded"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_PRODUCER_FENCED,
                  "Broker: There is a newer producer with the same "
                  "transactionalId which fences the current one"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_RESOURCE_NOT_FOUND,
                  "Broker: Request illegally referred to resource that "
                  "does not exist"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_RESOURCE,
                  "Broker: Request illegally referred to the same resource "
                  "twice"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNACCEPTABLE_CREDENTIAL,
                  "Broker: Requested credential would not meet criteria for "
                  "acceptability"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_VOTER_SET,
                  "Broker: Indicates that either the sender or recipient "
                  "of a voter-only request is not one of the expected voters"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_UPDATE_VERSION,
                  "Broker: Invalid update version"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FEATURE_UPDATE_FAILED,
                  "Broker: Unable to update finalized features due to "
                  "server error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE,
                  "Broker: Request principal deserialization failed during "
                  "forwarding"),

        _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)
};


void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
                             size_t *cntp) {
        *errdescs = rd_kafka_err_descs;
        *cntp = RD_ARRAYSIZE(rd_kafka_err_descs);
}


const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
        static RD_TLS char ret[32];
        int idx = err - RD_KAFKA_RESP_ERR__BEGIN;

        if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
                     err >= RD_KAFKA_RESP_ERR_END_ALL ||
                     !rd_kafka_err_descs[idx].desc)) {
                rd_snprintf(ret, sizeof(ret), "Err-%i?", err);
                return ret;
        }

        return rd_kafka_err_descs[idx].desc;
}


const char *rd_kafka_err2name (rd_kafka_resp_err_t err) {
        static RD_TLS char ret[32];
        int idx = err - RD_KAFKA_RESP_ERR__BEGIN;

        if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
                     err >= RD_KAFKA_RESP_ERR_END_ALL ||
                     !rd_kafka_err_descs[idx].desc)) {
                rd_snprintf(ret, sizeof(ret), "ERR_%i?", err);
                return ret;
        }

        return rd_kafka_err_descs[idx].name;
}

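/**
 * Example (illustrative sketch): rendering an error both as its symbolic
 * enum name and as a human-readable description.
 *
 * @code
 *   rd_kafka_resp_err_t err = rd_kafka_last_error();
 *   fprintf(stderr, "%s: %s\n",
 *           rd_kafka_err2name(err),   // e.g. "QUEUE_FULL"
 *           rd_kafka_err2str(err));   // e.g. "Local: Queue full"
 * @endcode
 */
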
rd_kafka_resp_err_t rd_kafka_last_error (void) {
        return rd_kafka_last_error_code;
}


rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) {
        switch (errnox)
        {
        case EINVAL:
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        case EBUSY:
                return RD_KAFKA_RESP_ERR__CONFLICT;

        case ENOENT:
                return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;

        case ESRCH:
                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

        case ETIMEDOUT:
                return RD_KAFKA_RESP_ERR__TIMED_OUT;

        case EMSGSIZE:
                return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;

        case ENOBUFS:
                return RD_KAFKA_RESP_ERR__QUEUE_FULL;

        case ECANCELED:
                return RD_KAFKA_RESP_ERR__FATAL;

        default:
                return RD_KAFKA_RESP_ERR__FAIL;
        }
}

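/**
 * Example (illustrative sketch): the legacy errno-based error path of
 * rd_kafka_produce(). New code should generally prefer rd_kafka_last_error()
 * over inspecting errno directly.
 *
 * @code
 *   if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
 *                        payload, len, NULL, 0, NULL) == -1) {
 *       rd_kafka_resp_err_t err = rd_kafka_errno2err(errno);
 *       fprintf(stderr, "produce failed: %s\n", rd_kafka_err2str(err));
 *   }
 * @endcode
 */
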

rd_kafka_resp_err_t rd_kafka_fatal_error (rd_kafka_t *rk,
                                          char *errstr, size_t errstr_size) {
        rd_kafka_resp_err_t err;

        if (unlikely((err = rd_atomic32_get(&rk->rk_fatal.err)))) {
                rd_kafka_rdlock(rk);
                rd_snprintf(errstr, errstr_size, "%s", rk->rk_fatal.errstr);
                rd_kafka_rdunlock(rk);
        }

        return err;
}

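/**
 * Example (illustrative sketch): distinguishing fatal errors inside an
 * error_cb. The generic RD_KAFKA_RESP_ERR__FATAL code signals that the
 * underlying cause should be retrieved with rd_kafka_fatal_error().
 *
 * @code
 *   static void my_error_cb (rd_kafka_t *rk, int err,
 *                            const char *reason, void *opaque) {
 *       if (err == RD_KAFKA_RESP_ERR__FATAL) {
 *           char errstr[512];
 *           rd_kafka_resp_err_t orig_err =
 *                   rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
 *           fprintf(stderr, "FATAL %s: %s\n",
 *                   rd_kafka_err2name(orig_err), errstr);
 *       }
 *   }
 * @endcode
 */
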
/**
 * @brief Sets the fatal error for this instance.
 *
 * @param do_lock RD_DO_LOCK: rd_kafka_wrlock() will be acquired and released,
 *                RD_DONT_LOCK: caller must hold rd_kafka_wrlock().
 *
 * @returns 1 if the error was set, or 0 if a previous fatal error
 *          has already been set on this instance.
 *
 * @locality any
 * @locks none
 */
int rd_kafka_set_fatal_error0 (rd_kafka_t *rk, rd_dolock_t do_lock,
                               rd_kafka_resp_err_t err,
                               const char *fmt, ...) {
        va_list ap;
        char buf[512];

        if (do_lock)
                rd_kafka_wrlock(rk);
        rk->rk_fatal.cnt++;
        if (rd_atomic32_get(&rk->rk_fatal.err)) {
                if (do_lock)
                        rd_kafka_wrunlock(rk);
                rd_kafka_dbg(rk, GENERIC, "FATAL",
                             "Suppressing subsequent fatal error: %s",
                             rd_kafka_err2name(err));
                return 0;
        }

        rd_atomic32_set(&rk->rk_fatal.err, err);

        va_start(ap, fmt);
        rd_vsnprintf(buf, sizeof(buf), fmt, ap);
        va_end(ap);
        rk->rk_fatal.errstr = rd_strdup(buf);

        if (do_lock)
                rd_kafka_wrunlock(rk);

        /* If there is an error callback or event handler we
         * also log the fatal error as it happens.
         * If there is no error callback the error event
         * will be automatically logged, and this check here
         * prevents duplicate log entries. */
        if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_ERROR)
                rd_kafka_log(rk, LOG_EMERG, "FATAL",
                             "Fatal error: %s: %s",
                             rd_kafka_err2str(err), rk->rk_fatal.errstr);
        else
                rd_kafka_dbg(rk, ALL, "FATAL",
                             "Fatal error: %s: %s",
                             rd_kafka_err2str(err), rk->rk_fatal.errstr);

        /* Indicate to the application that a fatal error was raised,
         * the app should use rd_kafka_fatal_error() to extract the
         * fatal error code itself.
         * For the high-level consumer we propagate the error as a
         * consumer error so it is returned from consumer_poll(),
         * while for all other client types (the producer) we propagate to
         * the standard error handler (typically error_cb). */
        if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp)
                rd_kafka_consumer_err(rk->rk_cgrp->rkcg_q, RD_KAFKA_NODEID_UA,
                                      RD_KAFKA_RESP_ERR__FATAL, 0, NULL, NULL,
                                      RD_KAFKA_OFFSET_INVALID,
                                      "Fatal error: %s: %s",
                                      rd_kafka_err2str(err),
                                      rk->rk_fatal.errstr);
        else
                rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__FATAL,
                                "Fatal error: %s: %s",
                                rd_kafka_err2str(err), rk->rk_fatal.errstr);


        /* Tell rdkafka main thread to purge producer queues, but not
         * in-flight since we'll want proper delivery status for transmitted
         * requests.
         * Need NON_BLOCKING to avoid a deadlock if the user is
         * calling purge() at the same time, which could be
         * waiting for this broker thread to handle its
         * OP_PURGE request. */
        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_PURGE);
                rko->rko_u.purge.flags = RD_KAFKA_PURGE_F_QUEUE|
                        RD_KAFKA_PURGE_F_NON_BLOCKING;
                rd_kafka_q_enq(rk->rk_ops, rko);
        }

        return 1;
}


rd_kafka_resp_err_t
rd_kafka_test_fatal_error (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                           const char *reason) {
        if (!rd_kafka_set_fatal_error(rk, err, "test_fatal_error: %s", reason))
                return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
        else
                return RD_KAFKA_RESP_ERR_NO_ERROR;
}



/**
 * @brief Final destructor for rd_kafka_t, must only be called with refcnt 0.
 *
 * @locality application thread
 */
void rd_kafka_destroy_final (rd_kafka_t *rk) {

        rd_kafka_assert(rk, rd_kafka_terminating(rk));

        /* Synchronize state */
        rd_kafka_wrlock(rk);
        rd_kafka_wrunlock(rk);

        /* Terminate SASL provider */
        if (rk->rk_conf.sasl.provider)
                rd_kafka_sasl_term(rk);

        rd_kafka_timers_destroy(&rk->rk_timers);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying op queues");

        /* Destroy cgrp */
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Destroying cgrp");
                /* Reset queue forwarding (rep -> cgrp) */
                rd_kafka_q_fwd_set(rk->rk_rep, NULL);
                rd_kafka_cgrp_destroy_final(rk->rk_cgrp);
        }

        rd_kafka_assignors_term(rk);

        if (rk->rk_type == RD_KAFKA_CONSUMER) {
                rd_kafka_assignment_destroy(rk);
                if (rk->rk_consumer.q)
                        rd_kafka_q_destroy(rk->rk_consumer.q);
        }

        /* Purge op-queues */
        rd_kafka_q_destroy_owner(rk->rk_rep);
        rd_kafka_q_destroy_owner(rk->rk_ops);

#if WITH_SSL
        if (rk->rk_conf.ssl.ctx) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying SSL CTX");
                rd_kafka_ssl_ctx_term(rk);
        }
#endif

        /* It is not safe to log after this point. */
        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Termination done: freeing resources");

        if (rk->rk_logq) {
                rd_kafka_q_destroy_owner(rk->rk_logq);
                rk->rk_logq = NULL;
        }

        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                cnd_destroy(&rk->rk_curr_msgs.cnd);
                mtx_destroy(&rk->rk_curr_msgs.lock);
        }

        if (rk->rk_fatal.errstr) {
                rd_free(rk->rk_fatal.errstr);
                rk->rk_fatal.errstr = NULL;
        }

        cnd_destroy(&rk->rk_broker_state_change_cnd);
        mtx_destroy(&rk->rk_broker_state_change_lock);

        mtx_destroy(&rk->rk_suppress.sparse_connect_lock);

        cnd_destroy(&rk->rk_init_cnd);
        mtx_destroy(&rk->rk_init_lock);

        if (rk->rk_full_metadata)
                rd_kafka_metadata_destroy(rk->rk_full_metadata);
        rd_kafkap_str_destroy(rk->rk_client_id);
        rd_kafkap_str_destroy(rk->rk_group_id);
        rd_kafkap_str_destroy(rk->rk_eos.transactional_id);
        rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
        rd_list_destroy(&rk->rk_broker_by_id);

        rwlock_destroy(&rk->rk_lock);

        rd_free(rk);
        rd_kafka_global_cnt_decr();
}


static void rd_kafka_destroy_app (rd_kafka_t *rk, int flags) {
        thrd_t thrd;
#ifndef _WIN32
        int term_sig = rk->rk_conf.term_sig;
#endif
        int res;
        char flags_str[256];
        static const char *rd_kafka_destroy_flags_names[] = {
                "Terminate",
                "DestroyCalled",
                "Immediate",
                "NoConsumerClose",
                NULL
        };

        /* Fatal errors and _F_IMMEDIATE also set .._NO_CONSUMER_CLOSE */
        if (flags & RD_KAFKA_DESTROY_F_IMMEDIATE ||
            rd_kafka_fatal_error_code(rk))
                flags |= RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE;

        rd_flags2str(flags_str, sizeof(flags_str),
                     rd_kafka_destroy_flags_names, flags);
        rd_kafka_dbg(rk, ALL, "DESTROY", "Terminating instance "
                     "(destroy flags %s (0x%x))",
                     flags ? flags_str : "none", flags);

        /* If the producer still has messages in queue the application
         * is terminating the producer without first calling flush() or purge()
         * which is a common new user mistake, so hint the user of proper
         * shutdown semantics. */
        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                unsigned int tot_cnt;
                size_t tot_size;

                rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);

                if (tot_cnt > 0)
                        rd_kafka_log(rk, LOG_WARNING, "TERMINATE",
                                     "Producer terminating with %u message%s "
                                     "(%"PRIusz" byte%s) still in "
                                     "queue or transit: "
                                     "use flush() to wait for "
                                     "outstanding message delivery",
                                     tot_cnt, tot_cnt > 1 ? "s" : "",
                                     tot_size, tot_size > 1 ? "s" : "");
        }

        /* Make sure destroy is not called from a librdkafka thread
         * since this will most likely cause a deadlock.
         * FIXME: include broker threads (for log_cb) */
        if (thrd_is_current(rk->rk_thread) ||
            thrd_is_current(rk->rk_background.thread)) {
                rd_kafka_log(rk, LOG_EMERG, "BGQUEUE",
                             "Application bug: "
                             "rd_kafka_destroy() called from "
                             "librdkafka owned thread");
                rd_kafka_assert(NULL,
                                !*"Application bug: "
                                "calling rd_kafka_destroy() from "
                                "librdkafka owned thread is prohibited");
        }

        /* Before signaling for general termination, set the destroy
         * flags to hint cgrp how to shut down. */
        rd_atomic32_set(&rk->rk_terminate,
                        flags|RD_KAFKA_DESTROY_F_DESTROY_CALLED);

        /* The legacy/simple consumer lacks an API to close down
         * the consumer. */
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Terminating consumer group handler");
                rd_kafka_consumer_close(rk);
        }

        /* With the consumer closed, terminate the rest of librdkafka. */
        rd_atomic32_set(&rk->rk_terminate, flags|RD_KAFKA_DESTROY_F_TERMINATE);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Interrupting timers");
        rd_kafka_wrlock(rk);
        thrd = rk->rk_thread;
        rd_kafka_timers_interrupt(&rk->rk_timers);
        rd_kafka_wrunlock(rk);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Sending TERMINATE to internal main thread");
        /* Send op to trigger queue/io wake-up.
         * The op itself is (likely) ignored by the receiver. */
        rd_kafka_q_enq(rk->rk_ops, rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

#ifndef _WIN32
        /* Interrupt main kafka thread to speed up termination. */
        if (term_sig) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Sending thread kill signal %d", term_sig);
                pthread_kill(thrd, term_sig);
        }
#endif

        if (rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_IMMEDIATE))
                return; /* FIXME: thread resource leak */

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Joining internal main thread");

        if (thrd_join(thrd, &res) != thrd_success)
                rd_kafka_log(rk, LOG_ERR, "DESTROY",
                             "Failed to join internal main thread: %s "
                             "(was process forked?)",
                             rd_strerror(errno));

        rd_kafka_destroy_final(rk);
}


/* NOTE: Must only be called by the application.
 * librdkafka itself must use rd_kafka_destroy0(). */
void rd_kafka_destroy (rd_kafka_t *rk) {
        rd_kafka_destroy_app(rk, 0);
}

void rd_kafka_destroy_flags (rd_kafka_t *rk, int flags) {
        rd_kafka_destroy_app(rk, flags);
}

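/**
 * Example (illustrative sketch): terminating a consumer without the final
 * group leave/close handshake, e.g. when a static group member should keep
 * its assignment across a quick restart.
 *
 * @code
 *   rd_kafka_destroy_flags(rk, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
 * @endcode
 */
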

/**
 * Main destructor for rd_kafka_t
 *
 * Locality: rdkafka main thread or application thread during rd_kafka_new()
 */
static void rd_kafka_destroy_internal (rd_kafka_t *rk) {
        rd_kafka_topic_t *rkt, *rkt_tmp;
        rd_kafka_broker_t *rkb, *rkb_tmp;
        rd_list_t wait_thrds;
        thrd_t *thrd;
        int i;

        rd_kafka_dbg(rk, ALL, "DESTROY", "Destroy internal");

        /* Trigger any state-change waiters (which should check the
         * terminate flag whenever they wake up). */
        rd_kafka_brokers_broadcast_state_change(rk);

        if (rk->rk_background.thread) {
                int res;
                /* Send op to trigger queue/io wake-up.
                 * The op itself is (likely) ignored by the receiver. */
                rd_kafka_q_enq(rk->rk_background.q,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

                rd_kafka_dbg(rk, ALL, "DESTROY",
                             "Waiting for background queue thread "
                             "to terminate");
                thrd_join(rk->rk_background.thread, &res);
                rd_kafka_q_destroy_owner(rk->rk_background.q);
        }

        /* Call on_destroy() interceptors */
        rd_kafka_interceptors_on_destroy(rk);

        /* Brokers pick up on rk_terminate automatically. */

        /* List of (broker) threads to join to synchronize termination */
        rd_list_init(&wait_thrds, rd_atomic32_get(&rk->rk_broker_cnt), NULL);

        rd_kafka_wrlock(rk);

        rd_kafka_dbg(rk, ALL, "DESTROY", "Removing all topics");
        /* Decommission all topics */
        TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
                rd_kafka_wrunlock(rk);
                rd_kafka_topic_partitions_remove(rkt);
                rd_kafka_wrlock(rk);
        }

        /* Decommission brokers.
         * The broker thread holds a refcount and detects when the broker
         * refcount reaches 1 and then decommissions itself. */
        TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) {
                /* Add broker's thread to wait_thrds list for later joining */
                thrd = rd_malloc(sizeof(*thrd));
                *thrd = rkb->rkb_thread;
                rd_list_add(&wait_thrds, thrd);
                rd_kafka_wrunlock(rk);

                rd_kafka_dbg(rk, BROKER, "DESTROY",
                             "Sending TERMINATE to %s",
                             rd_kafka_broker_name(rkb));
                /* Send op to trigger queue/io wake-up.
                 * The op itself is (likely) ignored by the broker thread. */
                rd_kafka_q_enq(rkb->rkb_ops,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

#ifndef _WIN32
                /* Interrupt IO threads to speed up termination. */
                if (rk->rk_conf.term_sig)
                        pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig);
#endif

                rd_kafka_broker_destroy(rkb);

                rd_kafka_wrlock(rk);
        }

        if (rk->rk_clusterid) {
                rd_free(rk->rk_clusterid);
                rk->rk_clusterid = NULL;
        }

        /* Destroy coord requests */
        rd_kafka_coord_reqs_term(rk);

        /* Destroy the coordinator cache */
        rd_kafka_coord_cache_destroy(&rk->rk_coord_cache);

        /* Purge metadata cache.
         * #3279:
         * We mustn't call cache_destroy() here since there might be outstanding
         * broker rkos that hold references to the metadata cache lock,
         * and these brokers are destroyed below. So to avoid a circular
         * dependency refcnt deadlock we first purge the cache here
         * and destroy it after the brokers are destroyed. */
        rd_kafka_metadata_cache_purge(rk, rd_true/*observers too*/);

        rd_kafka_wrunlock(rk);

        mtx_lock(&rk->rk_broker_state_change_lock);
        /* Purge broker state change waiters */
        rd_list_destroy(&rk->rk_broker_state_change_waiters);
        mtx_unlock(&rk->rk_broker_state_change_lock);

        if (rk->rk_type == RD_KAFKA_CONSUMER) {
                if (rk->rk_consumer.q)
                        rd_kafka_q_disable(rk->rk_consumer.q);
        }

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Purging reply queue");

        /* Purge op-queue */
        rd_kafka_q_disable(rk->rk_rep);
        rd_kafka_q_purge(rk->rk_rep);

        /* Lose our special reference to the internal broker. */
        mtx_lock(&rk->rk_internal_rkb_lock);
        if ((rkb = rk->rk_internal_rkb)) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Decommissioning internal broker");

                /* Send op to trigger queue wake-up. */
                rd_kafka_q_enq(rkb->rkb_ops,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

                rk->rk_internal_rkb = NULL;
                thrd = rd_malloc(sizeof(*thrd));
                *thrd = rkb->rkb_thread;
                rd_list_add(&wait_thrds, thrd);
        }
        mtx_unlock(&rk->rk_internal_rkb_lock);
        if (rkb)
                rd_kafka_broker_destroy(rkb);


        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Join %d broker thread(s)", rd_list_cnt(&wait_thrds));

        /* Join broker threads */
        RD_LIST_FOREACH(thrd, &wait_thrds, i) {
                int res;
                if (thrd_join(*thrd, &res) != thrd_success)
                        ;
                rd_free(thrd);
        }

        rd_list_destroy(&wait_thrds);

        /* Destroy mock cluster */
        if (rk->rk_mock.cluster)
                rd_kafka_mock_cluster_destroy(rk->rk_mock.cluster);

        if (rd_atomic32_get(&rk->rk_mock.cluster_cnt) > 0) {
                rd_kafka_log(rk, LOG_EMERG, "MOCK",
                             "%d mock cluster(s) still active: "
                             "must be explicitly destroyed with "
                             "rd_kafka_mock_cluster_destroy() prior to "
                             "terminating the rd_kafka_t instance",
                             (int)rd_atomic32_get(&rk->rk_mock.cluster_cnt));
                rd_assert(!*"All mock clusters must be destroyed prior to "
                          "rd_kafka_t destroy");
        }

        /* Destroy metadata cache */
        rd_kafka_wrlock(rk);
        rd_kafka_metadata_cache_destroy(rk);
        rd_kafka_wrunlock(rk);
}

/**
 * @brief Buffer state for stats emitter
 */
struct _stats_emit {
        char *buf;    /* Pointer to allocated buffer */
        size_t size;  /* Current allocated size of buf */
        size_t of;    /* Current write-offset in buf */
};


/* Stats buffer printf. Requires a (struct _stats_emit *)st variable in the
 * current scope. */
#define _st_printf(...) do {                                            \
                ssize_t _r;                                             \
                ssize_t _rem = st->size - st->of;                       \
                _r = rd_snprintf(st->buf+st->of, _rem, __VA_ARGS__);    \
                if (_r >= _rem) {                                       \
                        st->size *= 2;                                  \
                        _rem = st->size - st->of;                       \
                        st->buf = rd_realloc(st->buf, st->size);        \
                        _r = rd_snprintf(st->buf+st->of, _rem, __VA_ARGS__); \
                }                                                       \
                st->of += _r;                                           \
        } while (0)
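
/*
 * The macro above exploits rd_snprintf()'s return value (the would-be
 * string length) to detect truncation, grow the buffer, and retry once.
 * The same grow-and-retry pattern as a plain function might look like the
 * sketch below (illustrative only; `emit_printf` is a hypothetical name,
 * and it additionally sizes the buffer to fit when one doubling would
 * not suffice):
 *
 *   static void emit_printf (struct _stats_emit *st, const char *fmt, ...) {
 *       va_list ap;
 *       ssize_t r, rem = st->size - st->of;
 *
 *       va_start(ap, fmt);
 *       r = vsnprintf(st->buf + st->of, rem, fmt, ap);
 *       va_end(ap);
 *
 *       if (r >= rem) {  // truncated: grow and retry with the same args
 *           st->size = st->size * 2 > st->of + r + 1 ?
 *                   st->size * 2 : st->of + r + 1;
 *           st->buf = rd_realloc(st->buf, st->size);
 *           va_start(ap, fmt);
 *           vsnprintf(st->buf + st->of, st->size - st->of, fmt, ap);
 *           va_end(ap);
 *       }
 *       st->of += r;  // r is the full formatted length in either case
 *   }
 */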

struct _stats_total {
        int64_t tx;           /**< broker.tx */
        int64_t tx_bytes;     /**< broker.tx_bytes */
        int64_t rx;           /**< broker.rx */
        int64_t rx_bytes;     /**< broker.rx_bytes */
        int64_t txmsgs;       /**< partition.txmsgs */
        int64_t txmsg_bytes;  /**< partition.txbytes */
        int64_t rxmsgs;       /**< partition.rxmsgs */
        int64_t rxmsg_bytes;  /**< partition.rxbytes */
};



/**
 * @brief Rollover and emit an average window.
 */
static RD_INLINE void rd_kafka_stats_emit_avg (struct _stats_emit *st,
                                               const char *name,
                                               rd_avg_t *src_avg) {
        rd_avg_t avg;

        rd_avg_rollover(&avg, src_avg);
        _st_printf(
                "\"%s\": {"
                " \"min\":%"PRId64","
                " \"max\":%"PRId64","
                " \"avg\":%"PRId64","
                " \"sum\":%"PRId64","
                " \"stddev\": %"PRId64","
                " \"p50\": %"PRId64","
                " \"p75\": %"PRId64","
                " \"p90\": %"PRId64","
                " \"p95\": %"PRId64","
                " \"p99\": %"PRId64","
                " \"p99_99\": %"PRId64","
                " \"outofrange\": %"PRId64","
                " \"hdrsize\": %"PRId32","
                " \"cnt\":%i "
                "}, ",
                name,
                avg.ra_v.minv,
                avg.ra_v.maxv,
                avg.ra_v.avg,
                avg.ra_v.sum,
                (int64_t)avg.ra_hist.stddev,
                avg.ra_hist.p50,
                avg.ra_hist.p75,
                avg.ra_hist.p90,
                avg.ra_hist.p95,
                avg.ra_hist.p99,
                avg.ra_hist.p99_99,
                avg.ra_hist.oor,
                avg.ra_hist.hdrsize,
                avg.ra_v.cnt);
        rd_avg_destroy(&avg);
}

/**
 * Emit stats for toppar
 */
static RD_INLINE void rd_kafka_stats_emit_toppar (struct _stats_emit *st,
                                                  struct _stats_total *total,
                                                  rd_kafka_toppar_t *rktp,
                                                  int first) {
        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
        int64_t end_offset;
        int64_t consumer_lag = -1;
        int64_t consumer_lag_stored = -1;
        struct offset_stats offs;
        int32_t broker_id = -1;

        rd_kafka_toppar_lock(rktp);

        if (rktp->rktp_broker) {
                rd_kafka_broker_lock(rktp->rktp_broker);
                broker_id = rktp->rktp_broker->rkb_nodeid;
                rd_kafka_broker_unlock(rktp->rktp_broker);
        }

        /* Grab a copy of the latest finalized offset stats */
        offs = rktp->rktp_offsets_fin;

        end_offset = (rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED)
                ? rktp->rktp_ls_offset
                : rktp->rktp_hi_offset;

        /* Calculate consumer_lag by using the highest offset
         * of stored_offset (the last message passed to application + 1, or
         * if enable.auto.offset.store=false the last message manually stored),
         * or the committed_offset (the last message committed by this or
         * another consumer).
         * Using stored_offset allows consumer_lag to be up to date even if
         * offsets are not (yet) committed.
         */
        if (end_offset != RD_KAFKA_OFFSET_INVALID) {
                if (rktp->rktp_stored_offset >= 0 &&
                    rktp->rktp_stored_offset <= end_offset)
                        consumer_lag_stored =
                                end_offset - rktp->rktp_stored_offset;
                if (rktp->rktp_committed_offset >= 0 &&
                    rktp->rktp_committed_offset <= end_offset)
                        consumer_lag = end_offset - rktp->rktp_committed_offset;
        }

        _st_printf("%s\"%"PRId32"\": { "
                   "\"partition\":%"PRId32", "
                   "\"broker\":%"PRId32", "
                   "\"leader\":%"PRId32", "
                   "\"desired\":%s, "
                   "\"unknown\":%s, "
                   "\"msgq_cnt\":%i, "
                   "\"msgq_bytes\":%"PRIusz", "
                   "\"xmit_msgq_cnt\":%i, "
                   "\"xmit_msgq_bytes\":%"PRIusz", "
                   "\"fetchq_cnt\":%i, "
                   "\"fetchq_size\":%"PRIu64", "
                   "\"fetch_state\":\"%s\", "
                   "\"query_offset\":%"PRId64", "
                   "\"next_offset\":%"PRId64", "
                   "\"app_offset\":%"PRId64", "
                   "\"stored_offset\":%"PRId64", "
                   "\"commited_offset\":%"PRId64", " /*FIXME: issue #80 */
                   "\"committed_offset\":%"PRId64", "
                   "\"eof_offset\":%"PRId64", "
                   "\"lo_offset\":%"PRId64", "
                   "\"hi_offset\":%"PRId64", "
                   "\"ls_offset\":%"PRId64", "
                   "\"consumer_lag\":%"PRId64", "
                   "\"consumer_lag_stored\":%"PRId64", "
                   "\"txmsgs\":%"PRIu64", "
                   "\"txbytes\":%"PRIu64", "
                   "\"rxmsgs\":%"PRIu64", "
                   "\"rxbytes\":%"PRIu64", "
                   "\"msgs\": %"PRIu64", "
                   "\"rx_ver_drops\": %"PRIu64", "
                   "\"msgs_inflight\": %"PRId32", "
                   "\"next_ack_seq\": %"PRId32", "
                   "\"next_err_seq\": %"PRId32", "
                   "\"acked_msgid\": %"PRIu64
                   "} ",
                   first ? "" : ", ",
                   rktp->rktp_partition,
                   rktp->rktp_partition,
                   broker_id,
                   rktp->rktp_leader_id,
                   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_DESIRED)?"true":"false",
                   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_UNKNOWN)?"true":"false",
                   rd_kafka_msgq_len(&rktp->rktp_msgq),
                   rd_kafka_msgq_size(&rktp->rktp_msgq),
                   /* FIXME: xmit_msgq is local to the broker thread. */
                   0,
                   (size_t)0,
                   rd_kafka_q_len(rktp->rktp_fetchq),
                   rd_kafka_q_size(rktp->rktp_fetchq),
                   rd_kafka_fetch_states[rktp->rktp_fetch_state],
                   rktp->rktp_query_offset,
                   offs.fetch_offset,
                   rktp->rktp_app_offset,
                   rktp->rktp_stored_offset,
                   rktp->rktp_committed_offset, /* FIXME: issue #80 */
                   rktp->rktp_committed_offset,
                   offs.eof_offset,
                   rktp->rktp_lo_offset,
                   rktp->rktp_hi_offset,
                   rktp->rktp_ls_offset,
                   consumer_lag,
                   consumer_lag_stored,
                   rd_atomic64_get(&rktp->rktp_c.tx_msgs),
                   rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes),
                   rd_atomic64_get(&rktp->rktp_c.rx_msgs),
                   rd_atomic64_get(&rktp->rktp_c.rx_msg_bytes),
                   rk->rk_type == RD_KAFKA_PRODUCER ?
                   rd_atomic64_get(&rktp->rktp_c.producer_enq_msgs) :
                   rd_atomic64_get(&rktp->rktp_c.rx_msgs), /* legacy, same as rx_msgs */
                   rd_atomic64_get(&rktp->rktp_c.rx_ver_drops),
                   rd_atomic32_get(&rktp->rktp_msgs_inflight),
                   rktp->rktp_eos.next_ack_seq,
                   rktp->rktp_eos.next_err_seq,
                   rktp->rktp_eos.acked_msgid);

        if (total) {
                total->txmsgs      += rd_atomic64_get(&rktp->rktp_c.tx_msgs);
                total->txmsg_bytes += rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes);
                total->rxmsgs      += rd_atomic64_get(&rktp->rktp_c.rx_msgs);
                total->rxmsg_bytes += rd_atomic64_get(&rktp->rktp_c.rx_msg_bytes);
        }

        rd_kafka_toppar_unlock(rktp);
}
1522
1523 /**
1524 * @brief Emit broker request type stats
1525 */
rd_kafka_stats_emit_broker_reqs(struct _stats_emit * st,rd_kafka_broker_t * rkb)1526 static void rd_kafka_stats_emit_broker_reqs (struct _stats_emit *st,
1527 rd_kafka_broker_t *rkb) {
1528 /* Filter out request types that will never be sent by the client. */
1529 static const rd_bool_t filter[4][RD_KAFKAP__NUM] = {
1530 [RD_KAFKA_PRODUCER] = {
1531 [RD_KAFKAP_Fetch] = rd_true,
1532 [RD_KAFKAP_OffsetCommit] = rd_true,
1533 [RD_KAFKAP_OffsetFetch] = rd_true,
1534 [RD_KAFKAP_JoinGroup] = rd_true,
1535 [RD_KAFKAP_Heartbeat] = rd_true,
1536 [RD_KAFKAP_LeaveGroup] = rd_true,
1537 [RD_KAFKAP_SyncGroup] = rd_true
1538 },
1539 [RD_KAFKA_CONSUMER] = {
1540 [RD_KAFKAP_Produce] = rd_true,
1541 [RD_KAFKAP_InitProducerId] = rd_true,
1542 /* Transactional producer */
1543 [RD_KAFKAP_AddPartitionsToTxn] = rd_true,
1544 [RD_KAFKAP_AddOffsetsToTxn] = rd_true,
1545 [RD_KAFKAP_EndTxn] = rd_true,
1546 [RD_KAFKAP_TxnOffsetCommit] = rd_true,
1547 },
1548 [2/*any client type*/] = {
1549 [RD_KAFKAP_UpdateMetadata] = rd_true,
1550 [RD_KAFKAP_ControlledShutdown] = rd_true,
1551 [RD_KAFKAP_LeaderAndIsr] = rd_true,
1552 [RD_KAFKAP_StopReplica] = rd_true,
1553 [RD_KAFKAP_OffsetForLeaderEpoch] = rd_true,
1554
1555 [RD_KAFKAP_WriteTxnMarkers] = rd_true,
1556
1557 [RD_KAFKAP_AlterReplicaLogDirs] = rd_true,
1558 [RD_KAFKAP_DescribeLogDirs] = rd_true,
1559
1560 [RD_KAFKAP_SaslAuthenticate] = rd_false,
1561
1562 [RD_KAFKAP_CreateDelegationToken] = rd_true,
1563 [RD_KAFKAP_RenewDelegationToken] = rd_true,
1564 [RD_KAFKAP_ExpireDelegationToken] = rd_true,
1565 [RD_KAFKAP_DescribeDelegationToken] = rd_true,
1566 [RD_KAFKAP_IncrementalAlterConfigs] = rd_true,
1567 [RD_KAFKAP_ElectLeaders] = rd_true,
1568 [RD_KAFKAP_AlterPartitionReassignments] = rd_true,
1569 [RD_KAFKAP_ListPartitionReassignments] = rd_true,
1570 [RD_KAFKAP_AlterUserScramCredentials] = rd_true,
1571 [RD_KAFKAP_Vote] = rd_true,
1572 [RD_KAFKAP_BeginQuorumEpoch] = rd_true,
1573 [RD_KAFKAP_EndQuorumEpoch] = rd_true,
1574 [RD_KAFKAP_DescribeQuorum] = rd_true,
1575 [RD_KAFKAP_AlterIsr] = rd_true,
1576 [RD_KAFKAP_UpdateFeatures] = rd_true,
1577 [RD_KAFKAP_Envelope] = rd_true,
1578 },
1579 [3/*hide-unless-non-zero*/] = {
1580 /* Hide Admin requests unless they've been used */
1581 [RD_KAFKAP_CreateTopics] = rd_true,
1582 [RD_KAFKAP_DeleteTopics] = rd_true,
1583 [RD_KAFKAP_DeleteRecords] = rd_true,
1584 [RD_KAFKAP_CreatePartitions] = rd_true,
1585 [RD_KAFKAP_DescribeAcls] = rd_true,
1586 [RD_KAFKAP_CreateAcls] = rd_true,
1587 [RD_KAFKAP_DeleteAcls] = rd_true,
1588 [RD_KAFKAP_DescribeConfigs] = rd_true,
1589 [RD_KAFKAP_AlterConfigs] = rd_true,
1590 [RD_KAFKAP_DeleteGroups] = rd_true,
1591 [RD_KAFKAP_ListGroups] = rd_true,
1592 [RD_KAFKAP_DescribeGroups] = rd_true
1593 }
1594 };
1595 int i;
1596 int cnt = 0;
1597
1598 _st_printf("\"req\": { ");
1599 for (i = 0 ; i < RD_KAFKAP__NUM ; i++) {
1600 int64_t v;
1601
1602 if (filter[rkb->rkb_rk->rk_type][i] || filter[2][i])
1603 continue;
1604
1605 v = rd_atomic64_get(&rkb->rkb_c.reqtype[i]);
1606 if (!v && filter[3][i])
1607 continue; /* Filter out zero values */
1608
1609 _st_printf("%s\"%s\": %"PRId64,
1610 cnt > 0 ? ", " : "",
1611 rd_kafka_ApiKey2str(i), v);
1612
1613 cnt++;
1614 }
1615 _st_printf(" }, ");
1616 }
1617
1618
1619 /**
1620 * Emit all statistics
1621 */
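/*
 * Rough shape of the emitted JSON document (abbreviated sketch derived
 * from the format strings below; the field lists are not exhaustive):
 *
 *   { "name": "..", "client_id": "..", "type": "producer|consumer",
 *     "ts": .., "time": .., "age": .., "replyq": .., "msg_cnt": ..,
 *     "brokers": { "<name>": { ..counters.., "toppars": { .. } }, .. },
 *     "topics": { "<topic>": { "partitions": { .. } }, .. },
 *     "cgrp": { .. }, "eos": { .. }, "fatal": { .. },
 *     "tx": .., "tx_bytes": .., "rx": .., "rx_bytes": ..,
 *     "txmsgs": .., "txmsg_bytes": .., "rxmsgs": .., "rxmsg_bytes": .. }
 */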
1622 static void rd_kafka_stats_emit_all (rd_kafka_t *rk) {
1623 rd_kafka_broker_t *rkb;
1624 rd_kafka_topic_t *rkt;
1625 rd_ts_t now;
1626 rd_kafka_op_t *rko;
1627 unsigned int tot_cnt;
1628 size_t tot_size;
1629 rd_kafka_resp_err_t err;
1630 struct _stats_emit stx = { .size = 1024*10 };
1631 struct _stats_emit *st = &stx;
1632 struct _stats_total total = {0};
1633
1634 st->buf = rd_malloc(st->size);
1635
1636
1637 rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
1638 rd_kafka_rdlock(rk);
1639
1640 now = rd_clock();
1641 _st_printf("{ "
1642 "\"name\": \"%s\", "
1643 "\"client_id\": \"%s\", "
1644 "\"type\": \"%s\", "
1645 "\"ts\":%"PRId64", "
1646 "\"time\":%lli, "
1647 "\"age\":%"PRId64", "
1648 "\"replyq\":%i, "
1649 "\"msg_cnt\":%u, "
1650 "\"msg_size\":%"PRIusz", "
1651 "\"msg_max\":%u, "
1652 "\"msg_size_max\":%"PRIusz", "
1653 "\"simple_cnt\":%i, "
1654 "\"metadata_cache_cnt\":%i, "
1655 "\"brokers\":{ "/*open brokers*/,
1656 rk->rk_name,
1657 rk->rk_conf.client_id_str,
1658 rd_kafka_type2str(rk->rk_type),
1659 now,
1660 (signed long long)time(NULL),
1661 now - rk->rk_ts_created,
1662 rd_kafka_q_len(rk->rk_rep),
1663 tot_cnt, tot_size,
1664 rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.max_size,
1665 rd_atomic32_get(&rk->rk_simple_cnt),
1666 rk->rk_metadata_cache.rkmc_cnt);
1667
1668
1669 TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
1670 rd_kafka_toppar_t *rktp;
1671 rd_ts_t txidle = -1, rxidle = -1;
1672
1673 rd_kafka_broker_lock(rkb);
1674
1675 if (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP) {
1676 /* Calculate tx and rx idle time in usecs */
1677 txidle = rd_atomic64_get(&rkb->rkb_c.ts_send);
1678 rxidle = rd_atomic64_get(&rkb->rkb_c.ts_recv);
1679
1680 if (txidle)
1681 txidle = RD_MAX(now - txidle, 0);
1682 else
1683 txidle = -1;
1684
1685 if (rxidle)
1686 rxidle = RD_MAX(now - rxidle, 0);
1687 else
1688 rxidle = -1;
1689 }
1690
1691 _st_printf("%s\"%s\": { "/*open broker*/
1692 "\"name\":\"%s\", "
1693 "\"nodeid\":%"PRId32", "
1694 "\"nodename\":\"%s\", "
1695 "\"source\":\"%s\", "
1696 "\"state\":\"%s\", "
1697 "\"stateage\":%"PRId64", "
1698 "\"outbuf_cnt\":%i, "
1699 "\"outbuf_msg_cnt\":%i, "
1700 "\"waitresp_cnt\":%i, "
1701 "\"waitresp_msg_cnt\":%i, "
1702 "\"tx\":%"PRIu64", "
1703 "\"txbytes\":%"PRIu64", "
1704 "\"txerrs\":%"PRIu64", "
1705 "\"txretries\":%"PRIu64", "
1706 "\"txidle\":%"PRId64", "
1707 "\"req_timeouts\":%"PRIu64", "
1708 "\"rx\":%"PRIu64", "
1709 "\"rxbytes\":%"PRIu64", "
1710 "\"rxerrs\":%"PRIu64", "
1711 "\"rxcorriderrs\":%"PRIu64", "
1712 "\"rxpartial\":%"PRIu64", "
1713 "\"rxidle\":%"PRId64", "
1714 "\"zbuf_grow\":%"PRIu64", "
1715 "\"buf_grow\":%"PRIu64", "
1716 "\"wakeups\":%"PRIu64", "
1717 "\"connects\":%"PRId32", "
1718 "\"disconnects\":%"PRId32", ",
1719 rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ",
1720 rkb->rkb_name,
1721 rkb->rkb_name,
1722 rkb->rkb_nodeid,
1723 rkb->rkb_nodename,
1724 rd_kafka_confsource2str(rkb->rkb_source),
1725 rd_kafka_broker_state_names[rkb->rkb_state],
1726 rkb->rkb_ts_state ? now - rkb->rkb_ts_state : 0,
1727 rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
1728 rd_atomic32_get(&rkb->rkb_outbufs.rkbq_msg_cnt),
1729 rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt),
1730 rd_atomic32_get(&rkb->rkb_waitresps.rkbq_msg_cnt),
1731 rd_atomic64_get(&rkb->rkb_c.tx),
1732 rd_atomic64_get(&rkb->rkb_c.tx_bytes),
1733 rd_atomic64_get(&rkb->rkb_c.tx_err),
1734 rd_atomic64_get(&rkb->rkb_c.tx_retries),
1735 txidle,
1736 rd_atomic64_get(&rkb->rkb_c.req_timeouts),
1737 rd_atomic64_get(&rkb->rkb_c.rx),
1738 rd_atomic64_get(&rkb->rkb_c.rx_bytes),
1739 rd_atomic64_get(&rkb->rkb_c.rx_err),
1740 rd_atomic64_get(&rkb->rkb_c.rx_corrid_err),
1741 rd_atomic64_get(&rkb->rkb_c.rx_partial),
1742 rxidle,
1743 rd_atomic64_get(&rkb->rkb_c.zbuf_grow),
1744 rd_atomic64_get(&rkb->rkb_c.buf_grow),
1745 rd_atomic64_get(&rkb->rkb_c.wakeups),
1746 rd_atomic32_get(&rkb->rkb_c.connects),
1747 rd_atomic32_get(&rkb->rkb_c.disconnects));
1748
1749 total.tx += rd_atomic64_get(&rkb->rkb_c.tx);
1750 total.tx_bytes += rd_atomic64_get(&rkb->rkb_c.tx_bytes);
1751 total.rx += rd_atomic64_get(&rkb->rkb_c.rx);
1752 total.rx_bytes += rd_atomic64_get(&rkb->rkb_c.rx_bytes);
1753
1754 rd_kafka_stats_emit_avg(st, "int_latency",
1755 &rkb->rkb_avg_int_latency);
1756 rd_kafka_stats_emit_avg(st, "outbuf_latency",
1757 &rkb->rkb_avg_outbuf_latency);
1758 rd_kafka_stats_emit_avg(st, "rtt", &rkb->rkb_avg_rtt);
1759 rd_kafka_stats_emit_avg(st, "throttle", &rkb->rkb_avg_throttle);
1760
1761 rd_kafka_stats_emit_broker_reqs(st, rkb);
1762
1763 _st_printf("\"toppars\":{ "/*open toppars*/);
1764
1765 TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
1766 _st_printf("%s\"%.*s-%"PRId32"\": { "
1767 "\"topic\":\"%.*s\", "
1768 "\"partition\":%"PRId32"} ",
1769 rktp==TAILQ_FIRST(&rkb->rkb_toppars)?"":", ",
1770 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1771 rktp->rktp_partition,
1772 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1773 rktp->rktp_partition);
1774 }
1775
1776 rd_kafka_broker_unlock(rkb);
1777
1778 _st_printf("} "/*close toppars*/
1779 "} "/*close broker*/);
1780 }
1781
1782
1783 _st_printf("}, " /* close "brokers" array */
1784 "\"topics\":{ ");
1785
1786 TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
1787 rd_kafka_toppar_t *rktp;
1788 int i, j;
1789
1790 rd_kafka_topic_rdlock(rkt);
1791 _st_printf("%s\"%.*s\": { "
1792 "\"topic\":\"%.*s\", "
1793 "\"age\":%"PRId64", "
1794 "\"metadata_age\":%"PRId64", ",
1795 rkt==TAILQ_FIRST(&rk->rk_topics)?"":", ",
1796 RD_KAFKAP_STR_PR(rkt->rkt_topic),
1797 RD_KAFKAP_STR_PR(rkt->rkt_topic),
1798 (now - rkt->rkt_ts_create)/1000,
1799 rkt->rkt_ts_metadata ?
1800 (now - rkt->rkt_ts_metadata)/1000 : 0);
1801
1802 rd_kafka_stats_emit_avg(st, "batchsize",
1803 &rkt->rkt_avg_batchsize);
1804 rd_kafka_stats_emit_avg(st, "batchcnt",
1805 &rkt->rkt_avg_batchcnt);
1806
1807 _st_printf("\"partitions\":{ " /*open partitions*/);
1808
1809 for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
1810 rd_kafka_stats_emit_toppar(st, &total, rkt->rkt_p[i],
1811 i == 0);
1812
1813 RD_LIST_FOREACH(rktp, &rkt->rkt_desp, j)
1814 rd_kafka_stats_emit_toppar(st, &total, rktp, i+j == 0);
1815
1816 i += j;
1817
1818 if (rkt->rkt_ua)
1819 rd_kafka_stats_emit_toppar(st, NULL, rkt->rkt_ua,
1820 i++ == 0);
1821
1822 rd_kafka_topic_rdunlock(rkt);
1823
1824 _st_printf("} "/*close partitions*/
1825 "} "/*close topic*/);
1826
1827 }
1828 _st_printf("} "/*close topics*/);
1829
1830 if (rk->rk_cgrp) {
1831 rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
1832 _st_printf(", \"cgrp\": { "
1833 "\"state\": \"%s\", "
1834 "\"stateage\": %"PRId64", "
1835 "\"join_state\": \"%s\", "
1836 "\"rebalance_age\": %"PRId64", "
1837 "\"rebalance_cnt\": %d, "
1838 "\"rebalance_reason\": \"%s\", "
1839 "\"assignment_size\": %d }",
1840 rd_kafka_cgrp_state_names[rkcg->rkcg_state],
1841 rkcg->rkcg_ts_statechange ?
1842 (now - rkcg->rkcg_ts_statechange) / 1000 : 0,
1843 rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
1844 rkcg->rkcg_c.ts_rebalance ?
1845 (now - rkcg->rkcg_c.ts_rebalance)/1000 : 0,
1846 rkcg->rkcg_c.rebalance_cnt,
1847 rkcg->rkcg_c.rebalance_reason,
1848 rkcg->rkcg_c.assignment_size);
1849 }
1850
1851 if (rd_kafka_is_idempotent(rk)) {
1852 _st_printf(", \"eos\": { "
1853 "\"idemp_state\": \"%s\", "
1854 "\"idemp_stateage\": %"PRId64", "
1855 "\"txn_state\": \"%s\", "
1856 "\"txn_stateage\": %"PRId64", "
1857 "\"txn_may_enq\": %s, "
1858 "\"producer_id\": %"PRId64", "
1859 "\"producer_epoch\": %hd, "
1860 "\"epoch_cnt\": %d "
1861 "}",
1862 rd_kafka_idemp_state2str(rk->rk_eos.idemp_state),
1863 (now - rk->rk_eos.ts_idemp_state) / 1000,
1864 rd_kafka_txn_state2str(rk->rk_eos.txn_state),
1865 (now - rk->rk_eos.ts_txn_state) / 1000,
1866 rd_atomic32_get(&rk->rk_eos.txn_may_enq) ?
1867 "true":"false",
1868 rk->rk_eos.pid.id,
1869 rk->rk_eos.pid.epoch,
1870 rk->rk_eos.epoch_cnt);
1871 }
1872
1873 if ((err = rd_atomic32_get(&rk->rk_fatal.err)))
1874 _st_printf(", \"fatal\": { "
1875 "\"error\": \"%s\", "
1876 "\"reason\": \"%s\", "
1877 "\"cnt\": %d "
1878 "}",
1879 rd_kafka_err2str(err),
1880 rk->rk_fatal.errstr,
1881 rk->rk_fatal.cnt);
1882
1883 rd_kafka_rdunlock(rk);
1884
1885 /* Total counters */
1886 _st_printf(", "
1887 "\"tx\":%"PRId64", "
1888 "\"tx_bytes\":%"PRId64", "
1889 "\"rx\":%"PRId64", "
1890 "\"rx_bytes\":%"PRId64", "
1891 "\"txmsgs\":%"PRId64", "
1892 "\"txmsg_bytes\":%"PRId64", "
1893 "\"rxmsgs\":%"PRId64", "
1894 "\"rxmsg_bytes\":%"PRId64,
1895 total.tx,
1896 total.tx_bytes,
1897 total.rx,
1898 total.rx_bytes,
1899 total.txmsgs,
1900 total.txmsg_bytes,
1901 total.rxmsgs,
1902 total.rxmsg_bytes);
1903
1904 _st_printf("}"/*close object*/);
1905
1906
1907 /* Enqueue op for application */
1908 rko = rd_kafka_op_new(RD_KAFKA_OP_STATS);
1909 rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
1910 rko->rko_u.stats.json = st->buf;
1911 rko->rko_u.stats.json_len = st->of;
1912 rd_kafka_q_enq(rk->rk_rep, rko);
1913 }
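/*
 * Illustrative application-side sketch (not part of this file): the JSON
 * emitted above is typically received by registering a stats callback
 * and enabling statistics.interval.ms on the configuration:
 *
 *   static int my_stats_cb (rd_kafka_t *rk, char *json, size_t json_len,
 *                           void *opaque) {
 *           fprintf(stderr, "%.*s\n", (int)json_len, json);
 *           return 0;  // 0: librdkafka retains ownership and frees json
 *   }
 *
 *   rd_kafka_conf_set_stats_cb(conf, my_stats_cb);
 *   rd_kafka_conf_set(conf, "statistics.interval.ms", "5000", NULL, 0);
 */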
1914
1915
1916 /**
1917 * @brief 1 second generic timer.
1918 *
1919 * @locality rdkafka main thread
1920 * @locks none
1921 */
1922 static void rd_kafka_1s_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
1923 rd_kafka_t *rk = rkts->rkts_rk;
1924
1925 /* Scan topic state, message timeouts, etc. */
1926 rd_kafka_topic_scan_all(rk, rd_clock());
1927
1928 /* Sparse connections:
1929 * try to maintain at least one connection to the cluster. */
1930 if (rk->rk_conf.sparse_connections &&
1931 rd_atomic32_get(&rk->rk_broker_up_cnt) == 0)
1932 rd_kafka_connect_any(rk, "no cluster connection");
1933
1934 rd_kafka_coord_cache_expire(&rk->rk_coord_cache);
1935 }
1936
1937 static void rd_kafka_stats_emit_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
1938 rd_kafka_t *rk = rkts->rkts_rk;
1939 rd_kafka_stats_emit_all(rk);
1940 }
1941
1942
1943 /**
1944 * @brief Periodic metadata refresh callback
1945 *
1946 * @locality rdkafka main thread
1947 */
1948 static void rd_kafka_metadata_refresh_cb (rd_kafka_timers_t *rkts, void *arg) {
1949 rd_kafka_t *rk = rkts->rkts_rk;
1950 rd_kafka_resp_err_t err;
1951
1952 /* High-level consumer:
1953 * We need to query both locally known topics and subscribed topics
1954 * so that we can detect locally known topics changing partition
1955 * count or disappearing, as well as detect previously non-existent
1956 * subscribed topics now being available in the cluster. */
1957 if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp)
1958 err = rd_kafka_metadata_refresh_consumer_topics(
1959 rk, NULL,
1960 "periodic topic and broker list refresh");
1961 else
1962 err = rd_kafka_metadata_refresh_known_topics(
1963 rk, NULL, rd_true/*force*/,
1964 "periodic topic and broker list refresh");
1965
1966
1967 if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC &&
1968 rd_interval(&rk->rk_suppress.broker_metadata_refresh,
1969 10*1000*1000 /*10s*/, 0) > 0) {
1970 /* If there are no (locally referenced) topics
1971 * to query, refresh the broker list.
1972 * This avoids getting idle-disconnected for clients
1973 * that have not yet referenced a topic and makes
1974 * sure such a client has an up-to-date broker list. */
1975 rd_kafka_metadata_refresh_brokers(
1976 rk, NULL, "periodic broker list refresh");
1977 }
1978 }
1979
1980
1981
1982 /**
1983 * @brief Wait for background threads to initialize.
1984 *
1985 * @returns the number of background threads still not initialized.
1986 *
1987 * @locality app thread calling rd_kafka_new()
1988 * @locks none
1989 */
1990 static int rd_kafka_init_wait (rd_kafka_t *rk, int timeout_ms) {
1991 struct timespec tspec;
1992 int ret;
1993
1994 rd_timeout_init_timespec(&tspec, timeout_ms);
1995
1996 mtx_lock(&rk->rk_init_lock);
1997 while (rk->rk_init_wait_cnt > 0 &&
1998 cnd_timedwait_abs(&rk->rk_init_cnd, &rk->rk_init_lock,
1999 &tspec) == thrd_success)
2000 ;
2001 ret = rk->rk_init_wait_cnt;
2002 mtx_unlock(&rk->rk_init_lock);
2003
2004 return ret;
2005 }
2006
2007
2008 /**
2009 * Main loop for Kafka handler thread.
2010 */
2011 static int rd_kafka_thread_main (void *arg) {
2012 rd_kafka_t *rk = arg;
2013 rd_kafka_timer_t tmr_1s = RD_ZERO_INIT;
2014 rd_kafka_timer_t tmr_stats_emit = RD_ZERO_INIT;
2015 rd_kafka_timer_t tmr_metadata_refresh = RD_ZERO_INIT;
2016
2017 rd_kafka_set_thread_name("main");
2018 rd_kafka_set_thread_sysname("rdk:main");
2019
2020 rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_MAIN);
2021
2022 (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);
2023
2024 /* Acquire lock (which was held by thread creator during creation)
2025 * to synchronise state. */
2026 rd_kafka_wrlock(rk);
2027 rd_kafka_wrunlock(rk);
2028
2029 /* 1 second timer for topic scan and connection checking. */
2030 rd_kafka_timer_start(&rk->rk_timers, &tmr_1s, 1000000,
2031 rd_kafka_1s_tmr_cb, NULL);
2032 if (rk->rk_conf.stats_interval_ms)
2033 rd_kafka_timer_start(&rk->rk_timers, &tmr_stats_emit,
2034 rk->rk_conf.stats_interval_ms * 1000ll,
2035 rd_kafka_stats_emit_tmr_cb, NULL);
2036 if (rk->rk_conf.metadata_refresh_interval_ms > 0)
2037 rd_kafka_timer_start(&rk->rk_timers, &tmr_metadata_refresh,
2038 rk->rk_conf.metadata_refresh_interval_ms *
2039 1000ll,
2040 rd_kafka_metadata_refresh_cb, NULL);
2041
2042 if (rk->rk_cgrp)
2043 rd_kafka_q_fwd_set(rk->rk_cgrp->rkcg_ops, rk->rk_ops);
2044
2045 if (rd_kafka_is_idempotent(rk))
2046 rd_kafka_idemp_init(rk);
2047
2048 mtx_lock(&rk->rk_init_lock);
2049 rk->rk_init_wait_cnt--;
2050 cnd_broadcast(&rk->rk_init_cnd);
2051 mtx_unlock(&rk->rk_init_lock);
2052
2053 while (likely(!rd_kafka_terminating(rk) ||
2054 rd_kafka_q_len(rk->rk_ops) ||
2055 (rk->rk_cgrp && (rk->rk_cgrp->rkcg_state != RD_KAFKA_CGRP_STATE_TERM)))) {
2056 rd_ts_t sleeptime = rd_kafka_timers_next(
2057 &rk->rk_timers, 1000*1000/*1s*/, 1/*lock*/);
2058 rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
2059 RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
2060 if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
2061 rd_kafka_cgrp_serve(rk->rk_cgrp);
2062 rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT);
2063 }
2064
2065 rd_kafka_dbg(rk, GENERIC, "TERMINATE",
2066 "Internal main thread terminating");
2067
2068 if (rd_kafka_is_idempotent(rk))
2069 rd_kafka_idemp_term(rk);
2070
2071 rd_kafka_q_disable(rk->rk_ops);
2072 rd_kafka_q_purge(rk->rk_ops);
2073
2074 rd_kafka_timer_stop(&rk->rk_timers, &tmr_1s, 1);
2075 if (rk->rk_conf.stats_interval_ms)
2076 rd_kafka_timer_stop(&rk->rk_timers, &tmr_stats_emit, 1);
2077 rd_kafka_timer_stop(&rk->rk_timers, &tmr_metadata_refresh, 1);
2078
2079 /* Synchronise state */
2080 rd_kafka_wrlock(rk);
2081 rd_kafka_wrunlock(rk);
2082
2083 rd_kafka_interceptors_on_thread_exit(rk, RD_KAFKA_THREAD_MAIN);
2084
2085 rd_kafka_destroy_internal(rk);
2086
2087 rd_kafka_dbg(rk, GENERIC, "TERMINATE",
2088 "Internal main thread termination done");
2089
2090 rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);
2091
2092 return 0;
2093 }
2094
2095
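/**
 * @brief No-op handler installed for conf's internal.termination.signal
 *        (if set); its only purpose is to make blocking system calls
 *        return with EINTR so that terminating threads wake up promptly.
 */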
2096 static void rd_kafka_term_sig_handler (int sig) {
2097 /* nop */
2098 }
2099
2100
2101 rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
2102 char *errstr, size_t errstr_size) {
2103 rd_kafka_t *rk;
2104 static rd_atomic32_t rkid;
2105 rd_kafka_conf_t *conf;
2106 rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
2107 int ret_errno = 0;
2108 const char *conf_err;
2109 #ifndef _WIN32
2110 sigset_t newset, oldset;
2111 #endif
2112 char builtin_features[128];
2113 size_t bflen;
2114
2115 rd_kafka_global_init();
2116
2117 /* rd_kafka_new() takes ownership of the provided \p app_conf
2118 * object if rd_kafka_new() succeeds.
2119 * Since \p app_conf is optional we allocate a default configuration
2120 * object here if \p app_conf is NULL.
2121 * The configuration object itself is struct-copied later
2122 * leaving the default *conf pointer to be ready for freeing.
2123 * In case new() fails and app_conf was specified we will clear out
2124 * rk_conf to avoid double-freeing from destroy_internal() and the
2125 * user's eventual call to rd_kafka_conf_destroy().
2126 * This is all a bit tricky but that's the nature of
2127 * legacy interfaces. */
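	/* Illustrative application-side contract (sketch, not executed here):
	 *
	 *   char errstr[512];
	 *   rd_kafka_conf_t *conf = rd_kafka_conf_new();
	 *   rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
	 *                                 errstr, sizeof(errstr));
	 *   if (!rk)
	 *           rd_kafka_conf_destroy(conf); // app still owns conf on failure
	 *   // on success conf is owned by rk and must not be destroyed by app
	 */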
2128 if (!app_conf)
2129 conf = rd_kafka_conf_new();
2130 else
2131 conf = app_conf;
2132
2133 /* Verify and finalize configuration */
2134 if ((conf_err = rd_kafka_conf_finalize(type, conf))) {
2135 /* Incompatible configuration settings */
2136 rd_snprintf(errstr, errstr_size, "%s", conf_err);
2137 if (!app_conf)
2138 rd_kafka_conf_destroy(conf);
2139 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
2140 return NULL;
2141 }
2142
2143
2144 rd_kafka_global_cnt_incr();
2145
2146 /*
2147 * Set up the handle.
2148 */
2149 rk = rd_calloc(1, sizeof(*rk));
2150
2151 rk->rk_type = type;
2152 rk->rk_ts_created = rd_clock();
2153
2154 /* Struct-copy the config object. */
2155 rk->rk_conf = *conf;
2156 if (!app_conf)
2157 rd_free(conf); /* Free the base config struct only,
2158 * not its fields since they were copied to
2159 * rk_conf just above. Those fields are
2160 * freed from rd_kafka_destroy_internal()
2161 * as the rk itself is destroyed. */
2162
2163 /* Seed the PRNG; don't bother about HAVE_RAND_R since seeding is cheap. */
2164 if (rk->rk_conf.enable_random_seed)
2165 call_once(&rd_kafka_global_srand_once, rd_kafka_global_srand);
2166
2167 /* Call on_new() interceptors */
2168 rd_kafka_interceptors_on_new(rk, &rk->rk_conf);
2169
2170 rwlock_init(&rk->rk_lock);
2171 mtx_init(&rk->rk_internal_rkb_lock, mtx_plain);
2172
2173 cnd_init(&rk->rk_broker_state_change_cnd);
2174 mtx_init(&rk->rk_broker_state_change_lock, mtx_plain);
2175 rd_list_init(&rk->rk_broker_state_change_waiters, 8,
2176 rd_kafka_enq_once_trigger_destroy);
2177
2178 cnd_init(&rk->rk_init_cnd);
2179 mtx_init(&rk->rk_init_lock, mtx_plain);
2180
2181 rd_interval_init(&rk->rk_suppress.no_idemp_brokers);
2182 rd_interval_init(&rk->rk_suppress.broker_metadata_refresh);
2183 rd_interval_init(&rk->rk_suppress.sparse_connect_random);
2184 mtx_init(&rk->rk_suppress.sparse_connect_lock, mtx_plain);
2185
2186 rd_atomic64_init(&rk->rk_ts_last_poll, rk->rk_ts_created);
2187 rd_atomic32_init(&rk->rk_flushing, 0);
2188
2189 rk->rk_rep = rd_kafka_q_new(rk);
2190 rk->rk_ops = rd_kafka_q_new(rk);
2191 rk->rk_ops->rkq_serve = rd_kafka_poll_cb;
2192 rk->rk_ops->rkq_opaque = rk;
2193
2194 if (rk->rk_conf.log_queue) {
2195 rk->rk_logq = rd_kafka_q_new(rk);
2196 rk->rk_logq->rkq_serve = rd_kafka_poll_cb;
2197 rk->rk_logq->rkq_opaque = rk;
2198 }
2199
2200 TAILQ_INIT(&rk->rk_brokers);
2201 TAILQ_INIT(&rk->rk_topics);
2202 rd_kafka_timers_init(&rk->rk_timers, rk, rk->rk_ops);
2203 rd_kafka_metadata_cache_init(rk);
2204 rd_kafka_coord_cache_init(&rk->rk_coord_cache,
2205 rk->rk_conf.metadata_max_age_ms);
2206 rd_kafka_coord_reqs_init(rk);
2207
2208 if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
2209 rk->rk_drmode = RD_KAFKA_DR_MODE_CB;
2210 else if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR)
2211 rk->rk_drmode = RD_KAFKA_DR_MODE_EVENT;
2212 else
2213 rk->rk_drmode = RD_KAFKA_DR_MODE_NONE;
2214 if (rk->rk_drmode != RD_KAFKA_DR_MODE_NONE)
2215 rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;
2216
2217 if (rk->rk_conf.rebalance_cb)
2218 rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;
2219 if (rk->rk_conf.offset_commit_cb)
2220 rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;
2221 if (rk->rk_conf.error_cb)
2222 rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_ERROR;
2223 #if WITH_SASL_OAUTHBEARER
2224 if (rk->rk_conf.sasl.enable_oauthbearer_unsecure_jwt &&
2225 !rk->rk_conf.sasl.oauthbearer_token_refresh_cb)
2226 rd_kafka_conf_set_oauthbearer_token_refresh_cb(
2227 &rk->rk_conf,
2228 rd_kafka_oauthbearer_unsecured_token);
2229
2230 if (rk->rk_conf.sasl.oauthbearer_token_refresh_cb)
2231 rk->rk_conf.enabled_events |=
2232 RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH;
2233 #endif
2234
2235 rk->rk_controllerid = -1;
2236
2237 /* Admin client defaults */
2238 rk->rk_conf.admin.request_timeout_ms = rk->rk_conf.socket_timeout_ms;
2239
2240 if (rk->rk_conf.debug)
2241 rk->rk_conf.log_level = LOG_DEBUG;
2242
2243 rd_snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
2244 rk->rk_conf.client_id_str, rd_kafka_type2str(rk->rk_type),
2245 rd_atomic32_add(&rkid, 1));
2246
2247 /* Construct clientid kafka string */
2248 rk->rk_client_id = rd_kafkap_str_new(rk->rk_conf.client_id_str,-1);
2249
2250 /* Convert group.id to kafka string (may be NULL) */
2251 rk->rk_group_id = rd_kafkap_str_new(rk->rk_conf.group_id_str,-1);
2252
2253 /* Config fixups */
2254 rk->rk_conf.queued_max_msg_bytes =
2255 (int64_t)rk->rk_conf.queued_max_msg_kbytes * 1000ll;
2256
2257 /* Enable api.version.request=true if broker.version.fallback
2258 * indicates a supporting broker. */
2259 if (rd_kafka_ApiVersion_is_queryable(rk->rk_conf.broker_version_fallback))
2260 rk->rk_conf.api_version_request = 1;
2261
2262 if (rk->rk_type == RD_KAFKA_PRODUCER) {
2263 mtx_init(&rk->rk_curr_msgs.lock, mtx_plain);
2264 cnd_init(&rk->rk_curr_msgs.cnd);
2265 rk->rk_curr_msgs.max_cnt =
2266 rk->rk_conf.queue_buffering_max_msgs;
2267 if ((unsigned long long)rk->rk_conf.
2268 queue_buffering_max_kbytes * 1024 >
2269 (unsigned long long)SIZE_MAX) {
2270 rk->rk_curr_msgs.max_size = SIZE_MAX;
2271 rd_kafka_log(rk, LOG_WARNING, "QUEUESIZE",
2272 "queue.buffering.max.kbytes adjusted "
2273 "to system SIZE_MAX limit %"PRIusz" bytes",
2274 rk->rk_curr_msgs.max_size);
2275 } else {
2276 rk->rk_curr_msgs.max_size =
2277 (size_t)rk->rk_conf.
2278 queue_buffering_max_kbytes * 1024;
2279 }
2280 }
2281
2282 if (rd_kafka_assignors_init(rk, errstr, errstr_size) == -1) {
2283 ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
2284 ret_errno = EINVAL;
2285 goto fail;
2286 }
2287
2288 /* Create Mock cluster */
2289 rd_atomic32_init(&rk->rk_mock.cluster_cnt, 0);
2290 if (rk->rk_conf.mock.broker_cnt > 0) {
2291 rk->rk_mock.cluster = rd_kafka_mock_cluster_new(
2292 rk, rk->rk_conf.mock.broker_cnt);
2293
2294 if (!rk->rk_mock.cluster) {
2295 rd_snprintf(errstr, errstr_size,
2296 "Failed to create mock cluster, see logs");
2297 ret_err = RD_KAFKA_RESP_ERR__FAIL;
2298 ret_errno = EINVAL;
2299 goto fail;
2300 }
2301
2302 rd_kafka_log(rk, LOG_NOTICE, "MOCK", "Mock cluster enabled: "
2303 "original bootstrap.servers and security.protocol "
2304 "ignored and replaced");
2305
2306 /* Overwrite bootstrap.servers and connection settings */
2307 if (rd_kafka_conf_set(&rk->rk_conf, "bootstrap.servers",
2308 rd_kafka_mock_cluster_bootstraps(
2309 rk->rk_mock.cluster),
2310 NULL, 0) != RD_KAFKA_CONF_OK)
2311 rd_assert(!"failed to replace mock bootstrap.servers");
2312
2313 if (rd_kafka_conf_set(&rk->rk_conf, "security.protocol",
2314 "plaintext", NULL, 0) != RD_KAFKA_CONF_OK)
2315 rd_assert(!"failed to reset mock security.protocol");
2316
2317 rk->rk_conf.security_protocol = RD_KAFKA_PROTO_PLAINTEXT;
2318 }
2319
2320
2321 if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
2322 rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) {
2323 /* Select SASL provider */
2324 if (rd_kafka_sasl_select_provider(rk,
2325 errstr, errstr_size) == -1) {
2326 ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
2327 ret_errno = EINVAL;
2328 goto fail;
2329 }
2330
2331 /* Initialize SASL provider */
2332 if (rd_kafka_sasl_init(rk, errstr, errstr_size) == -1) {
2333 rk->rk_conf.sasl.provider = NULL;
2334 ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
2335 ret_errno = EINVAL;
2336 goto fail;
2337 }
2338 }
2339
2340 #if WITH_SSL
2341 if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL ||
2342 rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) {
2343 /* Create SSL context */
2344 if (rd_kafka_ssl_ctx_init(rk, errstr, errstr_size) == -1) {
2345 ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
2346 ret_errno = EINVAL;
2347 goto fail;
2348 }
2349 }
2350 #endif
2351
2352 if (type == RD_KAFKA_CONSUMER) {
2353 rd_kafka_assignment_init(rk);
2354
2355 if (RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0) {
2356 /* Create consumer group handle */
2357 rk->rk_cgrp = rd_kafka_cgrp_new(rk,
2358 rk->rk_group_id,
2359 rk->rk_client_id);
2360 rk->rk_consumer.q =
2361 rd_kafka_q_keep(rk->rk_cgrp->rkcg_q);
2362 } else {
2363 /* Legacy consumer */
2364 rk->rk_consumer.q = rd_kafka_q_keep(rk->rk_rep);
2365 }
2366
2367 } else if (type == RD_KAFKA_PRODUCER) {
2368 rk->rk_eos.transactional_id =
2369 rd_kafkap_str_new(rk->rk_conf.eos.transactional_id, -1);
2370 }
2371
2372 #ifndef _WIN32
2373 /* Block all signals in newly created threads.
2374 * To avoid a race condition we block all signals in the calling
2375 * thread, from which the new thread will inherit its sigmask,
2376 * and then restore the original sigmask of the calling thread when
2377 * we're done creating the thread. */
2378 sigemptyset(&oldset);
2379 sigfillset(&newset);
2380 if (rk->rk_conf.term_sig) {
2381 struct sigaction sa_term = {
2382 .sa_handler = rd_kafka_term_sig_handler
2383 };
2384 sigaction(rk->rk_conf.term_sig, &sa_term, NULL);
2385 }
2386 pthread_sigmask(SIG_SETMASK, &newset, &oldset);
2387 #endif
2388
2389 mtx_lock(&rk->rk_init_lock);
2390
2391 /* Create background thread and queue if background_event_cb()
2392 * has been configured.
2393 * Do this before creating the main thread since after
2394 * the main thread is created it is no longer trivial to error
2395 * out from rd_kafka_new(). */
2396 if (rk->rk_conf.background_event_cb) {
2397 /* Hold off background thread until thrd_create() is done. */
2398 rd_kafka_wrlock(rk);
2399
2400 rk->rk_background.q = rd_kafka_q_new(rk);
2401
2402 rk->rk_init_wait_cnt++;
2403
2404 if ((thrd_create(&rk->rk_background.thread,
2405 rd_kafka_background_thread_main, rk)) !=
2406 thrd_success) {
2407 rk->rk_init_wait_cnt--;
2408 ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
2409 ret_errno = errno;
2410 if (errstr)
2411 rd_snprintf(errstr, errstr_size,
2412 "Failed to create background "
2413 "thread: %s (%i)",
2414 rd_strerror(errno), errno);
2415 rd_kafka_wrunlock(rk);
2416 mtx_unlock(&rk->rk_init_lock);
2417
2418 #ifndef _WIN32
2419 /* Restore sigmask of caller */
2420 pthread_sigmask(SIG_SETMASK, &oldset, NULL);
2421 #endif
2422 goto fail;
2423 }
2424
2425 rd_kafka_wrunlock(rk);
2426 }
2427
2428
2429
2430 /* Lock handle here to synchronise state, i.e., hold off
2431 * the thread until we've finalized the handle. */
2432 rd_kafka_wrlock(rk);
2433
2434 /* Create handler thread */
2435 rk->rk_init_wait_cnt++;
2436 if ((thrd_create(&rk->rk_thread,
2437 rd_kafka_thread_main, rk)) != thrd_success) {
2438 rk->rk_init_wait_cnt--;
2439 ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
2440 ret_errno = errno;
2441 if (errstr)
2442 rd_snprintf(errstr, errstr_size,
2443 "Failed to create thread: %s (%i)",
2444 rd_strerror(errno), errno);
2445 rd_kafka_wrunlock(rk);
2446 mtx_unlock(&rk->rk_init_lock);
2447 #ifndef _WIN32
2448 /* Restore sigmask of caller */
2449 pthread_sigmask(SIG_SETMASK, &oldset, NULL);
2450 #endif
2451 goto fail;
2452 }
2453
2454 rd_kafka_wrunlock(rk);
2455 mtx_unlock(&rk->rk_init_lock);
2456
2457 /*
2458 * @warning `goto fail` is prohibited past this point
2459 */
2460
2461 mtx_lock(&rk->rk_internal_rkb_lock);
2462 rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,
2463 RD_KAFKA_PROTO_PLAINTEXT,
2464 "", 0, RD_KAFKA_NODEID_UA);
2465 mtx_unlock(&rk->rk_internal_rkb_lock);
2466
2467 /* Add initial list of brokers from configuration */
2468 if (rk->rk_conf.brokerlist) {
2469 if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
2470 rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
2471 "No brokers configured");
2472 }
2473
2474 #ifndef _WIN32
2475 /* Restore sigmask of caller */
2476 pthread_sigmask(SIG_SETMASK, &oldset, NULL);
2477 #endif
2478
2479 /* Wait for background threads to fully initialize so that
2480 * the client instance is fully functional at the time it is
2481 * returned from the constructor. */
2482 if (rd_kafka_init_wait(rk, 60*1000) != 0) {
2483 /* This should never happen unless there is a bug
2484 * or the OS is not scheduling the background threads.
2485 * In either case there is no point in handling this gracefully
2486 * in the current state since the thread joins are likely
2487 * to hang as well. */
2488 mtx_lock(&rk->rk_init_lock);
2489 rd_kafka_log(rk, LOG_CRIT, "INIT",
2490 "Failed to initialize %s: "
2491 "%d background thread(s) did not initialize "
2492 "within 60 seconds",
2493 rk->rk_name, rk->rk_init_wait_cnt);
2494 if (errstr)
2495 rd_snprintf(errstr, errstr_size,
2496 "Timed out waiting for "
2497 "%d background thread(s) to initialize",
2498 rk->rk_init_wait_cnt);
2499 mtx_unlock(&rk->rk_init_lock);
2500
2501 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
2502 EDEADLK);
2503 return NULL;
2504 }
2505
2506 rk->rk_initialized = 1;
2507
2508 bflen = sizeof(builtin_features);
2509 if (rd_kafka_conf_get(&rk->rk_conf, "builtin.features",
2510 builtin_features, &bflen) !=
2511 RD_KAFKA_CONF_OK)
2512 rd_snprintf(builtin_features, sizeof(builtin_features), "?");
2513 rd_kafka_dbg(rk, ALL, "INIT",
2514 "librdkafka v%s (0x%x) %s initialized "
2515 "(builtin.features %s, %s, debug 0x%x)",
2516 rd_kafka_version_str(), rd_kafka_version(),
2517 rk->rk_name,
2518 builtin_features, BUILT_WITH,
2519 rk->rk_conf.debug);
2520
2521 /* Log warnings for deprecated configuration */
2522 rd_kafka_conf_warn(rk);
2523
2524 /* Debug dump configuration */
2525 if (rk->rk_conf.debug & RD_KAFKA_DBG_CONF) {
2526 rd_kafka_anyconf_dump_dbg(rk, _RK_GLOBAL,
2527 &rk->rk_conf,
2528 "Client configuration");
2529 if (rk->rk_conf.topic_conf)
2530 rd_kafka_anyconf_dump_dbg(
2531 rk, _RK_TOPIC,
2532 rk->rk_conf.topic_conf,
2533 "Default topic configuration");
2534 }
2535
2536 /* Free user supplied conf's base pointer on success,
2537 * but not the actual allocated fields since the struct
2538 * will have been copied in its entirety above. */
2539 if (app_conf)
2540 rd_free(app_conf);
2541 rd_kafka_set_last_error(0, 0);
2542
2543 return rk;
2544
2545 fail:
2546 /*
2547 * Error out and clean up
2548 */
2549
2550 /*
2551 * Tell background thread to terminate and wait for it to return.
2552 */
2553 rd_atomic32_set(&rk->rk_terminate, RD_KAFKA_DESTROY_F_TERMINATE);
2554
2555 /* Terminate SASL provider */
2556 if (rk->rk_conf.sasl.provider)
2557 rd_kafka_sasl_term(rk);
2558
2559 if (rk->rk_background.thread) {
2560 int res;
2561 thrd_join(rk->rk_background.thread, &res);
2562 rd_kafka_q_destroy_owner(rk->rk_background.q);
2563 }
2564
2565 /* If on_new() interceptors have been called we also need
2566 * to allow interceptor clean-up by calling on_destroy() */
2567 rd_kafka_interceptors_on_destroy(rk);
2568
2569 /* If rk_conf is a struct-copy of the application configuration
2570 * we need to prevent the rk_conf fields from being freed by
2571 * rd_kafka_destroy_internal() since they belong to app_conf.
2572 * However, there are some internal fields, such as interceptors,
2573 * that belong to rk_conf and thus need to be cleaned up.
2574 * Legacy APIs, sigh.. */
2575 if (app_conf) {
2576 rd_kafka_assignors_term(rk);
2577 rd_kafka_interceptors_destroy(&rk->rk_conf);
2578 memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
2579 }
2580
2581 rd_kafka_destroy_internal(rk);
2582 rd_kafka_destroy_final(rk);
2583
2584 rd_kafka_set_last_error(ret_err, ret_errno);
2585
2586 return NULL;
2587 }
2588
2589
2590
2591
2592 /**
2593 * Counts usage of the legacy/simple consumer (rd_kafka_consume_start() and
2594 * friends). Since that API has no call for stopping the cgrp, we will need
2595 * to sort that out automatically in the background when all consumption
2596 * has stopped.
2597 *
2598 * Returns 0 if a high-level consumer is already instantiated (meaning a
2599 * simple consumer cannot co-operate with it), else a positive use count.
2600 *
2601 * An rd_kafka_t handle can never migrate from simple to high-level, or
2602 * vice versa, so we don't need a ..consumer_del().
2603 */
2604 int rd_kafka_simple_consumer_add (rd_kafka_t *rk) {
2605 if (rd_atomic32_get(&rk->rk_simple_cnt) < 0)
2606 return 0;
2607
2608 return (int)rd_atomic32_add(&rk->rk_simple_cnt, 1);
2609 }
2610
2611
2612
2613
2614 /**
2615 * rktp fetch is split up in these parts:
2616 * * application side:
2617 * * broker side (handled by current leader broker thread for rktp):
2618 * - the fetch state, initial offset, etc.
2619 * - fetching messages, updating fetched offset, etc.
2620 * - offset commits
2621 *
2622 * Communication between the two sides is:
2623 * app side -> rdkafka main side: rktp_ops
2624 * broker thread -> app side: rktp_fetchq
2625 *
2626 * There is no shared state between these threads; instead,
2627 * state is communicated through the two op queues, and state
2628 * synchronization is performed by version barriers.
2629 *
2630 */
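/*
 * Informational sketch of the queue flow described above:
 *
 *   app side  --- rktp_ops (start/stop/seek ops) --->  rdkafka main side
 *   broker thread  --- rktp_fetchq (messages, errors) --->  app side
 *
 * Ops and fetched messages carry a version; the receiving side discards
 * entries older than the current version barrier (see
 * rd_kafka_op_version_outdated() usage further down).
 */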
2631
2632 static RD_UNUSED
2633 int rd_kafka_consume_start0 (rd_kafka_topic_t *rkt, int32_t partition,
2634 int64_t offset, rd_kafka_q_t *rkq) {
2635 rd_kafka_toppar_t *rktp;
2636
2637 if (partition < 0) {
2638 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
2639 ESRCH);
2640 return -1;
2641 }
2642
2643 if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
2644 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
2645 return -1;
2646 }
2647
2648 rd_kafka_topic_wrlock(rkt);
2649 rktp = rd_kafka_toppar_desired_add(rkt, partition);
2650 rd_kafka_topic_wrunlock(rkt);
2651
2652 /* Verify offset */
2653 if (offset == RD_KAFKA_OFFSET_BEGINNING ||
2654 offset == RD_KAFKA_OFFSET_END ||
2655 offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
2656 /* Logical offsets (RD_KAFKA_OFFSET_TAIL(cnt) is encoded as TAIL_BASE - cnt, hence the <=) */
2657
2658 } else if (offset == RD_KAFKA_OFFSET_STORED) {
2659 /* offset manager */
2660
2661 if (rkt->rkt_conf.offset_store_method ==
2662 RD_KAFKA_OFFSET_METHOD_BROKER &&
2663 RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) {
2664 /* Broker based offsets require a group id. */
2665 rd_kafka_toppar_destroy(rktp);
2666 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
2667 EINVAL);
2668 return -1;
2669 }
2670
2671 } else if (offset < 0) {
2672 rd_kafka_toppar_destroy(rktp);
2673 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
2674 EINVAL);
2675 return -1;
2676
2677 }
2678
2679 rd_kafka_toppar_op_fetch_start(rktp, offset, rkq, RD_KAFKA_NO_REPLYQ);
2680
2681 rd_kafka_toppar_destroy(rktp);
2682
2683 rd_kafka_set_last_error(0, 0);
2684 return 0;
2685 }
2686
2687
2688
2689
2690 int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition,
2691 int64_t offset) {
2692 rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
2693 rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START",
2694 "Start consuming partition %"PRId32,partition);
2695 return rd_kafka_consume_start0(rkt, partition, offset, NULL);
2696 }
2697
2698 int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition,
2699 int64_t offset, rd_kafka_queue_t *rkqu) {
2700 rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
2701
2702 return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q);
2703 }
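/*
 * Illustrative legacy consumer flow (sketch; assumes `rkt` was created
 * with rd_kafka_topic_new() and that partition 0 exists):
 *
 *   if (rd_kafka_consume_start(rkt, 0, RD_KAFKA_OFFSET_BEGINNING) == -1)
 *           fprintf(stderr, "start failed: %s\n",
 *                   rd_kafka_err2str(rd_kafka_last_error()));
 *   rd_kafka_message_t *rkm = rd_kafka_consume(rkt, 0, 1000);
 *   if (rkm)
 *           rd_kafka_message_destroy(rkm);
 *   rd_kafka_consume_stop(rkt, 0);
 */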
2704
2705
2706
2707
2708 static RD_UNUSED int rd_kafka_consume_stop0 (rd_kafka_toppar_t *rktp) {
2709 rd_kafka_q_t *tmpq = NULL;
2710 rd_kafka_resp_err_t err;
2711
2712 rd_kafka_topic_wrlock(rktp->rktp_rkt);
2713 rd_kafka_toppar_lock(rktp);
2714 rd_kafka_toppar_desired_del(rktp);
2715 rd_kafka_toppar_unlock(rktp);
2716 rd_kafka_topic_wrunlock(rktp->rktp_rkt);
2717
2718 tmpq = rd_kafka_q_new(rktp->rktp_rkt->rkt_rk);
2719
2720 rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(tmpq, 0));
2721
2722 /* Synchronisation: Wait for stop reply from broker thread */
2723 err = rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
2724 rd_kafka_q_destroy_owner(tmpq);
2725
2726 rd_kafka_set_last_error(err, err ? EINVAL : 0);
2727
2728 return err ? -1 : 0;
2729 }
2730
2731
2732 int rd_kafka_consume_stop (rd_kafka_topic_t *app_rkt, int32_t partition) {
2733 rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
2734 rd_kafka_toppar_t *rktp;
2735 int r;
2736
2737 if (partition == RD_KAFKA_PARTITION_UA) {
2738 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
2739 return -1;
2740 }
2741
2742 rd_kafka_topic_wrlock(rkt);
2743 if (!(rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
2744 !(rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
2745 rd_kafka_topic_wrunlock(rkt);
2746 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
2747 ESRCH);
2748 return -1;
2749 }
2750 rd_kafka_topic_wrunlock(rkt);
2751
2752 r = rd_kafka_consume_stop0(rktp);
2753 /* set_last_error() called by stop0() */
2754
2755 rd_kafka_toppar_destroy(rktp);
2756
2757 return r;
2758 }
2759
2760
2761
2762 rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *app_rkt,
2763 int32_t partition,
2764 int64_t offset,
2765 int timeout_ms) {
2766 rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
2767 rd_kafka_toppar_t *rktp;
2768 rd_kafka_q_t *tmpq = NULL;
2769 rd_kafka_resp_err_t err;
2770 rd_kafka_replyq_t replyq = RD_KAFKA_NO_REPLYQ;
2771
2772 /* FIXME: simple consumer check */
2773
2774 if (partition == RD_KAFKA_PARTITION_UA)
2775 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2776
2777 rd_kafka_topic_rdlock(rkt);
2778 if (!(rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
2779 !(rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
2780 rd_kafka_topic_rdunlock(rkt);
2781 return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
2782 }
2783 rd_kafka_topic_rdunlock(rkt);
2784
2785 if (timeout_ms) {
2786 tmpq = rd_kafka_q_new(rkt->rkt_rk);
2787 replyq = RD_KAFKA_REPLYQ(tmpq, 0);
2788 }
2789
2790 if ((err = rd_kafka_toppar_op_seek(rktp, offset, replyq))) {
2791 if (tmpq)
2792 rd_kafka_q_destroy_owner(tmpq);
2793 rd_kafka_toppar_destroy(rktp);
2794 return err;
2795 }
2796
2797 rd_kafka_toppar_destroy(rktp);
2798
2799 if (tmpq) {
2800 err = rd_kafka_q_wait_result(tmpq, timeout_ms);
2801 rd_kafka_q_destroy_owner(tmpq);
2802 return err;
2803 }
2804
2805 return RD_KAFKA_RESP_ERR_NO_ERROR;
2806 }
2807
2808
2809 rd_kafka_error_t *
2810 rd_kafka_seek_partitions (rd_kafka_t *rk,
2811 rd_kafka_topic_partition_list_t *partitions,
2812 int timeout_ms) {
2813 rd_kafka_q_t *tmpq = NULL;
2814 rd_kafka_topic_partition_t *rktpar;
2815 rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
2816 int cnt = 0;
2817
2818 if (rk->rk_type != RD_KAFKA_CONSUMER)
2819 return rd_kafka_error_new(
2820 RD_KAFKA_RESP_ERR__INVALID_ARG,
2821 "Must only be used on consumer instance");
2822
2823 if (!partitions || partitions->cnt == 0)
2824 return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
2825 "partitions must be specified");
2826
2827 if (timeout_ms)
2828 tmpq = rd_kafka_q_new(rk);
2829
2830 RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
2831 rd_kafka_toppar_t *rktp;
2832 rd_kafka_resp_err_t err;
2833
2834 rktp = rd_kafka_toppar_get2(rk,
2835 rktpar->topic,
2836 rktpar->partition,
2837 rd_false/*no-ua-on-miss*/,
2838 rd_false/*no-create-on-miss*/);
2839 if (!rktp) {
2840 rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
2841 continue;
2842 }
2843
2844 err = rd_kafka_toppar_op_seek(rktp, rktpar->offset,
2845 RD_KAFKA_REPLYQ(tmpq, 0));
2846 if (err) {
2847 rktpar->err = err;
2848 } else {
2849 rktpar->err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
2850 cnt++;
2851 }
2852
2853 rd_kafka_toppar_destroy(rktp); /* refcnt from toppar_get2() */
2854 }
2855
2856 if (!timeout_ms)
2857 return NULL;
2858
2859
2860 while (cnt > 0) {
2861 rd_kafka_op_t *rko;
2862
2863 rko = rd_kafka_q_pop(tmpq, rd_timeout_remains(abs_timeout), 0);
2864 if (!rko) {
2865 rd_kafka_q_destroy_owner(tmpq);
2866
2867 return rd_kafka_error_new(
2868 RD_KAFKA_RESP_ERR__TIMED_OUT,
2869 "Timed out waiting for %d remaining partition "
2870 "seek(s) to finish", cnt);
2871 }
2872
2873 if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) {
2874 rd_kafka_q_destroy_owner(tmpq);
2875 rd_kafka_op_destroy(rko);
2876
2877 return rd_kafka_error_new(RD_KAFKA_RESP_ERR__DESTROY,
2878 "Instance is terminating");
2879 }
2880
2881 rd_assert(rko->rko_rktp);
2882
2883 rktpar = rd_kafka_topic_partition_list_find(
2884 partitions,
2885 rko->rko_rktp->rktp_rkt->rkt_topic->str,
2886 rko->rko_rktp->rktp_partition);
2887 rd_assert(rktpar);
2888
2889 rktpar->err = rko->rko_err;
2890
2891 rd_kafka_op_destroy(rko);
2892
2893 cnt--;
2894 }
2895
2896 rd_kafka_q_destroy_owner(tmpq);
2897
2898 return NULL;
2899 }
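/*
 * Illustrative use of rd_kafka_seek_partitions() (sketch; the topic name
 * and offset are placeholders):
 *
 *   rd_kafka_topic_partition_list_t *parts =
 *           rd_kafka_topic_partition_list_new(1);
 *   rd_kafka_topic_partition_list_add(parts, "mytopic", 0)->offset = 1234;
 *   rd_kafka_error_t *error = rd_kafka_seek_partitions(rk, parts, 5000);
 *   if (error)
 *           rd_kafka_error_destroy(error);
 *   rd_kafka_topic_partition_list_destroy(parts);
 */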
2900
2901
2902
2903 static ssize_t rd_kafka_consume_batch0 (rd_kafka_q_t *rkq,
2904 int timeout_ms,
2905 rd_kafka_message_t **rkmessages,
2906 size_t rkmessages_size) {
2907 /* Populate application's rkmessages array. */
2908 return rd_kafka_q_serve_rkmessages(rkq, timeout_ms,
2909 rkmessages, rkmessages_size);
2910 }
2911
2912
2913 ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
2914 int timeout_ms,
2915 rd_kafka_message_t **rkmessages,
2916 size_t rkmessages_size) {
2917 rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
2918 rd_kafka_toppar_t *rktp;
2919 ssize_t cnt;
2920
2921 /* Get toppar */
2922 rd_kafka_topic_rdlock(rkt);
2923 rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
2924 if (unlikely(!rktp))
2925 rktp = rd_kafka_toppar_desired_get(rkt, partition);
2926 rd_kafka_topic_rdunlock(rkt);
2927
2928 if (unlikely(!rktp)) {
2929 /* No such toppar known */
2930 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
2931 ESRCH);
2932 return -1;
2933 }
2934
2935 /* Populate application's rkmessages array. */
2936 cnt = rd_kafka_q_serve_rkmessages(rktp->rktp_fetchq, timeout_ms,
2937 rkmessages, rkmessages_size);
2938
2939 rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */
2940
2941 rd_kafka_set_last_error(0, 0);
2942
2943 return cnt;
2944 }
2945
2946 ssize_t rd_kafka_consume_batch_queue (rd_kafka_queue_t *rkqu,
2947 int timeout_ms,
2948 rd_kafka_message_t **rkmessages,
2949 size_t rkmessages_size) {
2950 /* Populate application's rkmessages array. */
2951 return rd_kafka_consume_batch0(rkqu->rkqu_q, timeout_ms,
2952 rkmessages, rkmessages_size);
2953 }
2954
2955
2956 struct consume_ctx {
2957 void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);
2958 void *opaque;
2959 };
2960
2961
2962 /**
2963 * Trampoline for application's consume_cb()
2964 */
2965 static rd_kafka_op_res_t
2966 rd_kafka_consume_cb (rd_kafka_t *rk,
2967 rd_kafka_q_t *rkq,
2968 rd_kafka_op_t *rko,
2969 rd_kafka_q_cb_type_t cb_type, void *opaque) {
2970 struct consume_ctx *ctx = opaque;
2971 rd_kafka_message_t *rkmessage;
2972
2973 if (unlikely(rd_kafka_op_version_outdated(rko, 0)) ||
2974 rko->rko_type == RD_KAFKA_OP_BARRIER) {
2975 rd_kafka_op_destroy(rko);
2976 return RD_KAFKA_OP_RES_HANDLED;
2977 }
2978
2979 rkmessage = rd_kafka_message_get(rko);
2980
2981 rd_kafka_op_offset_store(rk, rko);
2982
2983 ctx->consume_cb(rkmessage, ctx->opaque);
2984
2985 rd_kafka_op_destroy(rko);
2986
2987 return RD_KAFKA_OP_RES_HANDLED;
2988 }
2989
2990
2991
2992 static rd_kafka_op_res_t
2993 rd_kafka_consume_callback0 (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt,
2994 void (*consume_cb) (rd_kafka_message_t
2995 *rkmessage,
2996 void *opaque),
2997 void *opaque) {
2998 struct consume_ctx ctx = { .consume_cb = consume_cb, .opaque = opaque };
2999 rd_kafka_op_res_t res;
3000
3001 if (timeout_ms)
3002 rd_kafka_app_poll_blocking(rkq->rkq_rk);
3003
3004 res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt,
3005 RD_KAFKA_Q_CB_RETURN,
3006 rd_kafka_consume_cb, &ctx);
3007
3008 rd_kafka_app_polled(rkq->rkq_rk);
3009
3010 return res;
3011 }
3012
3013
3014 int rd_kafka_consume_callback (rd_kafka_topic_t *app_rkt, int32_t partition,
3015 int timeout_ms,
3016 void (*consume_cb) (rd_kafka_message_t
3017 *rkmessage,
3018 void *opaque),
3019 void *opaque) {
3020 rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
3021 rd_kafka_toppar_t *rktp;
3022 int r;
3023
3024 /* Get toppar */
3025 rd_kafka_topic_rdlock(rkt);
3026 rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
3027 if (unlikely(!rktp))
3028 rktp = rd_kafka_toppar_desired_get(rkt, partition);
3029 rd_kafka_topic_rdunlock(rkt);
3030
3031 if (unlikely(!rktp)) {
3032 /* No such toppar known */
3033 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
3034 ESRCH);
3035 return -1;
3036 }
3037
3038 r = rd_kafka_consume_callback0(rktp->rktp_fetchq, timeout_ms,
3039 rkt->rkt_conf.consume_callback_max_msgs,
3040 consume_cb, opaque);
3041
3042 rd_kafka_toppar_destroy(rktp);
3043
3044 rd_kafka_set_last_error(0, 0);
3045
3046 return r;
3047 }
3048
3049
3050
3051 int rd_kafka_consume_callback_queue (rd_kafka_queue_t *rkqu,
3052 int timeout_ms,
3053 void (*consume_cb) (rd_kafka_message_t
3054 *rkmessage,
3055 void *opaque),
3056 void *opaque) {
3057 return rd_kafka_consume_callback0(rkqu->rkqu_q, timeout_ms, 0,
3058 consume_cb, opaque);
3059 }
3060
3061
3062 /**
3063 * Serve queue 'rkq' and return one message.
3064 * Serving the queue will also fire any callbacks registered for
3065 * matching events; this includes consume_cb(), in which case no
3066 * message will be returned.
3067 */
3068 static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk,
3069 rd_kafka_q_t *rkq,
3070 int timeout_ms) {
3071 rd_kafka_op_t *rko;
3072 rd_kafka_message_t *rkmessage = NULL;
3073 rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
3074
3075 if (timeout_ms)
3076 rd_kafka_app_poll_blocking(rk);
3077
3078 rd_kafka_yield_thread = 0;
3079 while ((rko = rd_kafka_q_pop(rkq,
3080 rd_timeout_remains_us(abs_timeout), 0))) {
3081 rd_kafka_op_res_t res;
3082
3083 res = rd_kafka_poll_cb(rk, rkq, rko,
3084 RD_KAFKA_Q_CB_RETURN, NULL);
3085
3086 if (res == RD_KAFKA_OP_RES_PASS)
3087 break;
3088
3089 if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
3090 rd_kafka_yield_thread)) {
3091 /* Callback called rd_kafka_yield(), we must
3092 * stop dispatching the queue and return. */
3093 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR,
3094 EINTR);
3095 rd_kafka_app_polled(rk);
3096 return NULL;
3097 }
3098
3099 /* Message was handled by callback. */
3100 continue;
3101 }
3102
3103 if (!rko) {
3104 /* Timeout reached with no op returned. */
3105 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
3106 ETIMEDOUT);
3107 rd_kafka_app_polled(rk);
3108 return NULL;
3109 }
3110
3111 rd_kafka_assert(rk,
3112 rko->rko_type == RD_KAFKA_OP_FETCH ||
3113 rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);
3114
3115 /* Get rkmessage from rko */
3116 rkmessage = rd_kafka_message_get(rko);
3117
3118 /* Store offset */
3119 rd_kafka_op_offset_store(rk, rko);
3120
3121 rd_kafka_set_last_error(0, 0);
3122
3123 rd_kafka_app_polled(rk);
3124
3125 return rkmessage;
3126 }
3127
3128 rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt,
3129 int32_t partition,
3130 int timeout_ms) {
3131 rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
3132 rd_kafka_toppar_t *rktp;
3133 rd_kafka_message_t *rkmessage;
3134
3135 rd_kafka_topic_rdlock(rkt);
3136 rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
3137 if (unlikely(!rktp))
3138 rktp = rd_kafka_toppar_desired_get(rkt, partition);
3139 rd_kafka_topic_rdunlock(rkt);
3140
3141 if (unlikely(!rktp)) {
3142 /* No such toppar known */
3143 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
3144 ESRCH);
3145 return NULL;
3146 }
3147
3148 rkmessage = rd_kafka_consume0(rkt->rkt_rk,
3149 rktp->rktp_fetchq, timeout_ms);
3150
3151 rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */
3152
3153 return rkmessage;
3154 }
3155
3156
3157 rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu,
3158 int timeout_ms) {
3159 return rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms);
3160 }
3161
3162
3163
3164
3165 rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk) {
3166 rd_kafka_cgrp_t *rkcg;
3167
3168 if (!(rkcg = rd_kafka_cgrp_get(rk)))
3169 return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
3170
3171 rd_kafka_q_fwd_set(rk->rk_rep, rkcg->rkcg_q);
3172 return RD_KAFKA_RESP_ERR_NO_ERROR;
3173 }
3174
3175
3176
3177
3178 rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,
3179 int timeout_ms) {
3180 rd_kafka_cgrp_t *rkcg;
3181
3182 if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) {
3183 rd_kafka_message_t *rkmessage = rd_kafka_message_new();
3184 rkmessage->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
3185 return rkmessage;
3186 }
3187
3188 return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms);
3189 }
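/*
 * Illustrative high-level consumer poll loop (sketch; `run`,
 * handle_error() and handle_message() are application placeholders):
 *
 *   while (run) {
 *           rd_kafka_message_t *rkm = rd_kafka_consumer_poll(rk, 100);
 *           if (!rkm)
 *                   continue;  // poll timed out
 *           if (rkm->err)
 *                   handle_error(rkm);
 *           else
 *                   handle_message(rkm);
 *           rd_kafka_message_destroy(rkm);
 *   }
 */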
3190
3191
3192 rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk) {
3193 rd_kafka_cgrp_t *rkcg;
3194 rd_kafka_op_t *rko;
3195 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
3196 rd_kafka_q_t *rkq;
3197
3198 if (!(rkcg = rd_kafka_cgrp_get(rk)))
3199 return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
3200
3201 /* If a fatal error has been raised and this is an
3202 * explicit consumer_close() from the application we return
3203 * a fatal error. Otherwise let the "silent" no_consumer_close
3204 * logic be performed to clean up properly. */
3205 if (rd_kafka_fatal_error_code(rk) &&
3206 !rd_kafka_destroy_flags_no_consumer_close(rk))
3207 return RD_KAFKA_RESP_ERR__FATAL;
3208
3209 rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Closing consumer");
3210
3211 /* Redirect cgrp queue to our temporary queue to make sure
3212 * all posted ops (e.g., rebalance callbacks) are served by
3213 * this function. */
3214 rkq = rd_kafka_q_new(rk);
3215 rd_kafka_q_fwd_set(rkcg->rkcg_q, rkq);
3216
3217 rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0)); /* async */
3218
3219 /* Disable the queue if termination is immediate or the user
3220 * does not want the blocking consumer_close() behaviour; this will
3221 * cause any ops posted for this queue (such as rebalance) to
3222 * be destroyed.
3223 */
3224 if (rd_kafka_destroy_flags_no_consumer_close(rk)) {
3225 rd_kafka_dbg(rk, CONSUMER, "CLOSE",
3226 "Disabling and purging temporary queue to quench "
3227 "close events");
3228 rd_kafka_q_disable(rkq);
3229 /* Purge ops already enqueued */
3230 rd_kafka_q_purge(rkq);
3231 } else {
3232 rd_kafka_dbg(rk, CONSUMER, "CLOSE",
3233 "Waiting for close events");
3234 while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) {
3235 rd_kafka_op_res_t res;
3236 if ((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
3237 RD_KAFKA_OP_TERMINATE) {
3238 err = rko->rko_err;
3239 rd_kafka_op_destroy(rko);
3240 break;
3241 }
3242 res = rd_kafka_poll_cb(rk, rkq, rko,
3243 RD_KAFKA_Q_CB_RETURN, NULL);
3244 if (res == RD_KAFKA_OP_RES_PASS)
3245 rd_kafka_op_destroy(rko);
3246 /* Ignore YIELD, we need to finish */
3247 }
3248 }
3249
3250 rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL);
3251
3252 rd_kafka_q_destroy_owner(rkq);
3253
3254 rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Consumer closed");
3255
3256 return err;
3257 }
3258
3259
3260
3261 rd_kafka_resp_err_t
3262 rd_kafka_committed (rd_kafka_t *rk,
3263 rd_kafka_topic_partition_list_t *partitions,
3264 int timeout_ms) {
3265 rd_kafka_q_t *rkq;
3266 rd_kafka_resp_err_t err;
3267 rd_kafka_cgrp_t *rkcg;
3268 rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
3269
3270 if (!partitions)
3271 return RD_KAFKA_RESP_ERR__INVALID_ARG;
3272
3273 if (!(rkcg = rd_kafka_cgrp_get(rk)))
3274 return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
3275
3276 /* Set default offsets. */
3277 rd_kafka_topic_partition_list_reset_offsets(partitions,
3278 RD_KAFKA_OFFSET_INVALID);
3279
3280 rkq = rd_kafka_q_new(rk);
3281
3282 do {
3283 rd_kafka_op_t *rko;
3284 int state_version = rd_kafka_brokers_get_state_version(rk);
3285
3286 rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
3287 rd_kafka_op_set_replyq(rko, rkq, NULL);
3288
3289 /* Issue #827
3290 * Copy the partition list to avoid a use-after-free in case we
3291 * time out here, the app then frees the list, and the cgrp
3292 * subsequently starts processing the op. */
3293 rko->rko_u.offset_fetch.partitions =
3294 rd_kafka_topic_partition_list_copy(partitions);
3295 rko->rko_u.offset_fetch.require_stable =
3296 rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED;
3297 rko->rko_u.offset_fetch.do_free = 1;
3298
3299 if (!rd_kafka_q_enq(rkcg->rkcg_ops, rko)) {
3300 err = RD_KAFKA_RESP_ERR__DESTROY;
3301 break;
3302 }
3303
3304 rko = rd_kafka_q_pop(rkq,
3305 rd_timeout_remains_us(abs_timeout), 0);
3306 if (rko) {
3307 if (!(err = rko->rko_err))
3308 rd_kafka_topic_partition_list_update(
3309 partitions,
3310 rko->rko_u.offset_fetch.partitions);
3311 else if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
3312 err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
3313 !rd_kafka_brokers_wait_state_change(
3314 rk, state_version,
3315 rd_timeout_remains(abs_timeout)))
3316 err = RD_KAFKA_RESP_ERR__TIMED_OUT;
3317
3318 rd_kafka_op_destroy(rko);
3319 } else
3320 err = RD_KAFKA_RESP_ERR__TIMED_OUT;
3321 } while (err == RD_KAFKA_RESP_ERR__TRANSPORT ||
3322 err == RD_KAFKA_RESP_ERR__WAIT_COORD);
3323
3324 rd_kafka_q_destroy_owner(rkq);
3325
3326 return err;
3327 }
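/*
 * Illustrative use of rd_kafka_committed() for the current assignment
 * (sketch; assumes an active high-level consumer):
 *
 *   rd_kafka_topic_partition_list_t *parts = NULL;
 *   if (!rd_kafka_assignment(rk, &parts)) {
 *           rd_kafka_committed(rk, parts, 5000);
 *           // parts->elems[i].offset now holds each committed offset
 *           rd_kafka_topic_partition_list_destroy(parts);
 *   }
 */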
3328
3329
3330
3331 rd_kafka_resp_err_t
3332 rd_kafka_position (rd_kafka_t *rk,
3333 rd_kafka_topic_partition_list_t *partitions) {
3334 int i;
3335
3336 for (i = 0 ; i < partitions->cnt ; i++) {
3337 rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
3338 rd_kafka_toppar_t *rktp;
3339
3340 if (!(rktp = rd_kafka_toppar_get2(rk, rktpar->topic,
3341 rktpar->partition, 0, 1))) {
3342 rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3343 rktpar->offset = RD_KAFKA_OFFSET_INVALID;
3344 continue;
3345 }
3346
3347 rd_kafka_toppar_lock(rktp);
3348 rktpar->offset = rktp->rktp_app_offset;
3349 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
3350 rd_kafka_toppar_unlock(rktp);
3351 rd_kafka_toppar_destroy(rktp);
3352 }
3353
3354 return RD_KAFKA_RESP_ERR_NO_ERROR;
3355 }
3356
3357
3358
3359 struct _query_wmark_offsets_state {
3360 rd_kafka_resp_err_t err;
3361 const char *topic;
3362 int32_t partition;
3363 int64_t offsets[2];
3364 int offidx; /* next offset to set from response */
3365 rd_ts_t ts_end;
3366 int state_version; /* Broker state version */
3367 };
3368
3369 static void rd_kafka_query_wmark_offsets_resp_cb (rd_kafka_t *rk,
3370 rd_kafka_broker_t *rkb,
3371 rd_kafka_resp_err_t err,
3372 rd_kafka_buf_t *rkbuf,
3373 rd_kafka_buf_t *request,
3374 void *opaque) {
3375 struct _query_wmark_offsets_state *state;
3376 rd_kafka_topic_partition_list_t *offsets;
3377 rd_kafka_topic_partition_t *rktpar;
3378
3379 if (err == RD_KAFKA_RESP_ERR__DESTROY) {
3380 /* 'state' has gone out of scope when query_watermark..()
3381 * timed out and returned to the caller. */
3382 return;
3383 }
3384
3385 state = opaque;
3386
3387 offsets = rd_kafka_topic_partition_list_new(1);
3388 err = rd_kafka_handle_ListOffsets(rk, rkb, err, rkbuf, request,
3389 offsets, NULL);
3390 if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
3391 rd_kafka_topic_partition_list_destroy(offsets);
3392 return; /* Retrying */
3393 }
3394
3395 /* Retry if no broker connection is available yet. */
3396 if (err == RD_KAFKA_RESP_ERR__TRANSPORT &&
3397 rkb &&
3398 rd_kafka_brokers_wait_state_change(
3399 rkb->rkb_rk, state->state_version,
3400 rd_timeout_remains(state->ts_end))) {
3401 /* Retry */
3402 state->state_version = rd_kafka_brokers_get_state_version(rk);
3403 request->rkbuf_retries = 0;
3404 if (rd_kafka_buf_retry(rkb, request)) {
3405 rd_kafka_topic_partition_list_destroy(offsets);
3406 return; /* Retry in progress */
3407 }
3408 /* FALLTHRU */
3409 }
3410
3411 /* Partition not seen in response. */
3412 if (!(rktpar = rd_kafka_topic_partition_list_find(offsets,
3413 state->topic,
3414 state->partition)))
3415 err = RD_KAFKA_RESP_ERR__BAD_MSG;
3416 else if (rktpar->err)
3417 err = rktpar->err;
3418 else
3419 state->offsets[state->offidx] = rktpar->offset;
3420
3421 state->offidx++;
3422
3423 if (err || state->offidx == 2) /* Error or Done */
3424 state->err = err;
3425
3426 rd_kafka_topic_partition_list_destroy(offsets);
3427 }


rd_kafka_resp_err_t
rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
                                  int32_t partition,
                                  int64_t *low, int64_t *high,
                                  int timeout_ms) {
        rd_kafka_q_t *rkq;
        struct _query_wmark_offsets_state state;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_kafka_topic_partition_list_t *partitions;
        rd_kafka_topic_partition_t *rktpar;
        struct rd_kafka_partition_leader *leader;
        rd_list_t leaders;
        rd_kafka_resp_err_t err;

        partitions = rd_kafka_topic_partition_list_new(1);
        rktpar = rd_kafka_topic_partition_list_add(partitions,
                                                   topic, partition);

        rd_list_init(&leaders, partitions->cnt,
                     (void *)rd_kafka_partition_leader_destroy);

        err = rd_kafka_topic_partition_list_query_leaders(rk, partitions,
                                                          &leaders,
                                                          timeout_ms);
        if (err) {
                rd_list_destroy(&leaders);
                rd_kafka_topic_partition_list_destroy(partitions);
                return err;
        }

        leader = rd_list_elem(&leaders, 0);

        rkq = rd_kafka_q_new(rk);

        /* Due to KAFKA-1588 we need to send a request for each wanted offset,
         * in this case one for the low watermark and one for the high. */
        state.topic = topic;
        state.partition = partition;
        state.offsets[0] = RD_KAFKA_OFFSET_BEGINNING;
        state.offsets[1] = RD_KAFKA_OFFSET_END;
        state.offidx = 0;
        state.err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
        state.ts_end = ts_end;
        state.state_version = rd_kafka_brokers_get_state_version(rk);

        rktpar->offset = RD_KAFKA_OFFSET_BEGINNING;
        rd_kafka_ListOffsetsRequest(leader->rkb, partitions,
                                    RD_KAFKA_REPLYQ(rkq, 0),
                                    rd_kafka_query_wmark_offsets_resp_cb,
                                    &state);

        rktpar->offset = RD_KAFKA_OFFSET_END;
        rd_kafka_ListOffsetsRequest(leader->rkb, partitions,
                                    RD_KAFKA_REPLYQ(rkq, 0),
                                    rd_kafka_query_wmark_offsets_resp_cb,
                                    &state);

        rd_kafka_topic_partition_list_destroy(partitions);
        rd_list_destroy(&leaders);

        /* Wait for reply (or timeout) */
        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
               rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
                                rd_kafka_poll_cb, NULL) !=
               RD_KAFKA_OP_RES_YIELD)
                ;

        rd_kafka_q_destroy_owner(rkq);

        if (state.err)
                return state.err;
        else if (state.offidx != 2)
                return RD_KAFKA_RESP_ERR__FAIL;

        /* We are not certain about the returned order. */
        if (state.offsets[0] < state.offsets[1]) {
                *low = state.offsets[0];
                *high = state.offsets[1];
        } else {
                *low = state.offsets[1];
                *high = state.offsets[0];
        }

        /* If the partition is empty, only one offset (the last) will be
         * returned. */
        if (*low < 0 && *high >= 0)
                *low = *high;

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


rd_kafka_resp_err_t
rd_kafka_get_watermark_offsets (rd_kafka_t *rk, const char *topic,
                                int32_t partition,
                                int64_t *low, int64_t *high) {
        rd_kafka_toppar_t *rktp;

        rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, 1);
        if (!rktp)
                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

        rd_kafka_toppar_lock(rktp);
        *low = rktp->rktp_lo_offset;
        *high = rktp->rktp_hi_offset;
        rd_kafka_toppar_unlock(rktp);

        rd_kafka_toppar_destroy(rktp);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
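
/**
 * Example (illustrative sketch only, not compiled) contrasting the two
 * watermark APIs above: query_watermark_offsets() performs ListOffsets
 * round-trips to the partition leader, while get_watermark_offsets()
 * returns the values last cached by the fetcher, without any I/O.
 */
#if 0
int64_t lo, hi;

if (!rd_kafka_query_watermark_offsets(rk, "mytopic", 0, &lo, &hi, 5000))
        printf("watermarks (queried): %"PRId64"..%"PRId64"\n", lo, hi);

if (!rd_kafka_get_watermark_offsets(rk, "mytopic", 0, &lo, &hi))
        printf("watermarks (cached):  %"PRId64"..%"PRId64"\n", lo, hi);
#endif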


/**
 * @brief get_offsets_for_times() state
 */
struct _get_offsets_for_times {
        rd_kafka_topic_partition_list_t *results;
        rd_kafka_resp_err_t err;
        int wait_reply;
        int state_version;
        rd_ts_t ts_end;
};

/**
 * @brief Handle OffsetRequest responses
 */
static void rd_kafka_get_offsets_for_times_resp_cb (rd_kafka_t *rk,
                                                    rd_kafka_broker_t *rkb,
                                                    rd_kafka_resp_err_t err,
                                                    rd_kafka_buf_t *rkbuf,
                                                    rd_kafka_buf_t *request,
                                                    void *opaque) {
        struct _get_offsets_for_times *state;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' has gone out of scope when offsets_for_times()
                 * timed out and returned to the caller. */
                return;
        }

        state = opaque;

        err = rd_kafka_handle_ListOffsets(rk, rkb, err, rkbuf, request,
                                          state->results, NULL);
        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
                return; /* Retrying */

        /* Retry if no broker connection is available yet. */
        if (err == RD_KAFKA_RESP_ERR__TRANSPORT &&
            rkb &&
            rd_kafka_brokers_wait_state_change(
                    rkb->rkb_rk, state->state_version,
                    rd_timeout_remains(state->ts_end))) {
                /* Retry */
                state->state_version = rd_kafka_brokers_get_state_version(rk);
                request->rkbuf_retries = 0;
                if (rd_kafka_buf_retry(rkb, request))
                        return; /* Retry in progress */
                /* FALLTHRU */
        }

        if (err && !state->err)
                state->err = err;

        state->wait_reply--;
}


rd_kafka_resp_err_t
rd_kafka_offsets_for_times (rd_kafka_t *rk,
                            rd_kafka_topic_partition_list_t *offsets,
                            int timeout_ms) {
        rd_kafka_q_t *rkq;
        struct _get_offsets_for_times state = RD_ZERO_INIT;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_list_t leaders;
        int i;
        rd_kafka_resp_err_t err;
        struct rd_kafka_partition_leader *leader;
        int tmout;

        if (offsets->cnt == 0)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        rd_list_init(&leaders, offsets->cnt,
                     (void *)rd_kafka_partition_leader_destroy);

        err = rd_kafka_topic_partition_list_query_leaders(rk, offsets,
                                                          &leaders,
                                                          timeout_ms);
        if (err) {
                rd_list_destroy(&leaders);
                return err;
        }

        rkq = rd_kafka_q_new(rk);

        state.wait_reply = 0;
        state.results = rd_kafka_topic_partition_list_new(offsets->cnt);

        /* For each leader send a request for its partitions */
        RD_LIST_FOREACH(leader, &leaders, i) {
                state.wait_reply++;
                rd_kafka_ListOffsetsRequest(
                        leader->rkb, leader->partitions,
                        RD_KAFKA_REPLYQ(rkq, 0),
                        rd_kafka_get_offsets_for_times_resp_cb,
                        &state);
        }

        rd_list_destroy(&leaders);

        /* Wait for reply (or timeout) */
        while (state.wait_reply > 0 &&
               !rd_timeout_expired((tmout = rd_timeout_remains(ts_end))))
                rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
                                 rd_kafka_poll_cb, NULL);

        rd_kafka_q_destroy_owner(rkq);

        if (state.wait_reply > 0 && !state.err)
                state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;

        /* Then update the queried partitions. */
        if (!state.err)
                rd_kafka_topic_partition_list_update(offsets, state.results);

        rd_kafka_topic_partition_list_destroy(state.results);

        return state.err;
}
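
/**
 * Example (illustrative sketch only, not compiled): for
 * rd_kafka_offsets_for_times() the input offsets are timestamps in
 * milliseconds, and on return each offset is the earliest offset whose
 * message timestamp is >= the given timestamp.
 */
#if 0
rd_kafka_topic_partition_list_t *parts =
        rd_kafka_topic_partition_list_new(1);

rd_kafka_topic_partition_list_add(parts, "mytopic", 0)->offset =
        1609459200000; /* 2021-01-01T00:00:00Z in milliseconds */

if (!rd_kafka_offsets_for_times(rk, parts, 5000))
        printf("offset at/after timestamp: %"PRId64"\n",
               parts->elems[0].offset);

rd_kafka_topic_partition_list_destroy(parts);
#endif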


/**
 * @brief rd_kafka_poll() (and similar) op callback handler.
 *        Will either call the registered callback, depending on cb_type and
 *        op type, or return the op to the application, if applicable
 *        (e.g., fetch message).
 *
 * @returns RD_KAFKA_OP_RES_HANDLED if op was handled, else one of the
 *          other res types (such as OP_RES_PASS).
 *
 * @locality any thread that serves op queues
 */
rd_kafka_op_res_t
rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                  rd_kafka_q_cb_type_t cb_type, void *opaque) {
        rd_kafka_msg_t *rkm;
        rd_kafka_op_res_t res = RD_KAFKA_OP_RES_HANDLED;

        /* Special handling for events based on cb_type */
        if (cb_type == RD_KAFKA_Q_CB_EVENT &&
            rd_kafka_event_setup(rk, rko)) {
                /* Return-as-event requested. */
                return RD_KAFKA_OP_RES_PASS; /* Return as event */
        }

        switch ((int)rko->rko_type)
        {
        case RD_KAFKA_OP_FETCH:
                if (!rk->rk_conf.consume_cb ||
                    cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
                        return RD_KAFKA_OP_RES_PASS; /* Don't handle here */
                else {
                        struct consume_ctx ctx = {
                                .consume_cb = rk->rk_conf.consume_cb,
                                .opaque = rk->rk_conf.opaque };

                        return rd_kafka_consume_cb(rk, rkq, rko, cb_type,
                                                   &ctx);
                }
                break;

        case RD_KAFKA_OP_REBALANCE:
                if (rk->rk_conf.rebalance_cb)
                        rk->rk_conf.rebalance_cb(
                                rk, rko->rko_err,
                                rko->rko_u.rebalance.partitions,
                                rk->rk_conf.opaque);
                else {
                        /* If EVENT_REBALANCE is enabled but rebalance_cb
                         * isn't, we need to perform a dummy assign for the
                         * application. This might happen during termination
                         * with consumer_close() */
                        rd_kafka_dbg(rk, CGRP, "UNASSIGN",
                                     "Forcing unassign of %d partition(s)",
                                     rko->rko_u.rebalance.partitions ?
                                     rko->rko_u.rebalance.partitions->cnt : 0);
                        rd_kafka_assign(rk, NULL);
                }
                break;

        case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
                if (!rko->rko_u.offset_commit.cb)
                        return RD_KAFKA_OP_RES_PASS; /* Don't handle here */
                rko->rko_u.offset_commit.cb(
                        rk, rko->rko_err,
                        rko->rko_u.offset_commit.partitions,
                        rko->rko_u.offset_commit.opaque);
                break;

        case RD_KAFKA_OP_FETCH_STOP | RD_KAFKA_OP_REPLY:
                /* Reply from toppar FETCH_STOP */
                rd_kafka_assignment_partition_stopped(rk, rko->rko_rktp);
                break;

        case RD_KAFKA_OP_CONSUMER_ERR:
                /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER):
                 *   Consumer errors are returned to the application
                 *   as rkmessages, not error callbacks.
                 *
                 * rd_kafka_poll() (_Q_CB_GLOBAL):
                 *   convert to ERR op (fallthru)
                 */
                if (cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) {
                        /* return as message_t to application */
                        return RD_KAFKA_OP_RES_PASS;
                }
                /* FALLTHRU */

        case RD_KAFKA_OP_ERR:
                if (rk->rk_conf.error_cb)
                        rk->rk_conf.error_cb(rk, rko->rko_err,
                                             rko->rko_u.err.errstr,
                                             rk->rk_conf.opaque);
                else
                        rd_kafka_log(rk, LOG_ERR, "ERROR",
                                     "%s: %s",
                                     rk->rk_name,
                                     rko->rko_u.err.errstr);
                break;

        case RD_KAFKA_OP_DR:
                /* Delivery report:
                 * call application DR callback for each message. */
                while ((rkm = TAILQ_FIRST(&rko->rko_u.dr.msgq.rkmq_msgs))) {
                        rd_kafka_message_t *rkmessage;

                        TAILQ_REMOVE(&rko->rko_u.dr.msgq.rkmq_msgs,
                                     rkm, rkm_link);

                        rkmessage = rd_kafka_message_get_from_rkm(rko, rkm);

                        if (likely(rk->rk_conf.dr_msg_cb != NULL)) {
                                rk->rk_conf.dr_msg_cb(rk, rkmessage,
                                                      rk->rk_conf.opaque);

                        } else if (rk->rk_conf.dr_cb) {
                                rk->rk_conf.dr_cb(rk,
                                                  rkmessage->payload,
                                                  rkmessage->len,
                                                  rkmessage->err,
                                                  rk->rk_conf.opaque,
                                                  rkmessage->_private);
                        } else if (rk->rk_drmode == RD_KAFKA_DR_MODE_EVENT) {
                                rd_kafka_log(rk, LOG_WARNING, "DRDROP",
                                             "Dropped delivery report for "
                                             "message to "
                                             "%s [%"PRId32"] (%s) with "
                                             "opaque %p: flush() or poll() "
                                             "should not be called when "
                                             "EVENT_DR is enabled",
                                             rd_kafka_topic_name(rkmessage->
                                                                 rkt),
                                             rkmessage->partition,
                                             rd_kafka_err2name(rkmessage->err),
                                             rkmessage->_private);
                        } else {
                                rd_assert(!*"BUG: neither a delivery report "
                                          "callback nor the EVENT_DR flag "
                                          "is set");
                        }

                        rd_kafka_msg_destroy(rk, rkm);

                        if (unlikely(rd_kafka_yield_thread)) {
                                /* Callback called yield(),
                                 * re-enqueue the op (if there are any
                                 * remaining messages). */
                                if (!TAILQ_EMPTY(&rko->rko_u.dr.msgq.
                                                 rkmq_msgs))
                                        rd_kafka_q_reenq(rkq, rko);
                                else
                                        rd_kafka_op_destroy(rko);
                                return RD_KAFKA_OP_RES_YIELD;
                        }
                }

                rd_kafka_msgq_init(&rko->rko_u.dr.msgq);

                break;

        case RD_KAFKA_OP_THROTTLE:
                if (rk->rk_conf.throttle_cb)
                        rk->rk_conf.throttle_cb(rk,
                                                rko->rko_u.throttle.nodename,
                                                rko->rko_u.throttle.nodeid,
                                                rko->rko_u.throttle.
                                                throttle_time,
                                                rk->rk_conf.opaque);
                break;

        case RD_KAFKA_OP_STATS:
                /* Statistics */
                if (rk->rk_conf.stats_cb &&
                    rk->rk_conf.stats_cb(rk, rko->rko_u.stats.json,
                                         rko->rko_u.stats.json_len,
                                         rk->rk_conf.opaque) == 1)
                        rko->rko_u.stats.json = NULL; /* Application wanted
                                                       * the json ptr */
                break;

        case RD_KAFKA_OP_LOG:
                if (likely(rk->rk_conf.log_cb &&
                           rk->rk_conf.log_level >= rko->rko_u.log.level))
                        rk->rk_conf.log_cb(rk,
                                           rko->rko_u.log.level,
                                           rko->rko_u.log.fac,
                                           rko->rko_u.log.str);
                break;

        case RD_KAFKA_OP_TERMINATE:
                /* nop: just a wake-up */
                break;

        case RD_KAFKA_OP_CREATETOPICS:
        case RD_KAFKA_OP_DELETETOPICS:
        case RD_KAFKA_OP_CREATEPARTITIONS:
        case RD_KAFKA_OP_ALTERCONFIGS:
        case RD_KAFKA_OP_DESCRIBECONFIGS:
        case RD_KAFKA_OP_DELETERECORDS:
        case RD_KAFKA_OP_DELETEGROUPS:
        case RD_KAFKA_OP_ADMIN_FANOUT:
                /* Calls op_destroy() from worker callback,
                 * when the time comes. */
                res = rd_kafka_op_call(rk, rkq, rko);
                break;

        case RD_KAFKA_OP_ADMIN_RESULT:
                if (cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
                        return RD_KAFKA_OP_RES_PASS; /* Don't handle here */

                /* Op is silently destroyed below */
                break;

        case RD_KAFKA_OP_TXN:
                /* Must only be handled by rdkafka main thread */
                rd_assert(thrd_is_current(rk->rk_thread));
                res = rd_kafka_op_call(rk, rkq, rko);
                break;

        case RD_KAFKA_OP_BARRIER:
                break;

        case RD_KAFKA_OP_PURGE:
                rd_kafka_purge(rk, rko->rko_u.purge.flags);
                break;

        default:
                rd_kafka_assert(rk, !*"can't handle op type");
                break;
        }

        if (res == RD_KAFKA_OP_RES_HANDLED)
                rd_kafka_op_destroy(rko);

        return res;
}

int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) {
        int r;

        if (timeout_ms)
                rd_kafka_app_poll_blocking(rk);

        r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0,
                             RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);

        rd_kafka_app_polled(rk);

        return r;
}
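
/**
 * Example (illustrative sketch only, not compiled): a typical producer
 * service loop. rd_kafka_poll() drives the callbacks dispatched by
 * rd_kafka_poll_cb() above (delivery reports, errors, stats, logs).
 * \c run is an application-defined loop flag.
 */
#if 0
while (run) {
        rd_kafka_producev(rk,
                          RD_KAFKA_V_TOPIC("mytopic"),
                          RD_KAFKA_V_VALUE("hello", 5),
                          RD_KAFKA_V_END);
        rd_kafka_poll(rk, 100); /* serve callbacks for up to 100 ms */
}
#endif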


rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu,
                                       int timeout_ms) {
        rd_kafka_op_t *rko;

        if (timeout_ms)
                rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

        rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0,
                                   RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb,
                                   NULL);

        rd_kafka_app_polled(rkqu->rkqu_rk);

        if (!rko)
                return NULL;

        return rko;
}
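
/**
 * Example (illustrative sketch only, not compiled): the event-based
 * interface. rd_kafka_queue_poll() takes the RD_KAFKA_Q_CB_EVENT path
 * above and returns ops as events instead of firing callbacks.
 */
#if 0
rd_kafka_queue_t *queue = rd_kafka_queue_get_main(rk);
rd_kafka_event_t *ev;

while ((ev = rd_kafka_queue_poll(queue, 100))) {
        if (rd_kafka_event_type(ev) == RD_KAFKA_EVENT_ERROR)
                fprintf(stderr, "ERROR: %s\n",
                        rd_kafka_event_error_string(ev));
        rd_kafka_event_destroy(ev);
}

rd_kafka_queue_destroy(queue);
#endif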

int rd_kafka_queue_poll_callback (rd_kafka_queue_t *rkqu, int timeout_ms) {
        int r;

        if (timeout_ms)
                rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

        r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
                             RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);

        rd_kafka_app_polled(rkqu->rkqu_rk);

        return r;
}



static void rd_kafka_toppar_dump (FILE *fp, const char *indent,
                                  rd_kafka_toppar_t *rktp) {

        fprintf(fp, "%s%.*s [%"PRId32"] broker %s, "
                "leader_id %s\n",
                indent,
                RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                rktp->rktp_partition,
                rktp->rktp_broker ?
                rktp->rktp_broker->rkb_name : "none",
                rktp->rktp_leader ?
                rktp->rktp_leader->rkb_name : "none");
        fprintf(fp,
                "%s refcnt %i\n"
                "%s msgq: %i messages\n"
                "%s xmit_msgq: %i messages\n"
                "%s total: %"PRIu64" messages, %"PRIu64" bytes\n",
                indent, rd_refcnt_get(&rktp->rktp_refcnt),
                indent, rktp->rktp_msgq.rkmq_msg_cnt,
                indent, rktp->rktp_xmit_msgq.rkmq_msg_cnt,
                indent, rd_atomic64_get(&rktp->rktp_c.tx_msgs),
                rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes));
}

static void rd_kafka_broker_dump (FILE *fp, rd_kafka_broker_t *rkb,
                                  int locks) {
        rd_kafka_toppar_t *rktp;

        if (locks)
                rd_kafka_broker_lock(rkb);
        fprintf(fp, " rd_kafka_broker_t %p: %s NodeId %"PRId32
                " in state %s (for %.3fs)\n",
                rkb, rkb->rkb_name, rkb->rkb_nodeid,
                rd_kafka_broker_state_names[rkb->rkb_state],
                rkb->rkb_ts_state ?
                (float)(rd_clock() - rkb->rkb_ts_state) / 1000000.0f :
                0.0f);
        fprintf(fp, " refcnt %i\n", rd_refcnt_get(&rkb->rkb_refcnt));
        fprintf(fp, " outbuf_cnt: %i waitresp_cnt: %i\n",
                rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
                rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt));
        fprintf(fp,
                " %"PRIu64" messages sent, %"PRIu64" bytes, "
                "%"PRIu64" errors, %"PRIu64" timeouts\n"
                " %"PRIu64" messages received, %"PRIu64" bytes, "
                "%"PRIu64" errors\n"
                " %"PRIu64" messageset transmissions were retried\n",
                rd_atomic64_get(&rkb->rkb_c.tx),
                rd_atomic64_get(&rkb->rkb_c.tx_bytes),
                rd_atomic64_get(&rkb->rkb_c.tx_err),
                rd_atomic64_get(&rkb->rkb_c.req_timeouts),
                rd_atomic64_get(&rkb->rkb_c.rx),
                rd_atomic64_get(&rkb->rkb_c.rx_bytes),
                rd_atomic64_get(&rkb->rkb_c.rx_err),
                rd_atomic64_get(&rkb->rkb_c.tx_retries));

        fprintf(fp, " %i toppars:\n", rkb->rkb_toppar_cnt);
        TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink)
                rd_kafka_toppar_dump(fp, "  ", rktp);
        if (locks) {
                rd_kafka_broker_unlock(rkb);
        }
}


static void rd_kafka_dump0 (FILE *fp, rd_kafka_t *rk, int locks) {
        rd_kafka_broker_t *rkb;
        rd_kafka_topic_t *rkt;
        rd_kafka_toppar_t *rktp;
        int i;
        unsigned int tot_cnt;
        size_t tot_size;

        rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);

        if (locks)
                rd_kafka_rdlock(rk);
#if ENABLE_DEVEL
        fprintf(fp, "rd_kafka_op_cnt: %d\n", rd_atomic32_get(&rd_kafka_op_cnt));
#endif
        fprintf(fp, "rd_kafka_t %p: %s\n", rk, rk->rk_name);

        fprintf(fp, " producer.msg_cnt %u (%"PRIusz" bytes)\n",
                tot_cnt, tot_size);
        fprintf(fp, " rk_rep reply queue: %i ops\n",
                rd_kafka_q_len(rk->rk_rep));

        fprintf(fp, " brokers:\n");
        if (locks)
                mtx_lock(&rk->rk_internal_rkb_lock);
        if (rk->rk_internal_rkb)
                rd_kafka_broker_dump(fp, rk->rk_internal_rkb, locks);
        if (locks)
                mtx_unlock(&rk->rk_internal_rkb_lock);

        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_broker_dump(fp, rkb, locks);
        }

        fprintf(fp, " cgrp:\n");
        if (rk->rk_cgrp) {
                rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
                fprintf(fp, "  %.*s in state %s, flags 0x%x\n",
                        RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                        rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                        rkcg->rkcg_flags);
                fprintf(fp, "  coord_id %"PRId32", broker %s\n",
                        rkcg->rkcg_coord_id,
                        rkcg->rkcg_curr_coord ?
                        rd_kafka_broker_name(rkcg->rkcg_curr_coord) :
                        "(none)");

                fprintf(fp, "  toppars:\n");
                RD_LIST_FOREACH(rktp, &rkcg->rkcg_toppars, i) {
                        fprintf(fp, "   %.*s [%"PRId32"] in state %s\n",
                                RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                rktp->rktp_partition,
                                rd_kafka_fetch_states[rktp->rktp_fetch_state]);
                }
        }

        fprintf(fp, " topics:\n");
        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                fprintf(fp, "  %.*s with %"PRId32" partitions, state %s, "
                        "refcnt %i\n",
                        RD_KAFKAP_STR_PR(rkt->rkt_topic),
                        rkt->rkt_partition_cnt,
                        rd_kafka_topic_state_names[rkt->rkt_state],
                        rd_refcnt_get(&rkt->rkt_refcnt));
                if (rkt->rkt_ua)
                        rd_kafka_toppar_dump(fp, "   ", rkt->rkt_ua);
                if (!rd_list_empty(&rkt->rkt_desp)) {
                        fprintf(fp, "   desired partitions:");
                        RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i)
                                fprintf(fp, " %"PRId32,
                                        rktp->rktp_partition);
                        fprintf(fp, "\n");
                }
        }

        fprintf(fp, "\n");
        rd_kafka_metadata_cache_dump(fp, rk);

        if (locks)
                rd_kafka_rdunlock(rk);
}

void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) {
        if (rk)
                rd_kafka_dump0(fp, rk, 1/*locks*/);
}
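
/**
 * Example (illustrative sketch only, not compiled): dumping the full
 * instance state (brokers, cgrp, toppars) is a debugging aid. It takes
 * the instance locks, so it should not be called from a context that may
 * already hold them.
 */
#if 0
rd_kafka_dump(stderr, rk);
#endif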



const char *rd_kafka_name (const rd_kafka_t *rk) {
        return rk->rk_name;
}

rd_kafka_type_t rd_kafka_type (const rd_kafka_t *rk) {
        return rk->rk_type;
}


char *rd_kafka_memberid (const rd_kafka_t *rk) {
        rd_kafka_op_t *rko;
        rd_kafka_cgrp_t *rkcg;
        char *memberid;

        if (!(rkcg = rd_kafka_cgrp_get(rk)))
                return NULL;

        rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_NAME);
        if (!rko)
                return NULL;
        memberid = rko->rko_u.name.str;
        rko->rko_u.name.str = NULL;
        rd_kafka_op_destroy(rko);

        return memberid;
}


char *rd_kafka_clusterid (rd_kafka_t *rk, int timeout_ms) {
        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        /* ClusterId is returned in Metadata >=V2 responses and
         * cached on the rk. If no cached value is available
         * it means no metadata has been received yet, or we're
         * using a lower protocol version
         * (e.g., lack of api.version.request=true). */

        while (1) {
                int remains_ms;

                rd_kafka_rdlock(rk);

                if (rk->rk_clusterid) {
                        /* Cached clusterid available. */
                        char *ret = rd_strdup(rk->rk_clusterid);
                        rd_kafka_rdunlock(rk);
                        return ret;
                } else if (rk->rk_ts_metadata > 0) {
                        /* Metadata received but no clusterid,
                         * this probably means the broker is too old
                         * or api.version.request=false. */
                        rd_kafka_rdunlock(rk);
                        return NULL;
                }

                rd_kafka_rdunlock(rk);

                /* Wait for up to timeout_ms for a metadata refresh,
                 * if permitted by application. */
                remains_ms = rd_timeout_remains(abs_timeout);
                if (rd_timeout_expired(remains_ms))
                        return NULL;

                rd_kafka_metadata_cache_wait_change(rk, remains_ms);
        }

        return NULL;
}
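
/**
 * Example (illustrative sketch only, not compiled): the call may block
 * for up to timeout_ms waiting for the first metadata response. The
 * returned string is allocated by librdkafka, so freeing it with
 * rd_kafka_mem_free() (defined later in this file) keeps allocator use
 * consistent.
 */
#if 0
char *cid = rd_kafka_clusterid(rk, 5000);
if (cid) {
        printf("cluster id: %s\n", cid);
        rd_kafka_mem_free(rk, cid);
}
#endif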


int32_t rd_kafka_controllerid (rd_kafka_t *rk, int timeout_ms) {
        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        /* ControllerId is returned in Metadata >=V1 responses and
         * cached on the rk. If no cached value is available
         * it means no metadata has been received yet, or we're
         * using a lower protocol version
         * (e.g., lack of api.version.request=true). */

        while (1) {
                int remains_ms;
                int version;

                version = rd_kafka_brokers_get_state_version(rk);

                rd_kafka_rdlock(rk);

                if (rk->rk_controllerid != -1) {
                        /* Cached controllerid available. */
                        rd_kafka_rdunlock(rk);
                        return rk->rk_controllerid;
                } else if (rk->rk_ts_metadata > 0) {
                        /* Metadata received but no controllerid,
                         * this probably means the broker is too old
                         * or api.version.request=false. */
                        rd_kafka_rdunlock(rk);
                        return -1;
                }

                rd_kafka_rdunlock(rk);

                /* Wait for up to timeout_ms for a metadata refresh,
                 * if permitted by application. */
                remains_ms = rd_timeout_remains(abs_timeout);
                if (rd_timeout_expired(remains_ms))
                        return -1;

                rd_kafka_brokers_wait_state_change(rk, version, remains_ms);
        }

        return -1;
}


void *rd_kafka_opaque (const rd_kafka_t *rk) {
        return rk->rk_conf.opaque;
}


int rd_kafka_outq_len (rd_kafka_t *rk) {
        return rd_kafka_curr_msgs_cnt(rk) + rd_kafka_q_len(rk->rk_rep) +
                (rk->rk_background.q ?
                 rd_kafka_q_len(rk->rk_background.q) : 0);
}


rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms) {
        unsigned int msg_cnt = 0;

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;

        rd_kafka_yield_thread = 0;

        /* Set flushing flag on the producer for the duration of the
         * flush() call. This tells producer_serve() that the linger.ms
         * time should be considered immediate. */
        rd_atomic32_add(&rk->rk_flushing, 1);

        /* Wake up all broker threads to trigger the produce_serve() call.
         * If this flush() call finishes before the broker wakes up
         * then no flushing will be performed by that broker thread. */
        rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_UP);

        if (rk->rk_drmode == RD_KAFKA_DR_MODE_EVENT) {
                /* Application wants delivery reports as events rather
                 * than callbacks, we must thus not serve this queue
                 * with rd_kafka_poll() since that would trigger non-existent
                 * delivery report callbacks, which would result
                 * in the delivery reports being dropped.
                 * Instead we rely on the application to serve the event
                 * queue in another thread, so all we do here is wait
                 * for the current message count to reach zero. */
                rd_kafka_curr_msgs_wait_zero(rk, timeout_ms, &msg_cnt);

        } else {
                /* Standard poll interface.
                 *
                 * First poll call is non-blocking for the case
                 * where timeout_ms==RD_POLL_NOWAIT to make sure poll is
                 * called at least once. */
                rd_ts_t ts_end = rd_timeout_init(timeout_ms);
                int tmout = RD_POLL_NOWAIT;
                int qlen = 0;

                do {
                        rd_kafka_poll(rk, tmout);
                        qlen = rd_kafka_q_len(rk->rk_rep);
                        msg_cnt = rd_kafka_curr_msgs_cnt(rk);
                } while (qlen + msg_cnt > 0 &&
                         !rd_kafka_yield_thread &&
                         (tmout = rd_timeout_remains_limit(ts_end, 10)) !=
                         RD_POLL_NOWAIT);

                msg_cnt += qlen;
        }

        rd_atomic32_sub(&rk->rk_flushing, 1);

        return msg_cnt > 0 ? RD_KAFKA_RESP_ERR__TIMED_OUT :
                RD_KAFKA_RESP_ERR_NO_ERROR;
}
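
/**
 * Example (illustrative sketch only, not compiled): typical producer
 * shutdown sequence. flush() gives outstanding messages a chance to be
 * delivered; rd_kafka_outq_len() reports what remains on timeout.
 */
#if 0
if (rd_kafka_flush(rk, 10 * 1000) == RD_KAFKA_RESP_ERR__TIMED_OUT)
        fprintf(stderr, "%d message(s) still queued or in-flight\n",
                rd_kafka_outq_len(rk));
rd_kafka_destroy(rk);
#endif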

/**
 * @brief Purge the partition message queue (according to \p purge_flags) for
 *        all toppars.
 *
 * This is necessary to avoid the race condition that arises when a purge()
 * is scheduled in the window after an rktp has been created but before it
 * has been joined to a broker handler thread.
 *
 * The rktp_xmit_msgq is handled by the broker-thread purge.
 *
 * @returns the number of messages purged.
 *
 * @locks_required rd_kafka_*lock()
 * @locks_acquired rd_kafka_topic_rdlock()
 */
static int
rd_kafka_purge_toppars (rd_kafka_t *rk, int purge_flags) {
        rd_kafka_topic_t *rkt;
        int cnt = 0;

        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                rd_kafka_toppar_t *rktp;
                int i;

                rd_kafka_topic_rdlock(rkt);
                for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
                        cnt += rd_kafka_toppar_purge_queues(
                                rkt->rkt_p[i], purge_flags,
                                rd_false/*!xmit*/);

                RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i)
                        cnt += rd_kafka_toppar_purge_queues(
                                rktp, purge_flags, rd_false/*!xmit*/);

                if (rkt->rkt_ua)
                        cnt += rd_kafka_toppar_purge_queues(
                                rkt->rkt_ua, purge_flags,
                                rd_false/*!xmit*/);
                rd_kafka_topic_rdunlock(rkt);
        }

        return cnt;
}


rd_kafka_resp_err_t rd_kafka_purge (rd_kafka_t *rk, int purge_flags) {
        rd_kafka_broker_t *rkb;
        rd_kafka_q_t *tmpq = NULL;
        int waitcnt = 0;

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;

        /* Check that future flags are not passed */
        if ((purge_flags & ~RD_KAFKA_PURGE_F_MASK) != 0)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        /* Nothing to purge */
        if (!purge_flags)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        /* Set up a reply queue to wait for broker thread signalling
         * completion, unless non-blocking. */
        if (!(purge_flags & RD_KAFKA_PURGE_F_NON_BLOCKING))
                tmpq = rd_kafka_q_new(rk);

        rd_kafka_rdlock(rk);

        /* Purge msgq for all toppars. */
        rd_kafka_purge_toppars(rk, purge_flags);

        /* Send purge request to all broker threads */
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_broker_purge_queues(rkb, purge_flags,
                                             RD_KAFKA_REPLYQ(tmpq, 0));
                waitcnt++;
        }

        rd_kafka_rdunlock(rk);

        if (tmpq) {
                /* Wait for responses */
                while (waitcnt-- > 0)
                        rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);

                rd_kafka_q_destroy_owner(tmpq);
        }

        /* Purge messages for the UA(-1) partitions (which are not
         * handled by a broker thread) */
        if (purge_flags & RD_KAFKA_PURGE_F_QUEUE)
                rd_kafka_purge_ua_toppar_queues(rk);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
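
/**
 * Example (illustrative sketch only, not compiled): aborting a hung
 * shutdown. Purged messages fail with RD_KAFKA_RESP_ERR__PURGE_QUEUE or
 * __PURGE_INFLIGHT, so a poll() afterwards serves the resulting
 * delivery reports.
 */
#if 0
rd_kafka_purge(rk, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_INFLIGHT);
rd_kafka_poll(rk, 0); /* serve the purge delivery reports */
#endif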



/**
 * @returns a csv string of purge flags in thread-local storage
 */
const char *rd_kafka_purge_flags2str (int flags) {
        static const char *names[] = {
                "queue",
                "inflight",
                "non-blocking",
                NULL
        };
        static RD_TLS char ret[64];

        return rd_flags2str(ret, sizeof(ret), names, flags);
}


int rd_kafka_version (void) {
        return RD_KAFKA_VERSION;
}

const char *rd_kafka_version_str (void) {
        static RD_TLS char ret[128];
        size_t of = 0, r;

        if (*ret)
                return ret;

#ifdef LIBRDKAFKA_GIT_VERSION
        if (*LIBRDKAFKA_GIT_VERSION) {
                of = rd_snprintf(ret, sizeof(ret), "%s",
                                 *LIBRDKAFKA_GIT_VERSION == 'v' ?
                                 LIBRDKAFKA_GIT_VERSION + 1 :
                                 LIBRDKAFKA_GIT_VERSION);
                if (of > sizeof(ret))
                        of = sizeof(ret);
        }
#endif

#define _my_sprintf(...) do {                                           \
                r = rd_snprintf(ret+of, sizeof(ret)-of, __VA_ARGS__);   \
                if (r > sizeof(ret)-of)                                 \
                        r = sizeof(ret)-of;                             \
                of += r;                                                \
        } while (0)

        if (of == 0) {
                int ver = rd_kafka_version();
                int prel = (ver & 0xff);
                _my_sprintf("%i.%i.%i",
                            (ver >> 24) & 0xff,
                            (ver >> 16) & 0xff,
                            (ver >> 8) & 0xff);
                if (prel != 0xff) {
                        /* pre-builds below 200 are just running numbers,
                         * above 200 are RC numbers. */
                        if (prel <= 200)
                                _my_sprintf("-pre%d", prel);
                        else
                                _my_sprintf("-RC%d", prel - 200);
                }
        }

#if ENABLE_DEVEL
        _my_sprintf("-devel");
#endif

#if WITHOUT_OPTIMIZATION
        _my_sprintf("-O0");
#endif

        return ret;
}
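
/**
 * Example (illustrative sketch only, not compiled): the hex form encodes
 * 0xMMmmrrpp, where pp is 0xff for release builds (e.g. 0x010600ff for a
 * 1.6.0 release), and complements the human-readable string.
 */
#if 0
printf("librdkafka %s (0x%08x)\n",
       rd_kafka_version_str(), rd_kafka_version());
#endif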


/**
 * Assert trampoline to print some debugging information on crash.
 */
void
RD_NORETURN
rd_kafka_crash (const char *file, int line, const char *function,
                rd_kafka_t *rk, const char *reason) {
        fprintf(stderr, "*** %s:%i:%s: %s ***\n",
                file, line, function, reason);
        if (rk)
                rd_kafka_dump0(stderr, rk, 0/*no locks*/);
        abort();
}



struct list_groups_state {
        rd_kafka_q_t *q;
        rd_kafka_resp_err_t err;
        int wait_cnt;
        const char *desired_group;
        struct rd_kafka_group_list *grplist;
        int grplist_size;
};

static void rd_kafka_DescribeGroups_resp_cb (rd_kafka_t *rk,
                                             rd_kafka_broker_t *rkb,
                                             rd_kafka_resp_err_t err,
                                             rd_kafka_buf_t *reply,
                                             rd_kafka_buf_t *request,
                                             void *opaque) {
        struct list_groups_state *state;
        const int log_decode_errors = LOG_ERR;
        int cnt;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' has gone out of scope due to list_groups()
                 * timing out and returning. */
                return;
        }

        state = opaque;
        state->wait_cnt--;

        if (err)
                goto err;

        rd_kafka_buf_read_i32(reply, &cnt);

        while (cnt-- > 0) {
                int16_t ErrorCode;
                rd_kafkap_str_t Group, GroupState, ProtoType, Proto;
                int MemberCnt;
                struct rd_kafka_group_info *gi;

                if (state->grplist->group_cnt == state->grplist_size) {
                        /* Grow group array */
                        state->grplist_size *= 2;
                        state->grplist->groups =
                                rd_realloc(state->grplist->groups,
                                           state->grplist_size *
                                           sizeof(*state->grplist->groups));
                }

                gi = &state->grplist->groups[state->grplist->group_cnt++];
                memset(gi, 0, sizeof(*gi));

                rd_kafka_buf_read_i16(reply, &ErrorCode);
                rd_kafka_buf_read_str(reply, &Group);
                rd_kafka_buf_read_str(reply, &GroupState);
                rd_kafka_buf_read_str(reply, &ProtoType);
                rd_kafka_buf_read_str(reply, &Proto);
                rd_kafka_buf_read_i32(reply, &MemberCnt);

                if (MemberCnt > 100000) {
                        err = RD_KAFKA_RESP_ERR__BAD_MSG;
                        goto err;
                }

                rd_kafka_broker_lock(rkb);
                gi->broker.id = rkb->rkb_nodeid;
                gi->broker.host = rd_strdup(rkb->rkb_origname);
                gi->broker.port = rkb->rkb_port;
                rd_kafka_broker_unlock(rkb);

                gi->err = ErrorCode;
                gi->group = RD_KAFKAP_STR_DUP(&Group);
                gi->state = RD_KAFKAP_STR_DUP(&GroupState);
                gi->protocol_type = RD_KAFKAP_STR_DUP(&ProtoType);
                gi->protocol = RD_KAFKAP_STR_DUP(&Proto);

                if (MemberCnt > 0)
                        gi->members =
                                rd_malloc(MemberCnt * sizeof(*gi->members));

                while (MemberCnt-- > 0) {
                        rd_kafkap_str_t MemberId, ClientId, ClientHost;
                        rd_kafkap_bytes_t Meta, Assignment;
                        struct rd_kafka_group_member_info *mi;

                        mi = &gi->members[gi->member_cnt++];
                        memset(mi, 0, sizeof(*mi));

                        rd_kafka_buf_read_str(reply, &MemberId);
                        rd_kafka_buf_read_str(reply, &ClientId);
                        rd_kafka_buf_read_str(reply, &ClientHost);
                        rd_kafka_buf_read_bytes(reply, &Meta);
                        rd_kafka_buf_read_bytes(reply, &Assignment);

                        mi->member_id = RD_KAFKAP_STR_DUP(&MemberId);
                        mi->client_id = RD_KAFKAP_STR_DUP(&ClientId);
                        mi->client_host = RD_KAFKAP_STR_DUP(&ClientHost);

                        if (RD_KAFKAP_BYTES_LEN(&Meta) == 0) {
                                mi->member_metadata_size = 0;
                                mi->member_metadata = NULL;
                        } else {
                                mi->member_metadata_size =
                                        RD_KAFKAP_BYTES_LEN(&Meta);
                                mi->member_metadata =
                                        rd_memdup(Meta.data,
                                                  mi->member_metadata_size);
                        }

                        if (RD_KAFKAP_BYTES_LEN(&Assignment) == 0) {
                                mi->member_assignment_size = 0;
                                mi->member_assignment = NULL;
                        } else {
                                mi->member_assignment_size =
                                        RD_KAFKAP_BYTES_LEN(&Assignment);
                                mi->member_assignment =
                                        rd_memdup(Assignment.data,
                                                  mi->member_assignment_size);
                        }
                }
        }

err:
        state->err = err;
        return;

err_parse:
        state->err = reply->rkbuf_err;
}

static void rd_kafka_ListGroups_resp_cb (rd_kafka_t *rk,
                                         rd_kafka_broker_t *rkb,
                                         rd_kafka_resp_err_t err,
                                         rd_kafka_buf_t *reply,
                                         rd_kafka_buf_t *request,
                                         void *opaque) {
        struct list_groups_state *state;
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode;
        char **grps = NULL;
        int cnt, grpcnt, i = 0;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' is no longer in scope because
                 * list_groups() timed out and returned to the caller.
                 * We must not touch anything here but simply return. */
                return;
        }

        state = opaque;

        state->wait_cnt--;

        if (err)
                goto err;

        rd_kafka_buf_read_i16(reply, &ErrorCode);
        if (ErrorCode) {
                err = ErrorCode;
                goto err;
        }

        rd_kafka_buf_read_i32(reply, &cnt);

        if (state->desired_group)
                grpcnt = 1;
        else
                grpcnt = cnt;

        if (cnt == 0 || grpcnt == 0)
                return;

        grps = rd_malloc(sizeof(*grps) * grpcnt);

        while (cnt-- > 0) {
                rd_kafkap_str_t grp, proto;

                rd_kafka_buf_read_str(reply, &grp);
                rd_kafka_buf_read_str(reply, &proto);

                if (state->desired_group &&
                    rd_kafkap_str_cmp_str(&grp, state->desired_group))
                        continue;

                grps[i++] = RD_KAFKAP_STR_DUP(&grp);

                if (i == grpcnt)
                        break;
        }

        if (i > 0) {
                state->wait_cnt++;
                rd_kafka_DescribeGroupsRequest(
                        rkb, (const char **)grps, i,
                        RD_KAFKA_REPLYQ(state->q, 0),
                        rd_kafka_DescribeGroups_resp_cb,
                        state);

                while (i-- > 0)
                        rd_free(grps[i]);
        }

        rd_free(grps);

err:
        state->err = err;
        return;

err_parse:
        if (grps)
                rd_free(grps);
        state->err = reply->rkbuf_err;
}

rd_kafka_resp_err_t
rd_kafka_list_groups (rd_kafka_t *rk, const char *group,
                      const struct rd_kafka_group_list **grplistp,
                      int timeout_ms) {
        rd_kafka_broker_t *rkb;
        int rkb_cnt = 0;
        struct list_groups_state state = RD_ZERO_INIT;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        int state_version = rd_kafka_brokers_get_state_version(rk);

        /* Wait until metadata has been fetched from cluster so
         * that we have a full broker list.
         * This state only happens during initial client setup, after that
         * there'll always be a cached metadata copy. */
        rd_kafka_rdlock(rk);
        while (!rk->rk_ts_metadata) {
                rd_kafka_rdunlock(rk);

                if (!rd_kafka_brokers_wait_state_change(
                            rk, state_version, rd_timeout_remains(ts_end)))
                        return RD_KAFKA_RESP_ERR__TIMED_OUT;

                rd_kafka_rdlock(rk);
        }

        state.q = rd_kafka_q_new(rk);
        state.desired_group = group;
        state.grplist = rd_calloc(1, sizeof(*state.grplist));
        state.grplist_size = group ? 1 : 32;

        state.grplist->groups = rd_malloc(state.grplist_size *
                                          sizeof(*state.grplist->groups));

        /* Query each broker for its list of groups */
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_broker_lock(rkb);
                if (rkb->rkb_nodeid == -1 ||
                    RD_KAFKA_BROKER_IS_LOGICAL(rkb)) {
                        rd_kafka_broker_unlock(rkb);
                        continue;
                }
                rd_kafka_broker_unlock(rkb);

                state.wait_cnt++;
                rkb_cnt++;
                rd_kafka_ListGroupsRequest(rkb,
                                           RD_KAFKA_REPLYQ(state.q, 0),
                                           rd_kafka_ListGroups_resp_cb,
                                           &state);
        }
        rd_kafka_rdunlock(rk);

        if (rkb_cnt == 0) {
                state.err = RD_KAFKA_RESP_ERR__TRANSPORT;

        } else {
                int remains;

                while (state.wait_cnt > 0 &&
                       !rd_timeout_expired((remains =
                                            rd_timeout_remains(ts_end)))) {
                        rd_kafka_q_serve(state.q, remains, 0,
                                         RD_KAFKA_Q_CB_CALLBACK,
                                         rd_kafka_poll_cb, NULL);
                        /* Ignore yields */
                }
        }

        rd_kafka_q_destroy_owner(state.q);

        if (state.wait_cnt > 0 && !state.err) {
                if (state.grplist->group_cnt == 0)
                        state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
                else {
                        *grplistp = state.grplist;
                        return RD_KAFKA_RESP_ERR__PARTIAL;
                }
        }

        if (state.err)
                rd_kafka_group_list_destroy(state.grplist);
        else
                *grplistp = state.grplist;

        return state.err;
}
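
/**
 * Example (illustrative sketch only, not compiled): listing groups in
 * the cluster. Passing NULL lists every group; a group name restricts
 * the query. Note that a __PARTIAL result still carries a list that
 * must be destroyed.
 */
#if 0
const struct rd_kafka_group_list *grplist;
rd_kafka_resp_err_t err = rd_kafka_list_groups(rk, NULL, &grplist, 5000);

if (!err || err == RD_KAFKA_RESP_ERR__PARTIAL) {
        int i;
        for (i = 0 ; i < grplist->group_cnt ; i++)
                printf("group %s in state %s with %d member(s)\n",
                       grplist->groups[i].group,
                       grplist->groups[i].state,
                       grplist->groups[i].member_cnt);
        rd_kafka_group_list_destroy(grplist);
}
#endif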


void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist0) {
        struct rd_kafka_group_list *grplist =
                (struct rd_kafka_group_list *)grplist0;

        while (grplist->group_cnt-- > 0) {
                struct rd_kafka_group_info *gi;
                gi = &grplist->groups[grplist->group_cnt];

                if (gi->broker.host)
                        rd_free(gi->broker.host);
                if (gi->group)
                        rd_free(gi->group);
                if (gi->state)
                        rd_free(gi->state);
                if (gi->protocol_type)
                        rd_free(gi->protocol_type);
                if (gi->protocol)
                        rd_free(gi->protocol);

                while (gi->member_cnt-- > 0) {
                        struct rd_kafka_group_member_info *mi;
                        mi = &gi->members[gi->member_cnt];

                        if (mi->member_id)
                                rd_free(mi->member_id);
                        if (mi->client_id)
                                rd_free(mi->client_id);
                        if (mi->client_host)
                                rd_free(mi->client_host);
                        if (mi->member_metadata)
                                rd_free(mi->member_metadata);
                        if (mi->member_assignment)
                                rd_free(mi->member_assignment);
                }

                if (gi->members)
                        rd_free(gi->members);
        }

        if (grplist->groups)
                rd_free(grplist->groups);

        rd_free(grplist);
}



const char *rd_kafka_get_debug_contexts (void) {
        return RD_KAFKA_DEBUG_CONTEXTS;
}


int rd_kafka_path_is_dir (const char *path) {
#ifdef _WIN32
        struct _stat st;
        return (_stat(path, &st) == 0 && st.st_mode & S_IFDIR);
#else
        struct stat st;
        return (stat(path, &st) == 0 && S_ISDIR(st.st_mode));
#endif
}


/**
 * @returns true if directory is empty or can't be accessed, else false.
 */
rd_bool_t rd_kafka_dir_is_empty (const char *path) {
#if _WIN32
        /* FIXME: Unsupported */
        return rd_true;
#else
        DIR *dir;
        struct dirent *d;
#if defined(__sun)
        struct stat st;
        int ret = 0;
#endif

        dir = opendir(path);
        if (!dir)
                return rd_true;

        while ((d = readdir(dir))) {

                if (!strcmp(d->d_name, ".") ||
                    !strcmp(d->d_name, ".."))
                        continue;

#if defined(__sun)
                ret = stat(d->d_name, &st);
                if (ret != 0) {
                        closedir(dir);
                        return rd_true; /* Can't be accessed */
                }
                if (S_ISREG(st.st_mode) || S_ISDIR(st.st_mode) ||
                    S_ISLNK(st.st_mode)) {
#else
                if (d->d_type == DT_REG || d->d_type == DT_LNK ||
                    d->d_type == DT_DIR) {
#endif
                        closedir(dir);
                        return rd_false;
                }
        }

        closedir(dir);
        return rd_true;
#endif
}


void *rd_kafka_mem_malloc (rd_kafka_t *rk, size_t size) {
        return rd_malloc(size);
}

void *rd_kafka_mem_calloc (rd_kafka_t *rk, size_t num, size_t size) {
        return rd_calloc(num, size);
}

void rd_kafka_mem_free (rd_kafka_t *rk, void *ptr) {
        rd_free(ptr);
}


int rd_kafka_errno (void) {
        return errno;
}

int rd_kafka_unittest (void) {
        return rd_unittest();
}