1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2013, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 
30 #define _GNU_SOURCE
31 #include <errno.h>
32 #include <string.h>
33 #include <stdarg.h>
34 #include <signal.h>
35 #include <stdlib.h>
36 #include <sys/stat.h>
37 #if !_WIN32
38 #include <sys/types.h>
39 #include <dirent.h>
40 #endif
41 
42 #include "rdkafka_int.h"
43 #include "rdkafka_msg.h"
44 #include "rdkafka_broker.h"
45 #include "rdkafka_topic.h"
46 #include "rdkafka_partition.h"
47 #include "rdkafka_offset.h"
48 #include "rdkafka_transport.h"
49 #include "rdkafka_cgrp.h"
50 #include "rdkafka_assignor.h"
51 #include "rdkafka_request.h"
52 #include "rdkafka_event.h"
53 #include "rdkafka_sasl.h"
54 #include "rdkafka_interceptor.h"
55 #include "rdkafka_idempotence.h"
56 #include "rdkafka_sasl_oauthbearer.h"
57 #if WITH_SSL
58 #include "rdkafka_ssl.h"
59 #endif
60 
61 #include "rdtime.h"
62 #include "crc32c.h"
63 #include "rdunittest.h"
64 
65 #ifdef _WIN32
66 #include <sys/types.h>
67 #include <sys/timeb.h>
68 #endif
69 
70 
71 
/* One-shot guards: ensure the process-wide constructor and the PRNG
 * seeding each run exactly once via call_once(), no matter how many
 * rd_kafka_t instances are created. */
static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;
static once_flag rd_kafka_global_srand_once = ONCE_FLAG_INIT;

/**
 * @brief Global counter+lock for all active librdkafka instances
 */
mtx_t rd_kafka_global_lock;
int rd_kafka_global_cnt;
80 
81 
/**
 * Last API error code, per thread.
 * Shared among all rd_kafka_t instances.
 *
 * Read back by the application through rd_kafka_last_error().
 */
rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
87 
88 
/**
 * Current number of threads created by rdkafka.
 * This is used in regression tests.
 */
rd_atomic32_t rd_kafka_thread_cnt_curr;

/**
 * @returns the current number of live librdkafka-created threads
 *          (atomic snapshot of \c rd_kafka_thread_cnt_curr).
 */
int rd_kafka_thread_cnt (void) {
	return rd_atomic32_get(&rd_kafka_thread_cnt_curr);
}
97 
/**
 * Current thread's log name (TLS).
 * Used as the "[thrd:%s]" prefix in log lines when
 * \c conf->log_thread_name is enabled; defaults to "app" for
 * application-owned threads.
 */
char RD_TLS rd_kafka_thread_name[64] = "app";
102 
rd_kafka_set_thread_name(const char * fmt,...)103 void rd_kafka_set_thread_name (const char *fmt, ...) {
104         va_list ap;
105 
106         va_start(ap, fmt);
107         rd_vsnprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name),
108                      fmt, ap);
109         va_end(ap);
110 }
111 
/**
 * @brief Current thread's system name (TLS)
 *
 * Note the name must be 15 characters or less, because it is passed to
 * pthread_setname_np on Linux which imposes this limit.
 * The 16th byte is the terminating NUL.
 */
static char RD_TLS rd_kafka_thread_sysname[16] = "app";
119 
rd_kafka_set_thread_sysname(const char * fmt,...)120 void rd_kafka_set_thread_sysname (const char *fmt, ...) {
121         va_list ap;
122 
123         va_start(ap, fmt);
124         rd_vsnprintf(rd_kafka_thread_sysname, sizeof(rd_kafka_thread_sysname),
125                      fmt, ap);
126         va_end(ap);
127 
128         thrd_setname(rd_kafka_thread_sysname);
129 }
130 
/* Process-wide constructor; invoked exactly once through
 * call_once() from rd_kafka_global_init(). */
static void rd_kafka_global_init0 (void) {
	mtx_init(&rd_kafka_global_lock, mtx_plain);
#if ENABLE_DEVEL
	rd_atomic32_init(&rd_kafka_op_cnt, 0);
#endif
        rd_crc32c_global_init();
#if WITH_SSL
        /* The configuration interface might need to use
         * OpenSSL to parse keys, prior to any rd_kafka_t
         * object has been created. */
        rd_kafka_ssl_init();
#endif
}
144 
/**
 * @brief Initialize once per process
 */
void rd_kafka_global_init (void) {
        /* Thread-safe: call_once() guarantees rd_kafka_global_init0()
         * runs exactly once even with concurrent callers. */
        call_once(&rd_kafka_global_init_once, rd_kafka_global_init0);
}
151 
152 
/**
 * @brief Seed the PRNG with the current wallclock time in milliseconds.
 */
static void rd_kafka_global_srand (void) {
	struct timeval tv;

	rd_gettimeofday(&tv, NULL);

        /* Mix both the seconds and the sub-second milliseconds into the
         * seed: using only (tv_usec / 1000) yields a mere 1000 distinct
         * seeds, so any two processes started in the same
         * millisecond-of-second would produce identical rand()
         * sequences.  Unsigned arithmetic: wrap-around is fine for a
         * seed and avoids signed-overflow UB on 32-bit time_t. */
        srand((unsigned int)tv.tv_sec * 1000u +
              (unsigned int)(tv.tv_usec / 1000));
}
163 
164 
165 /**
166  * @returns the current number of active librdkafka instances
167  */
rd_kafka_global_cnt_get(void)168 static int rd_kafka_global_cnt_get (void) {
169 	int r;
170 	mtx_lock(&rd_kafka_global_lock);
171 	r = rd_kafka_global_cnt;
172 	mtx_unlock(&rd_kafka_global_lock);
173 	return r;
174 }
175 
176 
/**
 * @brief Increase counter for active librdkafka instances.
 * If this is the first instance the global constructors will be called, if any.
 */
static void rd_kafka_global_cnt_incr (void) {
	mtx_lock(&rd_kafka_global_lock);
	rd_kafka_global_cnt++;
	if (rd_kafka_global_cnt == 1) {
                /* First instance: run the subsystem constructors while
                 * still holding the global lock so no concurrent
                 * instance can proceed before they complete. */
		rd_kafka_transport_init();
#if WITH_SSL
                rd_kafka_ssl_init();
#endif
                rd_kafka_sasl_global_init();
	}
	mtx_unlock(&rd_kafka_global_lock);
}
193 
/**
 * @brief Decrease counter for active librdkafka instances.
 * If this counter reaches 0 the global destructors will be called, if any.
 */
static void rd_kafka_global_cnt_decr (void) {
	mtx_lock(&rd_kafka_global_lock);
        /* Underflow would indicate unbalanced incr/decr calls. */
	rd_kafka_assert(NULL, rd_kafka_global_cnt > 0);
	rd_kafka_global_cnt--;
	if (rd_kafka_global_cnt == 0) {
                /* Last instance gone: tear down in reverse order of
                 * the constructors in rd_kafka_global_cnt_incr(). */
                rd_kafka_sasl_global_term();
#if WITH_SSL
                rd_kafka_ssl_term();
#endif
	}
	mtx_unlock(&rd_kafka_global_lock);
}
210 
211 
212 /**
213  * Wait for all rd_kafka_t objects to be destroyed.
214  * Returns 0 if all kafka objects are now destroyed, or -1 if the
215  * timeout was reached.
216  */
rd_kafka_wait_destroyed(int timeout_ms)217 int rd_kafka_wait_destroyed (int timeout_ms) {
218 	rd_ts_t timeout = rd_clock() + (timeout_ms * 1000);
219 
220 	while (rd_kafka_thread_cnt() > 0 ||
221 	       rd_kafka_global_cnt_get() > 0) {
222 		if (rd_clock() >= timeout) {
223 			rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
224 						ETIMEDOUT);
225 			return -1;
226 		}
227 		rd_usleep(25000, NULL); /* 25ms */
228 	}
229 
230 	return 0;
231 }
232 
/**
 * @brief Deliver an already-formatted log line.
 *
 * Drops the message when its level exceeds the configured log_level.
 * If a log queue is configured (and the instance is not terminating),
 * the message is wrapped in an RD_KAFKA_OP_LOG op and enqueued on
 * rk->rk_logq for the application to poll; otherwise the configured
 * log callback, if any, is invoked directly from this thread.
 */
static void rd_kafka_log_buf (const rd_kafka_conf_t *conf,
                              const rd_kafka_t *rk, int level, int ctx,
                              const char *fac, const char *buf) {
        if (level > conf->log_level)
                return;
        else if (rk && conf->log_queue) {
                rd_kafka_op_t *rko;

                if (!rk->rk_logq)
                        return; /* Terminating */

                rko = rd_kafka_op_new(RD_KAFKA_OP_LOG);
                rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM);
                rko->rko_u.log.level = level;
                rd_strlcpy(rko->rko_u.log.fac, fac, sizeof(rko->rko_u.log.fac));
                /* Heap copy: the op outlives the caller's buffer. */
                rko->rko_u.log.str = rd_strdup(buf);
                rko->rko_u.log.ctx = ctx;
                rd_kafka_q_enq(rk->rk_logq, rko);

        } else if (conf->log_cb) {
                conf->log_cb(rk, level, fac, buf);
        }
}
256 
/**
 * @brief Logger
 *
 * Formats the message into a fixed 2048-byte buffer — optionally
 * prefixed with the calling thread's name and an \p extra string —
 * and hands the result to rd_kafka_log_buf() for delivery.
 *
 * @remark conf must be set, but rk may be NULL
 */
void rd_kafka_log0 (const rd_kafka_conf_t *conf,
                    const rd_kafka_t *rk,
                    const char *extra, int level, int ctx,
                    const char *fac, const char *fmt, ...) {
	char buf[2048];
	va_list ap;
	unsigned int elen = 0;
        unsigned int of = 0;

	if (level > conf->log_level)
		return;

	if (conf->log_thread_name) {
		elen = rd_snprintf(buf, sizeof(buf), "[thrd:%s]: ",
				   rd_kafka_thread_name);
                /* Clamp the write offset in case the formatted prefix
                 * exceeded the buffer. */
		if (unlikely(elen >= sizeof(buf)))
			elen = sizeof(buf);
		of = elen;
	}

	if (extra) {
		elen = rd_snprintf(buf+of, sizeof(buf)-of, "%s: ", extra);
		if (unlikely(elen >= sizeof(buf)-of))
			elen = sizeof(buf)-of;
                of += elen;
	}

        /* Append the actual message after the prefixes; truncated
         * silently if the remaining space runs out. */
	va_start(ap, fmt);
	rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap);
	va_end(ap);

        rd_kafka_log_buf(conf, rk, level, ctx, fac, buf);
}
295 
/**
 * @brief Public SASL/OAUTHBEARER token setter.
 *
 * Thin wrapper: forwards to rd_kafka_oauthbearer_set_token0() when the
 * library is built with SASL OAUTHBEARER support; otherwise writes an
 * explanatory message to \p errstr and returns
 * RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED.
 */
rd_kafka_resp_err_t
rd_kafka_oauthbearer_set_token (rd_kafka_t *rk,
                                const char *token_value,
                                int64_t md_lifetime_ms,
                                const char *md_principal_name,
                                const char **extensions, size_t extension_size,
                                char *errstr, size_t errstr_size) {
#if WITH_SASL_OAUTHBEARER
        return rd_kafka_oauthbearer_set_token0(
                rk, token_value,
                md_lifetime_ms, md_principal_name, extensions, extension_size,
                errstr, errstr_size);
#else
        rd_snprintf(errstr, errstr_size,
                    "librdkafka not built with SASL OAUTHBEARER support");
        return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
#endif
}
314 
/**
 * @brief Public SASL/OAUTHBEARER token-refresh failure reporter.
 *
 * Forwards to rd_kafka_oauthbearer_set_token_failure0() when built
 * with SASL OAUTHBEARER support, otherwise returns
 * RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED.
 */
rd_kafka_resp_err_t
rd_kafka_oauthbearer_set_token_failure (rd_kafka_t *rk, const char *errstr) {
#if WITH_SASL_OAUTHBEARER
        return rd_kafka_oauthbearer_set_token_failure0(rk, errstr);
#else
        return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
#endif
}
323 
rd_kafka_log_print(const rd_kafka_t * rk,int level,const char * fac,const char * buf)324 void rd_kafka_log_print(const rd_kafka_t *rk, int level,
325 	const char *fac, const char *buf) {
326 	int secs, msecs;
327 	struct timeval tv;
328 	rd_gettimeofday(&tv, NULL);
329 	secs = (int)tv.tv_sec;
330 	msecs = (int)(tv.tv_usec / 1000);
331 	fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n",
332 		level, secs, msecs,
333 		fac, rk ? rk->rk_name : "", buf);
334 }
335 
/**
 * @brief Built-in syslog logger.
 *
 * Opens the syslog connection ("rdkafka", LOG_PID|LOG_CONS, LOG_USER)
 * on first use and forwards each message with its level.  Asserts if
 * this build lacks syslog support.
 */
void rd_kafka_log_syslog (const rd_kafka_t *rk, int level,
			  const char *fac, const char *buf) {
#if WITH_SYSLOG
	static int initialized = 0;

	if (!initialized) {
		openlog("rdkafka", LOG_PID|LOG_CONS, LOG_USER);
		/* Fix: the flag was never set, so openlog() ran on
		 * every single log call.  (A concurrent first call may
		 * still openlog() twice; presumably harmless since the
		 * arguments are identical — NOTE(review): confirm.) */
		initialized = 1;
	}

	syslog(level, "%s: %s: %s", fac, rk ? rk->rk_name : "", buf);
#else
        rd_assert(!*"syslog support not enabled in this build");
#endif
}
349 
/**
 * @brief Set the logger callback for \p rk.
 *
 * Asserts if the application selects rd_kafka_log_syslog on a build
 * without syslog support, since that callback would itself assert
 * the first time it is invoked.
 */
void rd_kafka_set_logger (rd_kafka_t *rk,
			  void (*func) (const rd_kafka_t *rk, int level,
					const char *fac, const char *buf)) {
#if !WITH_SYSLOG
        if (func == rd_kafka_log_syslog)
                rd_assert(!*"syslog support not enabled in this build");
#endif
	rk->rk_conf.log_cb = func;
}
359 
/**
 * @brief Set the maximum log level for \p rk; messages with a level
 *        greater than this are dropped by the loggers.
 */
void rd_kafka_set_log_level (rd_kafka_t *rk, int level) {
	rk->rk_conf.log_level = level;
}
363 
364 
365 
366 
367 
368 
/**
 * @brief Map an rd_kafka_type_t to its human-readable name.
 *
 * @returns "producer", "consumer", or "unknown" for any value outside
 *          the mapped range (instead of reading past the lookup table
 *          or returning NULL).
 */
static const char *rd_kafka_type2str (rd_kafka_type_t type) {
	static const char *types[] = {
		[RD_KAFKA_PRODUCER] = "producer",
		[RD_KAFKA_CONSUMER] = "consumer",
	};

	if ((int)type < 0 || (size_t)type >= RD_ARRAYSIZE(types) ||
	    !types[type])
		return "unknown";

	return types[type];
}
376 
/* Error descriptor table: maps each rd_kafka_resp_err_t to its
 * (code, name, description) triple.
 *
 * The array is indexed by the code's offset from
 * RD_KAFKA_RESP_ERR__BEGIN (designated initializers), and the symbolic
 * name is derived from the stringified enum constant with the leading
 * "RD_KAFKA_RESP_ERR_" prefix (18 characters) stripped.  Codes without
 * an entry have a NULL desc, which rd_kafka_err2str()/err2name() treat
 * as unknown. */
#define _ERR_DESC(ENUM,DESC) \
	[ENUM - RD_KAFKA_RESP_ERR__BEGIN] = { ENUM, &(# ENUM)[18]/*pfx*/, DESC }

static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
	_ERR_DESC(RD_KAFKA_RESP_ERR__BEGIN, NULL),
	_ERR_DESC(RD_KAFKA_RESP_ERR__BAD_MSG,
		  "Local: Bad message format"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__BAD_COMPRESSION,
		  "Local: Invalid compressed data"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY,
		  "Local: Broker handle destroyed"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__FAIL,
		  "Local: Communication failure with broker"), //FIXME: too specific
	_ERR_DESC(RD_KAFKA_RESP_ERR__TRANSPORT,
		  "Local: Broker transport failure"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
		  "Local: Critical system resource failure"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__RESOLVE,
		  "Local: Host resolution failure"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
		  "Local: Message timed out"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__PARTITION_EOF,
		  "Broker: No more messages"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
		  "Local: Unknown partition"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__FS,
		  "Local: File or filesystem error"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC,
		  "Local: Unknown topic"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
		  "Local: All broker connections are down"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_ARG,
		  "Local: Invalid argument or configuration"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT,
		  "Local: Timed out"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__QUEUE_FULL,
		  "Local: Queue full"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ISR_INSUFF,
		  "Local: ISR count insufficient"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NODE_UPDATE,
		  "Local: Broker node update"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__SSL,
		  "Local: SSL error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_COORD,
		  "Local: Waiting for coordinator"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
		  "Local: Unknown group"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__IN_PROGRESS,
		  "Local: Operation in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
		  "Local: Previous operation in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION,
		  "Local: Existing subscription"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
		  "Local: Assign partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
		  "Local: Revoke partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__CONFLICT,
		  "Local: Conflicting use"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__STATE,
		  "Local: Erroneous state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL,
		  "Local: Unknown protocol"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
		  "Local: Not implemented"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__AUTHENTICATION,
		  "Local: Authentication failure"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__NO_OFFSET,
		  "Local: No offset stored"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__OUTDATED,
		  "Local: Outdated"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
		  "Local: Timed out in queue"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                  "Local: Required feature not supported by broker"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_CACHE,
                  "Local: Awaiting cache update"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INTR,
                  "Local: Operation interrupted"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION,
                  "Local: Key serialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION,
                  "Local: Value serialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION,
                  "Local: Key deserialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION,
                  "Local: Value deserialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PARTIAL,
                  "Local: Partial response"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__READ_ONLY,
                  "Local: Read-only object"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOENT,
                  "Local: No such entry"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNDERFLOW,
                  "Local: Read underflow"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_TYPE,
                  "Local: Invalid type"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__RETRY,
                  "Local: Retry operation"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_QUEUE,
                  "Local: Purged in queue"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PURGE_INFLIGHT,
                  "Local: Purged in flight"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FATAL,
                  "Local: Fatal error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INCONSISTENT,
                  "Local: Inconsistent state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE,
                  "Local: Gap-less ordering would not be guaranteed "
                  "if proceeding"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED,
                  "Local: Maximum application poll interval "
                  "(max.poll.interval.ms) exceeded"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_BROKER,
                  "Local: Unknown broker"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_CONFIGURED,
                  "Local: Functionality not configured"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__FENCED,
                  "Local: This instance has been fenced by a newer instance"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__APPLICATION,
                  "Local: Application generated error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST,
                  "Local: Group partition assignment lost"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOOP,
                  "Local: No operation performed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET,
                  "Local: No offset to automatically reset to"),

	_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN,
		  "Unknown broker error"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR,
		  "Success"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE,
		  "Broker: Offset out of range"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG,
		  "Broker: Invalid message"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
		  "Broker: Unknown topic or partition"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
		  "Broker: Invalid message size"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,
		  "Broker: Leader not available"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
		  "Broker: Not leader for partition"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
		  "Broker: Request timed out"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE,
		  "Broker: Broker not available"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,
		  "Broker: Replica not available"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
		  "Broker: Message size too large"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH,
		  "Broker: StaleControllerEpochCode"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
		  "Broker: Offset metadata string too large"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION,
		  "Broker: Broker disconnected before response received"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
		  "Broker: Coordinator load in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
		  "Broker: Coordinator not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
		  "Broker: Not coordinator"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION,
		  "Broker: Invalid topic"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE,
		  "Broker: Message batch larger than configured server "
		  "segment size"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
		  "Broker: Not enough in-sync replicas"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
		  "Broker: Message(s) written to insufficient number of "
		  "in-sync replicas"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS,
		  "Broker: Invalid required acks value"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
		  "Broker: Specified group generation id is not valid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL,
		  "Broker: Inconsistent group protocol"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID,
		  "Broker: Invalid group.id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
		  "Broker: Unknown member"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT,
		  "Broker: Invalid session timeout"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
		  "Broker: Group rebalance in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
		  "Broker: Commit offset data size is not valid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
		  "Broker: Topic authorization failed"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
		  "Broker: Group authorization failed"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
		  "Broker: Cluster authorization failed"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP,
		  "Broker: Invalid timestamp"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM,
		  "Broker: Unsupported SASL mechanism"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE,
		  "Broker: Request not valid in current SASL state"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION,
		  "Broker: API version not supported"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
		  "Broker: Topic already exists"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS,
		  "Broker: Invalid number of partitions"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR,
		  "Broker: Invalid replication factor"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT,
		  "Broker: Invalid replica assignment"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_CONFIG,
		  "Broker: Configuration is invalid"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_NOT_CONTROLLER,
		  "Broker: Not controller for cluster"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUEST,
		  "Broker: Invalid request"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT,
		  "Broker: Message format on broker does not support request"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_POLICY_VIOLATION,
                  "Broker: Policy violation"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
                  "Broker: Broker received an out of order sequence number"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
                  "Broker: Broker received a duplicate sequence number"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
                  "Broker: Producer attempted an operation with an old epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
                  "Broker: Producer attempted a transactional operation in "
                  "an invalid state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
                  "Broker: Producer attempted to use a producer id which is "
                  "not currently assigned to its transactional id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
                  "Broker: Transaction timeout is larger than the maximum "
                  "value allowed by the broker's max.transaction.timeout.ms"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                  "Broker: Producer attempted to update a transaction while "
                  "another concurrent operation on the same transaction was "
                  "ongoing"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED,
                  "Broker: Indicates that the transaction coordinator sending "
                  "a WriteTxnMarker is no longer the current coordinator for "
                  "a given producer"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
                  "Broker: Transactional Id authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_SECURITY_DISABLED,
                  "Broker: Security features are disabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED,
                  "Broker: Operation not attempted"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
                  "Broker: Disk error when trying to access log file on disk"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND,
                  "Broker: The user-specified log directory is not found "
                  "in the broker config"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED,
                  "Broker: SASL Authentication failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
                  "Broker: Unknown Producer Id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS,
                  "Broker: Partition reassignment is in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED,
                  "Broker: Delegation Token feature is not enabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND,
                  "Broker: Delegation Token is not found on server"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH,
                  "Broker: Specified Principal is not valid Owner/Renewer"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED,
                  "Broker: Delegation Token requests are not allowed on "
                  "this connection"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED,
                  "Broker: Delegation Token authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED,
                  "Broker: Delegation Token is expired"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE,
                  "Broker: Supplied principalType is not supported"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP,
                  "Broker: The group is not empty"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND,
                  "Broker: The group id does not exist"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND,
                  "Broker: The fetch session ID was not found"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH,
                  "Broker: The fetch session epoch is invalid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND,
                  "Broker: No matching listener"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED,
                  "Broker: Topic deletion is disabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH,
                  "Broker: Leader epoch is older than broker epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH,
                  "Broker: Leader epoch is newer than broker epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE,
                  "Broker: Unsupported compression type"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH,
                  "Broker: Broker epoch has changed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE,
                  "Broker: Leader high watermark is not caught up"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED,
                  "Broker: Group member needs a valid member ID"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE,
                  "Broker: Preferred leader was not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED,
                  "Broker: Consumer group has reached maximum size"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,
                  "Broker: Static consumer fenced by other consumer with same "
                  "group.instance.id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE,
                  "Broker: Eligible partition leaders are not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ELECTION_NOT_NEEDED,
                  "Broker: Leader election not needed for topic partition"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NO_REASSIGNMENT_IN_PROGRESS,
                  "Broker: No partition reassignment is in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC,
                  "Broker: Deleting offsets of a topic while the consumer "
                  "group is subscribed to it"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_RECORD,
                  "Broker: Broker failed to validate record"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                  "Broker: There are unstable offsets that need to be cleared"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_THROTTLING_QUOTA_EXCEEDED,
                  "Broker: Throttling quota has been exceeded"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_PRODUCER_FENCED,
                  "Broker: There is a newer producer with the same "
                  "transactionalId which fences the current one"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_RESOURCE_NOT_FOUND,
                  "Broker: Request illegally referred to resource that "
                  "does not exist"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_RESOURCE,
                  "Broker: Request illegally referred to the same resource "
                  "twice"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNACCEPTABLE_CREDENTIAL,
                  "Broker: Requested credential would not meet criteria for "
                  "acceptability"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_VOTER_SET,
                  "Broker: Indicates that the either the sender or recipient "
                  "of a voter-only request is not one of the expected voters"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_UPDATE_VERSION,
                  "Broker: Invalid update version"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_FEATURE_UPDATE_FAILED,
                  "Broker: Unable to update finalized features due to "
                  "server error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE,
                  "Broker: Request principal deserialization failed during "
                  "forwarding"),

	_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)
};
726 
727 
/**
 * @brief Expose the static error description table.
 *
 * @param errdescs Receives a pointer to the table (static storage,
 *                 must not be freed by the caller).
 * @param cntp Receives the number of table entries, including gap
 *             entries whose description is NULL.
 */
void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
                             size_t *cntp) {
        *cntp = RD_ARRAYSIZE(rd_kafka_err_descs);
        *errdescs = rd_kafka_err_descs;
}
733 
734 
/**
 * @brief Map an error code to its human-readable description.
 *
 * @returns a static description string for known codes, or a
 *          thread-local "Err-N?" placeholder for unknown codes.
 */
const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
	static RD_TLS char ret[32];
	int idx = err - RD_KAFKA_RESP_ERR__BEGIN;

	/* In-range codes with a registered descriptor map straight into
	 * the static table (short-circuit keeps the table access guarded
	 * by the range checks). */
	if (err > RD_KAFKA_RESP_ERR__BEGIN &&
	    err < RD_KAFKA_RESP_ERR_END_ALL &&
	    rd_kafka_err_descs[idx].desc)
		return rd_kafka_err_descs[idx].desc;

	/* Unknown code: format a placeholder in thread-local storage. */
	rd_snprintf(ret, sizeof(ret), "Err-%i?", err);
	return ret;
}
748 
749 
/**
 * @brief Map an error code to its enum constant name.
 *
 * @returns the static symbolic name for known codes, or a thread-local
 *          "ERR_N?" placeholder for unknown codes.
 */
const char *rd_kafka_err2name (rd_kafka_resp_err_t err) {
	static RD_TLS char ret[32];
	int idx = err - RD_KAFKA_RESP_ERR__BEGIN;

	/* Validity is keyed off .desc (same check as rd_kafka_err2str())
	 * so both functions agree on which codes are "known". */
	if (err > RD_KAFKA_RESP_ERR__BEGIN &&
	    err < RD_KAFKA_RESP_ERR_END_ALL &&
	    rd_kafka_err_descs[idx].desc)
		return rd_kafka_err_descs[idx].name;

	rd_snprintf(ret, sizeof(ret), "ERR_%i?", err);
	return ret;
}
763 
764 
/* Return the last error code recorded for this caller.
 * NOTE(review): rd_kafka_last_error_code is defined elsewhere in the
 * file; presumably thread-local storage -- confirm at its definition. */
rd_kafka_resp_err_t rd_kafka_last_error (void) {
	return rd_kafka_last_error_code;
}
768 
769 
rd_kafka_errno2err(int errnox)770 rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) {
771 	switch (errnox)
772 	{
773 	case EINVAL:
774 		return RD_KAFKA_RESP_ERR__INVALID_ARG;
775 
776         case EBUSY:
777                 return RD_KAFKA_RESP_ERR__CONFLICT;
778 
779 	case ENOENT:
780 		return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
781 
782 	case ESRCH:
783 		return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
784 
785 	case ETIMEDOUT:
786 		return RD_KAFKA_RESP_ERR__TIMED_OUT;
787 
788 	case EMSGSIZE:
789 		return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
790 
791 	case ENOBUFS:
792 		return RD_KAFKA_RESP_ERR__QUEUE_FULL;
793 
794         case ECANCELED:
795                 return RD_KAFKA_RESP_ERR__FATAL;
796 
797 	default:
798 		return RD_KAFKA_RESP_ERR__FAIL;
799 	}
800 }
801 
802 
rd_kafka_fatal_error(rd_kafka_t * rk,char * errstr,size_t errstr_size)803 rd_kafka_resp_err_t rd_kafka_fatal_error (rd_kafka_t *rk,
804                                           char *errstr, size_t errstr_size) {
805         rd_kafka_resp_err_t err;
806 
807         if (unlikely((err = rd_atomic32_get(&rk->rk_fatal.err)))) {
808                 rd_kafka_rdlock(rk);
809                 rd_snprintf(errstr, errstr_size, "%s", rk->rk_fatal.errstr);
810                 rd_kafka_rdunlock(rk);
811         }
812 
813         return err;
814 }
815 
816 
817 /**
818  * @brief Set's the fatal error for this instance.
819  *
820  * @param do_lock RD_DO_LOCK: rd_kafka_wrlock() will be acquired and released,
821  *                RD_DONT_LOCK: caller must hold rd_kafka_wrlock().
822  *
823  * @returns 1 if the error was set, or 0 if a previous fatal error
824  *          has already been set on this instance.
825  *
826  * @locality any
827  * @locks none
828  */
int rd_kafka_set_fatal_error0 (rd_kafka_t *rk, rd_dolock_t do_lock,
                               rd_kafka_resp_err_t err,
                               const char *fmt, ...) {
        va_list ap;
        char buf[512];

        if (do_lock)
                rd_kafka_wrlock(rk);
        rk->rk_fatal.cnt++;
        /* Only the first fatal error sticks: subsequent ones are counted
         * (cnt above) and logged at debug level, then dropped. */
        if (rd_atomic32_get(&rk->rk_fatal.err)) {
                if (do_lock)
                        rd_kafka_wrunlock(rk);
                rd_kafka_dbg(rk, GENERIC, "FATAL",
                             "Suppressing subsequent fatal error: %s",
                             rd_kafka_err2name(err));
                return 0;
        }

        rd_atomic32_set(&rk->rk_fatal.err, err);

        /* Format and publish the error string while still holding the
         * write lock; readers copy it under rdlock in
         * rd_kafka_fatal_error(). */
        va_start(ap, fmt);
        rd_vsnprintf(buf, sizeof(buf), fmt, ap);
        va_end(ap);
        rk->rk_fatal.errstr = rd_strdup(buf);

        if (do_lock)
                rd_kafka_wrunlock(rk);

        /* If there is an error callback or event handler we
         * also log the fatal error as it happens.
         * If there is no error callback the error event
         * will be automatically logged, and this check here
         * prevents us from duplicate logs. */
        if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_ERROR)
                rd_kafka_log(rk, LOG_EMERG, "FATAL",
                             "Fatal error: %s: %s",
                             rd_kafka_err2str(err), rk->rk_fatal.errstr);
        else
                rd_kafka_dbg(rk, ALL, "FATAL",
                             "Fatal error: %s: %s",
                             rd_kafka_err2str(err), rk->rk_fatal.errstr);

        /* Indicate to the application that a fatal error was raised,
         * the app should use rd_kafka_fatal_error() to extract the
         * fatal error code itself.
         * For the high-level consumer we propagate the error as a
         * consumer error so it is returned from consumer_poll(),
         * while for all other client types (the producer) we propagate to
         * the standard error handler (typically error_cb). */
        if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp)
                rd_kafka_consumer_err(rk->rk_cgrp->rkcg_q, RD_KAFKA_NODEID_UA,
                                      RD_KAFKA_RESP_ERR__FATAL, 0, NULL, NULL,
                                      RD_KAFKA_OFFSET_INVALID,
                                      "Fatal error: %s: %s",
                                      rd_kafka_err2str(err),
                                      rk->rk_fatal.errstr);
        else
                rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__FATAL,
                                "Fatal error: %s: %s",
                                rd_kafka_err2str(err), rk->rk_fatal.errstr);


        /* Tell rdkafka main thread to purge producer queues, but not
         * in-flight since we'll want proper delivery status for transmitted
         * requests.
         * Need NON_BLOCKING to avoid dead-lock if user is
         * calling purge() at the same time, which could be
         * waiting for this broker thread to handle its
         * OP_PURGE request. */
        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_PURGE);
                rko->rko_u.purge.flags = RD_KAFKA_PURGE_F_QUEUE|
                        RD_KAFKA_PURGE_F_NON_BLOCKING;
                rd_kafka_q_enq(rk->rk_ops, rko);
        }

        return 1;
}
907 
908 
909 rd_kafka_resp_err_t
rd_kafka_test_fatal_error(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)910 rd_kafka_test_fatal_error (rd_kafka_t *rk, rd_kafka_resp_err_t err,
911                            const char *reason) {
912         if (!rd_kafka_set_fatal_error(rk, err, "test_fatal_error: %s", reason))
913                 return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
914         else
915                 return RD_KAFKA_RESP_ERR_NO_ERROR;
916 }
917 
918 
919 
920 /**
921  * @brief Final destructor for rd_kafka_t, must only be called with refcnt 0.
922  *
923  * @locality application thread
924  */
void rd_kafka_destroy_final (rd_kafka_t *rk) {

        rd_kafka_assert(rk, rd_kafka_terminating(rk));

        /* Synchronize state */
        rd_kafka_wrlock(rk);
        rd_kafka_wrunlock(rk);

        /* Terminate SASL provider */
        if (rk->rk_conf.sasl.provider)
                rd_kafka_sasl_term(rk);

        rd_kafka_timers_destroy(&rk->rk_timers);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying op queues");

        /* Destroy cgrp */
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Destroying cgrp");
                /* Reset queue forwarding (rep -> cgrp) */
                rd_kafka_q_fwd_set(rk->rk_rep, NULL);
                rd_kafka_cgrp_destroy_final(rk->rk_cgrp);
        }

        rd_kafka_assignors_term(rk);

        if (rk->rk_type == RD_KAFKA_CONSUMER) {
                rd_kafka_assignment_destroy(rk);
                if (rk->rk_consumer.q)
                        rd_kafka_q_destroy(rk->rk_consumer.q);
        }

	/* Purge op-queues */
	rd_kafka_q_destroy_owner(rk->rk_rep);
	rd_kafka_q_destroy_owner(rk->rk_ops);

#if WITH_SSL
        if (rk->rk_conf.ssl.ctx) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying SSL CTX");
                rd_kafka_ssl_ctx_term(rk);
        }
#endif

        /* It is not safe to log after this point. */
        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Termination done: freeing resources");

        /* The log queue is destroyed after the log statement above since
         * logging is no longer safe past this point. */
        if (rk->rk_logq) {
                rd_kafka_q_destroy_owner(rk->rk_logq);
                rk->rk_logq = NULL;
        }

        if (rk->rk_type == RD_KAFKA_PRODUCER) {
		cnd_destroy(&rk->rk_curr_msgs.cnd);
		mtx_destroy(&rk->rk_curr_msgs.lock);
	}

        /* Free the fatal error string allocated by
         * rd_kafka_set_fatal_error0() (if any). */
        if (rk->rk_fatal.errstr) {
                rd_free(rk->rk_fatal.errstr);
                rk->rk_fatal.errstr = NULL;
        }

	cnd_destroy(&rk->rk_broker_state_change_cnd);
	mtx_destroy(&rk->rk_broker_state_change_lock);

        mtx_destroy(&rk->rk_suppress.sparse_connect_lock);

        cnd_destroy(&rk->rk_init_cnd);
        mtx_destroy(&rk->rk_init_lock);

	if (rk->rk_full_metadata)
		rd_kafka_metadata_destroy(rk->rk_full_metadata);
        rd_kafkap_str_destroy(rk->rk_client_id);
        rd_kafkap_str_destroy(rk->rk_group_id);
        rd_kafkap_str_destroy(rk->rk_eos.transactional_id);
	rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
        rd_list_destroy(&rk->rk_broker_by_id);

	rwlock_destroy(&rk->rk_lock);

	rd_free(rk);
	rd_kafka_global_cnt_decr();
}
1009 
1010 
/**
 * @brief Application-facing destroy: performs a controlled shutdown
 *        (optionally closing the consumer), signals termination to the
 *        internal main thread, joins it, and finally calls
 *        rd_kafka_destroy_final().
 *
 * @param flags RD_KAFKA_DESTROY_F_.. flags controlling shutdown behavior.
 *
 * @locality application thread (must NOT be a librdkafka-owned thread,
 *           asserted below)
 */
static void rd_kafka_destroy_app (rd_kafka_t *rk, int flags) {
        thrd_t thrd;
#ifndef _WIN32
	int term_sig = rk->rk_conf.term_sig;
#endif
        int res;
        char flags_str[256];
        static const char *rd_kafka_destroy_flags_names[] = {
                "Terminate",
                "DestroyCalled",
                "Immediate",
                "NoConsumerClose",
                NULL
        };

        /* Fatal errors and _F_IMMEDIATE also sets .._NO_CONSUMER_CLOSE */
        if (flags & RD_KAFKA_DESTROY_F_IMMEDIATE ||
            rd_kafka_fatal_error_code(rk))
                flags |= RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE;

        rd_flags2str(flags_str, sizeof(flags_str),
                     rd_kafka_destroy_flags_names, flags);
        rd_kafka_dbg(rk, ALL, "DESTROY", "Terminating instance "
                     "(destroy flags %s (0x%x))",
                     flags ? flags_str : "none", flags);

        /* If producer still has messages in queue the application
         * is terminating the producer without first calling flush() or purge()
         * which is a common new user mistake, so hint the user of proper
         * shutdown semantics. */
        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                unsigned int tot_cnt;
                size_t tot_size;

                rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);

                if (tot_cnt > 0)
                        rd_kafka_log(rk, LOG_WARNING, "TERMINATE",
                                     "Producer terminating with %u message%s "
                                     "(%"PRIusz" byte%s) still in "
                                     "queue or transit: "
                                     "use flush() to wait for "
                                     "outstanding message delivery",
                                     tot_cnt, tot_cnt > 1 ? "s" : "",
                                     tot_size, tot_size > 1 ? "s" : "");
        }

        /* Make sure destroy is not called from a librdkafka thread
         * since this will most likely cause a deadlock.
         * FIXME: include broker threads (for log_cb) */
        if (thrd_is_current(rk->rk_thread) ||
            thrd_is_current(rk->rk_background.thread)) {
                rd_kafka_log(rk, LOG_EMERG, "BGQUEUE",
                             "Application bug: "
                             "rd_kafka_destroy() called from "
                             "librdkafka owned thread");
                rd_kafka_assert(NULL,
                                !*"Application bug: "
                                "calling rd_kafka_destroy() from "
                                "librdkafka owned thread is prohibited");
        }

        /* Before signaling for general termination, set the destroy
         * flags to hint cgrp how to shut down. */
        rd_atomic32_set(&rk->rk_terminate,
                        flags|RD_KAFKA_DESTROY_F_DESTROY_CALLED);

        /* The legacy/simple consumer lacks an API to close down the consumer*/
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Terminating consumer group handler");
                rd_kafka_consumer_close(rk);
        }

        /* With the consumer closed, terminate the rest of librdkafka. */
        rd_atomic32_set(&rk->rk_terminate, flags|RD_KAFKA_DESTROY_F_TERMINATE);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Interrupting timers");
        rd_kafka_wrlock(rk);
        /* Save the thread handle under lock: rk may not be safe to
         * dereference once the main thread starts tearing down. */
        thrd = rk->rk_thread;
        rd_kafka_timers_interrupt(&rk->rk_timers);
        rd_kafka_wrunlock(rk);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Sending TERMINATE to internal main thread");
        /* Send op to trigger queue/io wake-up.
         * The op itself is (likely) ignored by the receiver. */
        rd_kafka_q_enq(rk->rk_ops, rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

#ifndef _WIN32
        /* Interrupt main kafka thread to speed up termination. */
	if (term_sig) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Sending thread kill signal %d", term_sig);
                pthread_kill(thrd, term_sig);
        }
#endif

        if (rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_IMMEDIATE))
                return; /* FIXME: thread resource leak */

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Joining internal main thread");

        if (thrd_join(thrd, &res) != thrd_success)
                rd_kafka_log(rk, LOG_ERR, "DESTROY",
                             "Failed to join internal main thread: %s "
                             "(was process forked?)",
                             rd_strerror(errno));

        rd_kafka_destroy_final(rk);
}
1123 
1124 
1125 /* NOTE: Must only be called by application.
1126  *       librdkafka itself must use rd_kafka_destroy0(). */
/* Public destroy entry point: controlled shutdown with default flags. */
void rd_kafka_destroy (rd_kafka_t *rk) {
        rd_kafka_destroy_app(rk, 0);
}
1130 
/* Public destroy entry point with explicit RD_KAFKA_DESTROY_F_.. flags. */
void rd_kafka_destroy_flags (rd_kafka_t *rk, int flags) {
        rd_kafka_destroy_app(rk, flags);
}
1134 
1135 
1136 /**
1137  * Main destructor for rd_kafka_t
1138  *
1139  * Locality: rdkafka main thread or application thread during rd_kafka_new()
1140  */
static void rd_kafka_destroy_internal (rd_kafka_t *rk) {
	rd_kafka_topic_t *rkt, *rkt_tmp;
	rd_kafka_broker_t *rkb, *rkb_tmp;
        rd_list_t wait_thrds;
        thrd_t *thrd;
        int i;

        rd_kafka_dbg(rk, ALL, "DESTROY", "Destroy internal");

        /* Trigger any state-change waiters (which should check the
         * terminate flag whenever they wake up). */
        rd_kafka_brokers_broadcast_state_change(rk);

        if (rk->rk_background.thread) {
                int res;
                /* Send op to trigger queue/io wake-up.
                 * The op itself is (likely) ignored by the receiver. */
                rd_kafka_q_enq(rk->rk_background.q,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

                rd_kafka_dbg(rk, ALL, "DESTROY",
                             "Waiting for background queue thread "
                             "to terminate");
                thrd_join(rk->rk_background.thread, &res);
                rd_kafka_q_destroy_owner(rk->rk_background.q);
        }

        /* Call on_destroy() interceptors */
        rd_kafka_interceptors_on_destroy(rk);

	/* Brokers pick up on rk_terminate automatically. */

        /* List of (broker) threads to join to synchronize termination */
        rd_list_init(&wait_thrds, rd_atomic32_get(&rk->rk_broker_cnt), NULL);

	rd_kafka_wrlock(rk);

        rd_kafka_dbg(rk, ALL, "DESTROY", "Removing all topics");
	/* Decommission all topics.
	 * The wrlock is released around each removal since
	 * rd_kafka_topic_partitions_remove() must not be called with it
	 * held; _SAFE iteration tolerates the current entry going away. */
	TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
		rd_kafka_wrunlock(rk);
		rd_kafka_topic_partitions_remove(rkt);
		rd_kafka_wrlock(rk);
	}

        /* Decommission brokers.
         * Broker thread holds a refcount and detects when broker refcounts
         * reaches 1 and then decommissions itself. */
        TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) {
                /* Add broker's thread to wait_thrds list for later joining */
                thrd = rd_malloc(sizeof(*thrd));
                *thrd = rkb->rkb_thread;
                rd_list_add(&wait_thrds, thrd);
                rd_kafka_wrunlock(rk);

                rd_kafka_dbg(rk, BROKER, "DESTROY",
                             "Sending TERMINATE to %s",
                             rd_kafka_broker_name(rkb));
                /* Send op to trigger queue/io wake-up.
                 * The op itself is (likely) ignored by the broker thread. */
                rd_kafka_q_enq(rkb->rkb_ops,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

#ifndef _WIN32
                /* Interrupt IO threads to speed up termination. */
                if (rk->rk_conf.term_sig)
			pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig);
#endif

                rd_kafka_broker_destroy(rkb);

                rd_kafka_wrlock(rk);
        }

        if (rk->rk_clusterid) {
                rd_free(rk->rk_clusterid);
                rk->rk_clusterid = NULL;
        }

        /* Destroy coord requests */
        rd_kafka_coord_reqs_term(rk);

        /* Destroy the coordinator cache */
        rd_kafka_coord_cache_destroy(&rk->rk_coord_cache);

        /* Purge metadata cache.
         * #3279:
         * We mustn't call cache_destroy() here since there might be outstanding
         * broker rkos that hold references to the metadata cache lock,
         * and these brokers are destroyed below. So to avoid a circular
         * dependency refcnt deadlock we first purge the cache here
         * and destroy it after the brokers are destroyed. */
        rd_kafka_metadata_cache_purge(rk, rd_true/*observers too*/);

        rd_kafka_wrunlock(rk);

        mtx_lock(&rk->rk_broker_state_change_lock);
        /* Purge broker state change waiters */
        rd_list_destroy(&rk->rk_broker_state_change_waiters);
        mtx_unlock(&rk->rk_broker_state_change_lock);

        if (rk->rk_type == RD_KAFKA_CONSUMER) {
                if (rk->rk_consumer.q)
                        rd_kafka_q_disable(rk->rk_consumer.q);
        }

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Purging reply queue");

	/* Purge op-queue */
        rd_kafka_q_disable(rk->rk_rep);
	rd_kafka_q_purge(rk->rk_rep);

	/* Loose our special reference to the internal broker. */
        mtx_lock(&rk->rk_internal_rkb_lock);
	if ((rkb = rk->rk_internal_rkb)) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Decommissioning internal broker");

                /* Send op to trigger queue wake-up. */
                rd_kafka_q_enq(rkb->rkb_ops,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

                rk->rk_internal_rkb = NULL;
                thrd = rd_malloc(sizeof(*thrd));
                *thrd = rkb->rkb_thread;
                rd_list_add(&wait_thrds, thrd);
        }
        mtx_unlock(&rk->rk_internal_rkb_lock);
	/* Destroy the reference outside rk_internal_rkb_lock to avoid
	 * lock-order issues with the broker's own locks. */
	if (rkb)
		rd_kafka_broker_destroy(rkb);


        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Join %d broker thread(s)", rd_list_cnt(&wait_thrds));

        /* Join broker threads */
        RD_LIST_FOREACH(thrd, &wait_thrds, i) {
                int res;
                if (thrd_join(*thrd, &res) != thrd_success)
                        ;
                rd_free(thrd);
        }

        rd_list_destroy(&wait_thrds);

        /* Destroy mock cluster */
        if (rk->rk_mock.cluster)
                rd_kafka_mock_cluster_destroy(rk->rk_mock.cluster);

        if (rd_atomic32_get(&rk->rk_mock.cluster_cnt) > 0) {
                rd_kafka_log(rk, LOG_EMERG, "MOCK",
                             "%d mock cluster(s) still active: "
                             "must be explicitly destroyed with "
                             "rd_kafka_mock_cluster_destroy() prior to "
                             "terminating the rd_kafka_t instance",
                             (int)rd_atomic32_get(&rk->rk_mock.cluster_cnt));
                rd_assert(!*"All mock clusters must be destroyed prior to "
                          "rd_kafka_t destroy");
        }

        /* Destroy metadata cache (purged above, see #3279 note) */
        rd_kafka_wrlock(rk);
        rd_kafka_metadata_cache_destroy(rk);
        rd_kafka_wrunlock(rk);
}
1307 
1308 /**
1309  * @brief Buffer state for stats emitter
1310  */
struct _stats_emit {
        char   *buf;      /* Pointer to allocated buffer; grown on demand
                           * by the _st_printf() macro below. */
        size_t  size;     /* Current allocated size of buf */
        size_t  of;       /* Current write-offset in buf */
};
1316 
1317 
/* Stats buffer printf. Requires a (struct _stats_emit *)st variable in the
 * current scope.
 * Doubles the buffer until the formatted output fits: the previous single
 * doubling could still truncate when one emission was larger than the
 * remaining space even after one growth step.
 * NOTE(review): assumes rd_snprintf() does not return a negative value
 * (st->of would be corrupted otherwise) -- confirm against the wrapper. */
#define _st_printf(...) do {                                            \
                ssize_t _r;                                             \
                ssize_t _rem = st->size - st->of;                       \
                _r = rd_snprintf(st->buf+st->of, _rem, __VA_ARGS__);    \
                while (_r >= _rem) {                                    \
                        st->size *= 2;                                  \
                        _rem = st->size - st->of;                       \
                        st->buf = rd_realloc(st->buf, st->size);        \
                        _r = rd_snprintf(st->buf+st->of, _rem, __VA_ARGS__); \
                }                                                       \
                st->of += _r;                                           \
        } while (0)
1332 
/* Running totals accumulated while emitting per-broker and per-partition
 * stats (field comments name the source counter each total sums). */
struct _stats_total {
        int64_t tx;          /**< broker.tx */
        int64_t tx_bytes;    /**< broker.tx_bytes */
        int64_t rx;          /**< broker.rx */
        int64_t rx_bytes;    /**< broker.rx_bytes */
        int64_t txmsgs;      /**< partition.txmsgs */
        int64_t txmsg_bytes; /**< partition.txbytes */
        int64_t rxmsgs;      /**< partition.rxmsgs */
        int64_t rxmsg_bytes; /**< partition.rxbytes */
};
1343 
1344 
1345 
1346 /**
1347  * @brief Rollover and emit an average window.
1348  */
static RD_INLINE void rd_kafka_stats_emit_avg (struct _stats_emit *st,
                                               const char *name,
                                               rd_avg_t *src_avg) {
        rd_avg_t avg;

        /* Roll the live window over into a local snapshot and emit it
         * as a JSON object keyed by \p name.
         * NOTE(review): rd_avg_rollover() presumably resets src_avg for
         * the next stats interval -- confirm at its definition. */
        rd_avg_rollover(&avg, src_avg);
        _st_printf(
                "\"%s\": {"
                " \"min\":%"PRId64","
                " \"max\":%"PRId64","
                " \"avg\":%"PRId64","
                " \"sum\":%"PRId64","
                " \"stddev\": %"PRId64","
                " \"p50\": %"PRId64","
                " \"p75\": %"PRId64","
                " \"p90\": %"PRId64","
                " \"p95\": %"PRId64","
                " \"p99\": %"PRId64","
                " \"p99_99\": %"PRId64","
                " \"outofrange\": %"PRId64","
                " \"hdrsize\": %"PRId32","
                " \"cnt\":%i "
                "}, ",
                name,
                avg.ra_v.minv,
                avg.ra_v.maxv,
                avg.ra_v.avg,
                avg.ra_v.sum,
                (int64_t)avg.ra_hist.stddev,
                avg.ra_hist.p50,
                avg.ra_hist.p75,
                avg.ra_hist.p90,
                avg.ra_hist.p95,
                avg.ra_hist.p99,
                avg.ra_hist.p99_99,
                avg.ra_hist.oor,
                avg.ra_hist.hdrsize,
                avg.ra_v.cnt);
        rd_avg_destroy(&avg);
}
1389 
1390 /**
1391  * Emit stats for toppar
1392  */
static RD_INLINE void rd_kafka_stats_emit_toppar (struct _stats_emit *st,
                                                  struct _stats_total *total,
                                                  rd_kafka_toppar_t *rktp,
                                                  int first) {
        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
        int64_t end_offset;
        int64_t consumer_lag = -1;
        int64_t consumer_lag_stored = -1;
        struct offset_stats offs;
        int32_t broker_id = -1;

        rd_kafka_toppar_lock(rktp);

        if (rktp->rktp_broker) {
                rd_kafka_broker_lock(rktp->rktp_broker);
                broker_id = rktp->rktp_broker->rkb_nodeid;
                rd_kafka_broker_unlock(rktp->rktp_broker);
        }

        /* Grab a copy of the latest finalized offset stats */
        offs = rktp->rktp_offsets_fin;

        /* Lag is measured against the last-stable offset for
         * read_committed consumers, the high-watermark otherwise. */
        end_offset = (rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED)
                ? rktp->rktp_ls_offset
                : rktp->rktp_hi_offset;

        /* Calculate consumer_lag by using the highest offset
         * of stored_offset (the last message passed to application + 1, or
         * if enable.auto.offset.store=false the last message manually stored),
         * or the committed_offset (the last message committed by this or
         * another consumer).
         * Using stored_offset allows consumer_lag to be up to date even if
         * offsets are not (yet) committed.
         */
        if (end_offset != RD_KAFKA_OFFSET_INVALID) {
                if (rktp->rktp_stored_offset >= 0 &&
                    rktp->rktp_stored_offset <= end_offset)
                        consumer_lag_stored =
                                end_offset - rktp->rktp_stored_offset;
                if (rktp->rktp_committed_offset >= 0 &&
                    rktp->rktp_committed_offset <= end_offset)
                        consumer_lag = end_offset - rktp->rktp_committed_offset;
        }

        /* Emit this partition as one JSON object; arguments below pair up
         * positionally with the format fields above them. */
	_st_printf("%s\"%"PRId32"\": { "
		   "\"partition\":%"PRId32", "
		   "\"broker\":%"PRId32", "
		   "\"leader\":%"PRId32", "
		   "\"desired\":%s, "
		   "\"unknown\":%s, "
		   "\"msgq_cnt\":%i, "
		   "\"msgq_bytes\":%"PRIusz", "
		   "\"xmit_msgq_cnt\":%i, "
		   "\"xmit_msgq_bytes\":%"PRIusz", "
		   "\"fetchq_cnt\":%i, "
		   "\"fetchq_size\":%"PRIu64", "
		   "\"fetch_state\":\"%s\", "
		   "\"query_offset\":%"PRId64", "
		   "\"next_offset\":%"PRId64", "
		   "\"app_offset\":%"PRId64", "
		   "\"stored_offset\":%"PRId64", "
		   "\"commited_offset\":%"PRId64", " /*FIXME: issue #80 */
		   "\"committed_offset\":%"PRId64", "
		   "\"eof_offset\":%"PRId64", "
		   "\"lo_offset\":%"PRId64", "
		   "\"hi_offset\":%"PRId64", "
                   "\"ls_offset\":%"PRId64", "
                   "\"consumer_lag\":%"PRId64", "
                   "\"consumer_lag_stored\":%"PRId64", "
		   "\"txmsgs\":%"PRIu64", "
		   "\"txbytes\":%"PRIu64", "
                   "\"rxmsgs\":%"PRIu64", "
                   "\"rxbytes\":%"PRIu64", "
                   "\"msgs\": %"PRIu64", "
                   "\"rx_ver_drops\": %"PRIu64", "
                   "\"msgs_inflight\": %"PRId32", "
                   "\"next_ack_seq\": %"PRId32", "
                   "\"next_err_seq\": %"PRId32", "
                   "\"acked_msgid\": %"PRIu64
                   "} ",
		   first ? "" : ", ",
		   rktp->rktp_partition,
		   rktp->rktp_partition,
                   broker_id,
                   rktp->rktp_leader_id,
		   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_DESIRED)?"true":"false",
		   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_UNKNOWN)?"true":"false",
                   rd_kafka_msgq_len(&rktp->rktp_msgq),
		   rd_kafka_msgq_size(&rktp->rktp_msgq),
                   /* FIXME: xmit_msgq is local to the broker thread. */
                   0,
                   (size_t)0,
		   rd_kafka_q_len(rktp->rktp_fetchq),
		   rd_kafka_q_size(rktp->rktp_fetchq),
		   rd_kafka_fetch_states[rktp->rktp_fetch_state],
		   rktp->rktp_query_offset,
                   offs.fetch_offset,
		   rktp->rktp_app_offset,
		   rktp->rktp_stored_offset,
		   rktp->rktp_committed_offset, /* FIXME: issue #80 */
		   rktp->rktp_committed_offset,
                   offs.eof_offset,
		   rktp->rktp_lo_offset,
		   rktp->rktp_hi_offset,
                   rktp->rktp_ls_offset,
                   consumer_lag,
                   consumer_lag_stored,
                   rd_atomic64_get(&rktp->rktp_c.tx_msgs),
                   rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes),
                   rd_atomic64_get(&rktp->rktp_c.rx_msgs),
                   rd_atomic64_get(&rktp->rktp_c.rx_msg_bytes),
                   rk->rk_type == RD_KAFKA_PRODUCER ?
                   rd_atomic64_get(&rktp->rktp_c.producer_enq_msgs) :
                   rd_atomic64_get(&rktp->rktp_c.rx_msgs), /* legacy, same as rx_msgs */
                   rd_atomic64_get(&rktp->rktp_c.rx_ver_drops),
                   rd_atomic32_get(&rktp->rktp_msgs_inflight),
                   rktp->rktp_eos.next_ack_seq,
                   rktp->rktp_eos.next_err_seq,
                   rktp->rktp_eos.acked_msgid);

        /* Accumulate into the instance-wide totals when requested. */
        if (total) {
                total->txmsgs      += rd_atomic64_get(&rktp->rktp_c.tx_msgs);
                total->txmsg_bytes += rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes);
                total->rxmsgs      += rd_atomic64_get(&rktp->rktp_c.rx_msgs);
                total->rxmsg_bytes += rd_atomic64_get(&rktp->rktp_c.rx_msg_bytes);
        }

        rd_kafka_toppar_unlock(rktp);
}
1522 
1523 /**
1524  * @brief Emit broker request type stats
1525  */
static void rd_kafka_stats_emit_broker_reqs (struct _stats_emit *st,
                                             rd_kafka_broker_t *rkb) {
        /* Filter out request types that will never be sent by the client.
         *
         * Table layout:
         *   filter[RD_KAFKA_PRODUCER][..] - requests a producer never sends
         *   filter[RD_KAFKA_CONSUMER][..] - requests a consumer never sends
         *   filter[2][..]                 - requests no client type sends
         *                                   (broker-to-broker, etc.)
         *   filter[3][..]                 - requests that are hidden from the
         *                                   output unless their counter is
         *                                   non-zero (e.g., Admin requests).
         * An entry set to rd_true means "filtered out". */
        static const rd_bool_t filter[4][RD_KAFKAP__NUM] = {
                [RD_KAFKA_PRODUCER] = {
                        [RD_KAFKAP_Fetch] = rd_true,
                        [RD_KAFKAP_OffsetCommit] = rd_true,
                        [RD_KAFKAP_OffsetFetch] = rd_true,
                        [RD_KAFKAP_JoinGroup] = rd_true,
                        [RD_KAFKAP_Heartbeat] = rd_true,
                        [RD_KAFKAP_LeaveGroup] = rd_true,
                        [RD_KAFKAP_SyncGroup] = rd_true
                },
                [RD_KAFKA_CONSUMER] = {
                        [RD_KAFKAP_Produce] = rd_true,
                        [RD_KAFKAP_InitProducerId] = rd_true,
                        /* Transactional producer */
                        [RD_KAFKAP_AddPartitionsToTxn] = rd_true,
                        [RD_KAFKAP_AddOffsetsToTxn] = rd_true,
                        [RD_KAFKAP_EndTxn] = rd_true,
                        [RD_KAFKAP_TxnOffsetCommit] = rd_true,
                },
                [2/*any client type*/] = {
                        [RD_KAFKAP_UpdateMetadata] = rd_true,
                        [RD_KAFKAP_ControlledShutdown] = rd_true,
                        [RD_KAFKAP_LeaderAndIsr] = rd_true,
                        [RD_KAFKAP_StopReplica] = rd_true,
                        [RD_KAFKAP_OffsetForLeaderEpoch] = rd_true,

                        [RD_KAFKAP_WriteTxnMarkers] = rd_true,

                        [RD_KAFKAP_AlterReplicaLogDirs] = rd_true,
                        [RD_KAFKAP_DescribeLogDirs] = rd_true,

                        /* Explicitly NOT filtered: SaslAuthenticate is sent
                         * by the client and should show up in stats. */
                        [RD_KAFKAP_SaslAuthenticate] = rd_false,

                        [RD_KAFKAP_CreateDelegationToken] = rd_true,
                        [RD_KAFKAP_RenewDelegationToken] = rd_true,
                        [RD_KAFKAP_ExpireDelegationToken] = rd_true,
                        [RD_KAFKAP_DescribeDelegationToken] = rd_true,
                        [RD_KAFKAP_IncrementalAlterConfigs] = rd_true,
                        [RD_KAFKAP_ElectLeaders] = rd_true,
                        [RD_KAFKAP_AlterPartitionReassignments] = rd_true,
                        [RD_KAFKAP_ListPartitionReassignments] = rd_true,
                        [RD_KAFKAP_AlterUserScramCredentials] = rd_true,
                        [RD_KAFKAP_Vote] = rd_true,
                        [RD_KAFKAP_BeginQuorumEpoch] = rd_true,
                        [RD_KAFKAP_EndQuorumEpoch] = rd_true,
                        [RD_KAFKAP_DescribeQuorum] = rd_true,
                        [RD_KAFKAP_AlterIsr] = rd_true,
                        [RD_KAFKAP_UpdateFeatures] = rd_true,
                        [RD_KAFKAP_Envelope] = rd_true,
                },
                [3/*hide-unless-non-zero*/] = {
                        /* Hide Admin requests unless they've been used */
                        [RD_KAFKAP_CreateTopics] =  rd_true,
                        [RD_KAFKAP_DeleteTopics] =  rd_true,
                        [RD_KAFKAP_DeleteRecords] =  rd_true,
                        [RD_KAFKAP_CreatePartitions] =  rd_true,
                        [RD_KAFKAP_DescribeAcls] = rd_true,
                        [RD_KAFKAP_CreateAcls] = rd_true,
                        [RD_KAFKAP_DeleteAcls] = rd_true,
                        [RD_KAFKAP_DescribeConfigs] = rd_true,
                        [RD_KAFKAP_AlterConfigs] = rd_true,
                        [RD_KAFKAP_DeleteGroups] = rd_true,
                        [RD_KAFKAP_ListGroups] = rd_true,
                        [RD_KAFKAP_DescribeGroups] = rd_true
                }
        };
        int i;
        int cnt = 0;  /* number of entries emitted so far, used for the
                       * JSON field separator. */

        _st_printf("\"req\": { ");
        for (i = 0 ; i < RD_KAFKAP__NUM ; i++) {
                int64_t v;

                /* Skip request types this client type never sends, and
                 * request types no client type sends. */
                if (filter[rkb->rkb_rk->rk_type][i] || filter[2][i])
                        continue;

                v = rd_atomic64_get(&rkb->rkb_c.reqtype[i]);
                if (!v && filter[3][i])
                        continue; /* Filter out zero values */

                _st_printf("%s\"%s\": %"PRId64,
                           cnt > 0 ? ", " : "",
                           rd_kafka_ApiKey2str(i), v);

                cnt++;
        }
        _st_printf(" }, ");
}
1617 
1618 
1619 /**
1620  * Emit all statistics
1621  */
static void rd_kafka_stats_emit_all (rd_kafka_t *rk) {
	rd_kafka_broker_t *rkb;
	rd_kafka_topic_t *rkt;
	rd_ts_t now;
	rd_kafka_op_t *rko;
	unsigned int tot_cnt;
	size_t tot_size;
        rd_kafka_resp_err_t err;
        struct _stats_emit stx = { .size = 1024*10 };
        struct _stats_emit *st = &stx;
        struct _stats_total total = {0};

        /* Allocate the initial JSON output buffer (10 kB). */
        st->buf = rd_malloc(st->size);


	rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
	rd_kafka_rdlock(rk);

	now = rd_clock();
        /* Emit instance-wide top-level fields and open the "brokers"
         * JSON object. */
	_st_printf("{ "
                   "\"name\": \"%s\", "
                   "\"client_id\": \"%s\", "
                   "\"type\": \"%s\", "
		   "\"ts\":%"PRId64", "
		   "\"time\":%lli, "
                   "\"age\":%"PRId64", "
		   "\"replyq\":%i, "
                   "\"msg_cnt\":%u, "
		   "\"msg_size\":%"PRIusz", "
                   "\"msg_max\":%u, "
		   "\"msg_size_max\":%"PRIusz", "
                   "\"simple_cnt\":%i, "
                   "\"metadata_cache_cnt\":%i, "
		   "\"brokers\":{ "/*open brokers*/,
                   rk->rk_name,
                   rk->rk_conf.client_id_str,
                   rd_kafka_type2str(rk->rk_type),
		   now,
		   (signed long long)time(NULL),
                   now - rk->rk_ts_created,
		   rd_kafka_q_len(rk->rk_rep),
		   tot_cnt, tot_size,
		   rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.max_size,
                   rd_atomic32_get(&rk->rk_simple_cnt),
                   rk->rk_metadata_cache.rkmc_cnt);


        /* Per-broker statistics. */
	TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
		rd_kafka_toppar_t *rktp;
                rd_ts_t txidle = -1, rxidle = -1;

		rd_kafka_broker_lock(rkb);

                if (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP) {
                        /* Calculate tx and rx idle time in usecs:
                         * a zero ts_send/ts_recv means no traffic yet,
                         * which is reported as -1. */
                        txidle = rd_atomic64_get(&rkb->rkb_c.ts_send);
                        rxidle = rd_atomic64_get(&rkb->rkb_c.ts_recv);

                        if (txidle)
                                txidle = RD_MAX(now - txidle, 0);
                        else
                                txidle = -1;

                        if (rxidle)
                                rxidle = RD_MAX(now - rxidle, 0);
                        else
                                rxidle = -1;
                }

		_st_printf("%s\"%s\": { "/*open broker*/
			   "\"name\":\"%s\", "
			   "\"nodeid\":%"PRId32", "
                           "\"nodename\":\"%s\", "
                           "\"source\":\"%s\", "
			   "\"state\":\"%s\", "
                           "\"stateage\":%"PRId64", "
			   "\"outbuf_cnt\":%i, "
			   "\"outbuf_msg_cnt\":%i, "
			   "\"waitresp_cnt\":%i, "
			   "\"waitresp_msg_cnt\":%i, "
			   "\"tx\":%"PRIu64", "
			   "\"txbytes\":%"PRIu64", "
			   "\"txerrs\":%"PRIu64", "
			   "\"txretries\":%"PRIu64", "
                           "\"txidle\":%"PRId64", "
			   "\"req_timeouts\":%"PRIu64", "
			   "\"rx\":%"PRIu64", "
			   "\"rxbytes\":%"PRIu64", "
			   "\"rxerrs\":%"PRIu64", "
                           "\"rxcorriderrs\":%"PRIu64", "
                           "\"rxpartial\":%"PRIu64", "
                           "\"rxidle\":%"PRId64", "
                           "\"zbuf_grow\":%"PRIu64", "
                           "\"buf_grow\":%"PRIu64", "
                           "\"wakeups\":%"PRIu64", "
                           "\"connects\":%"PRId32", "
                           "\"disconnects\":%"PRId32", ",
			   rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ",
			   rkb->rkb_name,
			   rkb->rkb_name,
			   rkb->rkb_nodeid,
                           rkb->rkb_nodename,
                           rd_kafka_confsource2str(rkb->rkb_source),
			   rd_kafka_broker_state_names[rkb->rkb_state],
                           rkb->rkb_ts_state ? now - rkb->rkb_ts_state : 0,
			   rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
			   rd_atomic32_get(&rkb->rkb_outbufs.rkbq_msg_cnt),
			   rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt),
			   rd_atomic32_get(&rkb->rkb_waitresps.rkbq_msg_cnt),
			   rd_atomic64_get(&rkb->rkb_c.tx),
			   rd_atomic64_get(&rkb->rkb_c.tx_bytes),
			   rd_atomic64_get(&rkb->rkb_c.tx_err),
			   rd_atomic64_get(&rkb->rkb_c.tx_retries),
                           txidle,
			   rd_atomic64_get(&rkb->rkb_c.req_timeouts),
			   rd_atomic64_get(&rkb->rkb_c.rx),
			   rd_atomic64_get(&rkb->rkb_c.rx_bytes),
			   rd_atomic64_get(&rkb->rkb_c.rx_err),
			   rd_atomic64_get(&rkb->rkb_c.rx_corrid_err),
			   rd_atomic64_get(&rkb->rkb_c.rx_partial),
                           rxidle,
                           rd_atomic64_get(&rkb->rkb_c.zbuf_grow),
                           rd_atomic64_get(&rkb->rkb_c.buf_grow),
                           rd_atomic64_get(&rkb->rkb_c.wakeups),
                           rd_atomic32_get(&rkb->rkb_c.connects),
                           rd_atomic32_get(&rkb->rkb_c.disconnects));

                /* Accumulate instance-wide totals. */
                total.tx       += rd_atomic64_get(&rkb->rkb_c.tx);
                total.tx_bytes += rd_atomic64_get(&rkb->rkb_c.tx_bytes);
                total.rx       += rd_atomic64_get(&rkb->rkb_c.rx);
                total.rx_bytes += rd_atomic64_get(&rkb->rkb_c.rx_bytes);

                rd_kafka_stats_emit_avg(st, "int_latency",
                                        &rkb->rkb_avg_int_latency);
                rd_kafka_stats_emit_avg(st, "outbuf_latency",
                                        &rkb->rkb_avg_outbuf_latency);
                rd_kafka_stats_emit_avg(st, "rtt", &rkb->rkb_avg_rtt);
                rd_kafka_stats_emit_avg(st, "throttle", &rkb->rkb_avg_throttle);

                rd_kafka_stats_emit_broker_reqs(st, rkb);

                _st_printf("\"toppars\":{ "/*open toppars*/);

                /* Partitions currently delegated to this broker. */
		TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
			_st_printf("%s\"%.*s-%"PRId32"\": { "
				   "\"topic\":\"%.*s\", "
				   "\"partition\":%"PRId32"} ",
				   rktp==TAILQ_FIRST(&rkb->rkb_toppars)?"":", ",
				   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                   rktp->rktp_partition,
				   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
				   rktp->rktp_partition);
		}

		rd_kafka_broker_unlock(rkb);

		_st_printf("} "/*close toppars*/
			   "} "/*close broker*/);
	}


	_st_printf("}, " /* close "brokers" object */
		   "\"topics\":{ ");

        /* Per-topic statistics, including each topic's partitions. */
	TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                rd_kafka_toppar_t *rktp;
		int i, j;

		rd_kafka_topic_rdlock(rkt);
		_st_printf("%s\"%.*s\": { "
			   "\"topic\":\"%.*s\", "
                           "\"age\":%"PRId64", "
			   "\"metadata_age\":%"PRId64", ",
			   rkt==TAILQ_FIRST(&rk->rk_topics)?"":", ",
			   RD_KAFKAP_STR_PR(rkt->rkt_topic),
			   RD_KAFKAP_STR_PR(rkt->rkt_topic),
                           (now - rkt->rkt_ts_create)/1000,
			   rkt->rkt_ts_metadata ?
			   (now - rkt->rkt_ts_metadata)/1000 : 0);

                rd_kafka_stats_emit_avg(st, "batchsize",
                                        &rkt->rkt_avg_batchsize);
                rd_kafka_stats_emit_avg(st, "batchcnt",
                                        &rkt->rkt_avg_batchcnt);

                _st_printf("\"partitions\":{ " /*open partitions*/);

                /* Known partitions. */
                for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
                        rd_kafka_stats_emit_toppar(st, &total, rkt->rkt_p[i],
                                                   i == 0);

                /* Desired partitions not in the known partition list. */
                RD_LIST_FOREACH(rktp, &rkt->rkt_desp, j)
                        rd_kafka_stats_emit_toppar(st, &total, rktp, i+j == 0);

                /* i+j = total partitions emitted so far; used below to
                 * decide the "first entry" flag for the UA partition. */
                i += j;

                /* The unassigned (UA) partition, if any.  Not included
                 * in the totals. */
                if (rkt->rkt_ua)
                        rd_kafka_stats_emit_toppar(st, NULL, rkt->rkt_ua,
                                                   i++ == 0);

		rd_kafka_topic_rdunlock(rkt);

		_st_printf("} "/*close partitions*/
			   "} "/*close topic*/);

	}
	_st_printf("} "/*close topics*/);

        /* Consumer group stats, if this is a high-level consumer. */
        if (rk->rk_cgrp) {
                rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
                _st_printf(", \"cgrp\": { "
                           "\"state\": \"%s\", "
                           "\"stateage\": %"PRId64", "
                           "\"join_state\": \"%s\", "
                           "\"rebalance_age\": %"PRId64", "
                           "\"rebalance_cnt\": %d, "
                           "\"rebalance_reason\": \"%s\", "
                           "\"assignment_size\": %d }",
                           rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                           rkcg->rkcg_ts_statechange ?
                           (now - rkcg->rkcg_ts_statechange) / 1000 : 0,
                           rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                           rkcg->rkcg_c.ts_rebalance ?
                           (now - rkcg->rkcg_c.ts_rebalance)/1000 : 0,
                           rkcg->rkcg_c.rebalance_cnt,
                           rkcg->rkcg_c.rebalance_reason,
                           rkcg->rkcg_c.assignment_size);
        }

        /* Idempotent/transactional producer state. */
        if (rd_kafka_is_idempotent(rk)) {
                _st_printf(", \"eos\": { "
                           "\"idemp_state\": \"%s\", "
                           "\"idemp_stateage\": %"PRId64", "
                           "\"txn_state\": \"%s\", "
                           "\"txn_stateage\": %"PRId64", "
                           "\"txn_may_enq\": %s, "
                           "\"producer_id\": %"PRId64", "
                           "\"producer_epoch\": %hd, "
                           "\"epoch_cnt\": %d "
                           "}",
                           rd_kafka_idemp_state2str(rk->rk_eos.idemp_state),
                           (now - rk->rk_eos.ts_idemp_state) / 1000,
                           rd_kafka_txn_state2str(rk->rk_eos.txn_state),
                           (now - rk->rk_eos.ts_txn_state) / 1000,
                           rd_atomic32_get(&rk->rk_eos.txn_may_enq) ?
                           "true":"false",
                           rk->rk_eos.pid.id,
                           rk->rk_eos.pid.epoch,
                           rk->rk_eos.epoch_cnt);
        }

        /* Fatal error, if one has been raised. */
        if ((err = rd_atomic32_get(&rk->rk_fatal.err)))
                _st_printf(", \"fatal\": { "
                           "\"error\": \"%s\", "
                           "\"reason\": \"%s\", "
                           "\"cnt\": %d "
                           "}",
                           rd_kafka_err2str(err),
                           rk->rk_fatal.errstr,
                           rk->rk_fatal.cnt);

	rd_kafka_rdunlock(rk);

        /* Total counters */
        _st_printf(", "
                   "\"tx\":%"PRId64", "
                   "\"tx_bytes\":%"PRId64", "
                   "\"rx\":%"PRId64", "
                   "\"rx_bytes\":%"PRId64", "
                   "\"txmsgs\":%"PRId64", "
                   "\"txmsg_bytes\":%"PRId64", "
                   "\"rxmsgs\":%"PRId64", "
                   "\"rxmsg_bytes\":%"PRId64,
                   total.tx,
                   total.tx_bytes,
                   total.rx,
                   total.rx_bytes,
                   total.txmsgs,
                   total.txmsg_bytes,
                   total.rxmsgs,
                   total.rxmsg_bytes);

        _st_printf("}"/*close object*/);


	/* Enqueue op for application.
	 * Ownership of the JSON buffer (st->buf) is passed to the op. */
	rko = rd_kafka_op_new(RD_KAFKA_OP_STATS);
        rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
	rko->rko_u.stats.json = st->buf;
	rko->rko_u.stats.json_len = st->of;
	rd_kafka_q_enq(rk->rk_rep, rko);
}
1914 
1915 
1916 /**
1917  * @brief 1 second generic timer.
1918  *
1919  * @locality rdkafka main thread
1920  * @locks none
1921  */
static void rd_kafka_1s_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
        rd_kafka_t *rk = rkts->rkts_rk;

        /* Periodic scan of all topics: partition/topic state,
         * message timeouts, etc. */
        rd_kafka_topic_scan_all(rk, rd_clock());

        /* Sparse connections: if no broker connection is currently up,
         * initiate a connection to maintain at least one link to the
         * cluster. */
        if (rk->rk_conf.sparse_connections) {
                if (rd_atomic32_get(&rk->rk_broker_up_cnt) == 0)
                        rd_kafka_connect_any(rk, "no cluster connection");
        }

        /* Expire stale coordinator cache entries. */
        rd_kafka_coord_cache_expire(&rk->rk_coord_cache);
}
1936 
/**
 * @brief Timer callback: emit the periodic statistics event.
 */
static void rd_kafka_stats_emit_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
        rd_kafka_stats_emit_all(rkts->rkts_rk);
}
1941 
1942 
1943 /**
1944  * @brief Periodic metadata refresh callback
1945  *
1946  * @locality rdkafka main thread
1947  */
static void rd_kafka_metadata_refresh_cb (rd_kafka_timers_t *rkts, void *arg) {
        rd_kafka_t *rk = rkts->rkts_rk;
        rd_kafka_resp_err_t err;

        if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp) {
                /* High-level consumer:
                 * We need to query both locally known topics and subscribed
                 * topics so that we can detect locally known topics changing
                 * partition count or disappearing, as well as detect
                 * previously non-existent subscribed topics now being
                 * available in the cluster. */
                err = rd_kafka_metadata_refresh_consumer_topics(
                        rk, NULL,
                        "periodic topic and broker list refresh");
        } else {
                err = rd_kafka_metadata_refresh_known_topics(
                        rk, NULL, rd_true/*force*/,
                        "periodic topic and broker list refresh");
        }

        if (err != RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
                return;

        /* Rate-limit the fallback broker refresh to once per 10s. */
        if (rd_interval(&rk->rk_suppress.broker_metadata_refresh,
                        10*1000*1000 /*10s*/, 0) <= 0)
                return;

        /* If there are no (locally referenced) topics
         * to query, refresh the broker list.
         * This avoids getting idle-disconnected for clients
         * that have not yet referenced a topic and makes
         * sure such a client has an up to date broker list. */
        rd_kafka_metadata_refresh_brokers(
                rk, NULL, "periodic broker list refresh");
}
1979 
1980 
1981 
1982 /**
1983  * @brief Wait for background threads to initialize.
1984  *
1985  * @returns the number of background threads still not initialized.
1986  *
1987  * @locality app thread calling rd_kafka_new()
1988  * @locks none
1989  */
static int rd_kafka_init_wait (rd_kafka_t *rk, int timeout_ms) {
        struct timespec abs_timeout;
        int remaining;

        rd_timeout_init_timespec(&abs_timeout, timeout_ms);

        mtx_lock(&rk->rk_init_lock);
        /* Wait until all background threads have signalled initialization,
         * or the absolute timeout is reached. */
        while (rk->rk_init_wait_cnt > 0) {
                if (cnd_timedwait_abs(&rk->rk_init_cnd, &rk->rk_init_lock,
                                      &abs_timeout) != thrd_success)
                        break; /* timed out (or wait error): give up */
        }
        remaining = rk->rk_init_wait_cnt;
        mtx_unlock(&rk->rk_init_lock);

        return remaining;
}
2006 
2007 
2008 /**
2009  * Main loop for Kafka handler thread.
2010  */
static int rd_kafka_thread_main (void *arg) {
        rd_kafka_t *rk = arg;
	rd_kafka_timer_t tmr_1s = RD_ZERO_INIT;
	rd_kafka_timer_t tmr_stats_emit = RD_ZERO_INIT;
	rd_kafka_timer_t tmr_metadata_refresh = RD_ZERO_INIT;

        rd_kafka_set_thread_name("main");
        rd_kafka_set_thread_sysname("rdk:main");

        rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_MAIN);

	(void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);

	/* Acquire lock (which was held by thread creator during creation)
	 * to synchronise state. */
	rd_kafka_wrlock(rk);
	rd_kafka_wrunlock(rk);

        /* 1 second timer for topic scan and connection checking. */
        rd_kafka_timer_start(&rk->rk_timers, &tmr_1s, 1000000,
                             rd_kafka_1s_tmr_cb, NULL);
        /* Periodic statistics emission, if configured. */
        if (rk->rk_conf.stats_interval_ms)
                rd_kafka_timer_start(&rk->rk_timers, &tmr_stats_emit,
                                     rk->rk_conf.stats_interval_ms * 1000ll,
                                     rd_kafka_stats_emit_tmr_cb, NULL);
        /* Periodic metadata refresh, if enabled (interval > 0). */
        if (rk->rk_conf.metadata_refresh_interval_ms > 0)
                rd_kafka_timer_start(&rk->rk_timers, &tmr_metadata_refresh,
                                     rk->rk_conf.metadata_refresh_interval_ms *
                                     1000ll,
                                     rd_kafka_metadata_refresh_cb, NULL);

        /* Serve consumer group ops on this thread's op queue. */
        if (rk->rk_cgrp)
                rd_kafka_q_fwd_set(rk->rk_cgrp->rkcg_ops, rk->rk_ops);

        if (rd_kafka_is_idempotent(rk))
                rd_kafka_idemp_init(rk);

        /* Signal rd_kafka_new() (via rd_kafka_init_wait()) that this
         * thread has finished initializing. */
        mtx_lock(&rk->rk_init_lock);
        rk->rk_init_wait_cnt--;
        cnd_broadcast(&rk->rk_init_cnd);
        mtx_unlock(&rk->rk_init_lock);

        /* Main serve loop: runs until the instance is terminating AND
         * the op queue has been drained AND (for consumers) the consumer
         * group handler has reached its terminal state. */
	while (likely(!rd_kafka_terminating(rk) ||
		      rd_kafka_q_len(rk->rk_ops) ||
                      (rk->rk_cgrp && (rk->rk_cgrp->rkcg_state != RD_KAFKA_CGRP_STATE_TERM)))) {
                rd_ts_t sleeptime = rd_kafka_timers_next(
                        &rk->rk_timers, 1000*1000/*1s*/, 1/*lock*/);
                rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
                                 RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
		if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
			rd_kafka_cgrp_serve(rk->rk_cgrp);
		rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT);
	}

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Internal main thread terminating");

        if (rd_kafka_is_idempotent(rk))
                rd_kafka_idemp_term(rk);

        /* Stop accepting new ops and discard any remaining ones. */
	rd_kafka_q_disable(rk->rk_ops);
	rd_kafka_q_purge(rk->rk_ops);

        /* Stop all timers started above. */
        rd_kafka_timer_stop(&rk->rk_timers, &tmr_1s, 1);
        if (rk->rk_conf.stats_interval_ms)
                rd_kafka_timer_stop(&rk->rk_timers, &tmr_stats_emit, 1);
        rd_kafka_timer_stop(&rk->rk_timers, &tmr_metadata_refresh, 1);

        /* Synchronise state */
        rd_kafka_wrlock(rk);
        rd_kafka_wrunlock(rk);

        rd_kafka_interceptors_on_thread_exit(rk, RD_KAFKA_THREAD_MAIN);

        rd_kafka_destroy_internal(rk);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Internal main thread termination done");

	rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);

	return 0;
}
2094 
2095 
/**
 * @brief No-op termination signal handler.
 *
 * NOTE(review): the handler intentionally does nothing; presumably it is
 * installed so that delivery of the signal interrupts blocking system
 * calls — confirm against the installer of this handler.
 */
static void rd_kafka_term_sig_handler (int sig) {
        (void)sig; /* unused */
}
2099 
2100 
rd_kafka_new(rd_kafka_type_t type,rd_kafka_conf_t * app_conf,char * errstr,size_t errstr_size)2101 rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
2102 			  char *errstr, size_t errstr_size) {
2103 	rd_kafka_t *rk;
2104 	static rd_atomic32_t rkid;
2105         rd_kafka_conf_t *conf;
2106         rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
2107         int ret_errno = 0;
2108         const char *conf_err;
2109 #ifndef _WIN32
2110         sigset_t newset, oldset;
2111 #endif
2112         char builtin_features[128];
2113         size_t bflen;
2114 
2115         rd_kafka_global_init();
2116 
2117         /* rd_kafka_new() takes ownership of the provided \p app_conf
2118          * object if rd_kafka_new() succeeds.
2119          * Since \p app_conf is optional we allocate a default configuration
2120          * object here if \p app_conf is NULL.
2121          * The configuration object itself is struct-copied later
2122          * leaving the default *conf pointer to be ready for freeing.
2123          * In case new() fails and app_conf was specified we will clear out
2124          * rk_conf to avoid double-freeing from destroy_internal() and the
2125          * user's eventual call to rd_kafka_conf_destroy().
2126          * This is all a bit tricky but that's the nature of
2127          * legacy interfaces. */
2128         if (!app_conf)
2129                 conf = rd_kafka_conf_new();
2130         else
2131                 conf = app_conf;
2132 
2133         /* Verify and finalize configuration */
2134         if ((conf_err = rd_kafka_conf_finalize(type, conf))) {
2135                 /* Incompatible configuration settings */
2136                 rd_snprintf(errstr, errstr_size, "%s", conf_err);
2137                 if (!app_conf)
2138                         rd_kafka_conf_destroy(conf);
2139                 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
2140                 return NULL;
2141         }
2142 
2143 
2144 	rd_kafka_global_cnt_incr();
2145 
2146 	/*
2147 	 * Set up the handle.
2148 	 */
2149 	rk = rd_calloc(1, sizeof(*rk));
2150 
2151 	rk->rk_type = type;
2152         rk->rk_ts_created = rd_clock();
2153 
2154         /* Struct-copy the config object. */
2155 	rk->rk_conf = *conf;
2156         if (!app_conf)
2157                 rd_free(conf); /* Free the base config struct only,
2158                                 * not its fields since they were copied to
2159                                 * rk_conf just above. Those fields are
2160                                 * freed from rd_kafka_destroy_internal()
2161                                 * as the rk itself is destroyed. */
2162 
2163         /* Seed PRNG, don't bother about HAVE_RAND_R, since it is pretty cheap. */
2164         if (rk->rk_conf.enable_random_seed)
2165                 call_once(&rd_kafka_global_srand_once, rd_kafka_global_srand);
2166 
2167         /* Call on_new() interceptors */
2168         rd_kafka_interceptors_on_new(rk, &rk->rk_conf);
2169 
2170 	rwlock_init(&rk->rk_lock);
2171         mtx_init(&rk->rk_internal_rkb_lock, mtx_plain);
2172 
2173 	cnd_init(&rk->rk_broker_state_change_cnd);
2174 	mtx_init(&rk->rk_broker_state_change_lock, mtx_plain);
2175         rd_list_init(&rk->rk_broker_state_change_waiters, 8,
2176                      rd_kafka_enq_once_trigger_destroy);
2177 
2178         cnd_init(&rk->rk_init_cnd);
2179         mtx_init(&rk->rk_init_lock, mtx_plain);
2180 
2181         rd_interval_init(&rk->rk_suppress.no_idemp_brokers);
2182         rd_interval_init(&rk->rk_suppress.broker_metadata_refresh);
2183         rd_interval_init(&rk->rk_suppress.sparse_connect_random);
2184         mtx_init(&rk->rk_suppress.sparse_connect_lock, mtx_plain);
2185 
2186         rd_atomic64_init(&rk->rk_ts_last_poll, rk->rk_ts_created);
2187         rd_atomic32_init(&rk->rk_flushing, 0);
2188 
2189 	rk->rk_rep = rd_kafka_q_new(rk);
2190 	rk->rk_ops = rd_kafka_q_new(rk);
2191         rk->rk_ops->rkq_serve = rd_kafka_poll_cb;
2192         rk->rk_ops->rkq_opaque = rk;
2193 
2194         if (rk->rk_conf.log_queue) {
2195                 rk->rk_logq = rd_kafka_q_new(rk);
2196                 rk->rk_logq->rkq_serve = rd_kafka_poll_cb;
2197                 rk->rk_logq->rkq_opaque = rk;
2198         }
2199 
2200 	TAILQ_INIT(&rk->rk_brokers);
2201 	TAILQ_INIT(&rk->rk_topics);
2202         rd_kafka_timers_init(&rk->rk_timers, rk, rk->rk_ops);
2203         rd_kafka_metadata_cache_init(rk);
2204         rd_kafka_coord_cache_init(&rk->rk_coord_cache,
2205                                   rk->rk_conf.metadata_max_age_ms);
2206         rd_kafka_coord_reqs_init(rk);
2207 
2208 	if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
2209                 rk->rk_drmode = RD_KAFKA_DR_MODE_CB;
2210         else if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR)
2211                 rk->rk_drmode = RD_KAFKA_DR_MODE_EVENT;
2212         else
2213                 rk->rk_drmode = RD_KAFKA_DR_MODE_NONE;
2214         if (rk->rk_drmode != RD_KAFKA_DR_MODE_NONE)
2215 		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;
2216 
2217 	if (rk->rk_conf.rebalance_cb)
2218 		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;
2219 	if (rk->rk_conf.offset_commit_cb)
2220 		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;
2221         if (rk->rk_conf.error_cb)
2222                 rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_ERROR;
2223 #if WITH_SASL_OAUTHBEARER
2224         if (rk->rk_conf.sasl.enable_oauthbearer_unsecure_jwt &&
2225             !rk->rk_conf.sasl.oauthbearer_token_refresh_cb)
2226                 rd_kafka_conf_set_oauthbearer_token_refresh_cb(
2227                         &rk->rk_conf,
2228                         rd_kafka_oauthbearer_unsecured_token);
2229 
2230         if (rk->rk_conf.sasl.oauthbearer_token_refresh_cb)
2231                 rk->rk_conf.enabled_events |=
2232                         RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH;
2233 #endif
2234 
2235         rk->rk_controllerid = -1;
2236 
2237         /* Admin client defaults */
2238         rk->rk_conf.admin.request_timeout_ms = rk->rk_conf.socket_timeout_ms;
2239 
2240 	if (rk->rk_conf.debug)
2241                 rk->rk_conf.log_level = LOG_DEBUG;
2242 
2243 	rd_snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
2244                     rk->rk_conf.client_id_str, rd_kafka_type2str(rk->rk_type),
2245                     rd_atomic32_add(&rkid, 1));
2246 
2247 	/* Construct clientid kafka string */
2248 	rk->rk_client_id = rd_kafkap_str_new(rk->rk_conf.client_id_str,-1);
2249 
2250         /* Convert group.id to kafka string (may be NULL) */
2251         rk->rk_group_id = rd_kafkap_str_new(rk->rk_conf.group_id_str,-1);
2252 
2253         /* Config fixups */
2254         rk->rk_conf.queued_max_msg_bytes =
2255                 (int64_t)rk->rk_conf.queued_max_msg_kbytes * 1000ll;
2256 
2257 	/* Enable api.version.request=true if fallback.broker.version
2258 	 * indicates a supporting broker. */
2259 	if (rd_kafka_ApiVersion_is_queryable(rk->rk_conf.broker_version_fallback))
2260 		rk->rk_conf.api_version_request = 1;
2261 
2262         if (rk->rk_type == RD_KAFKA_PRODUCER) {
2263                 mtx_init(&rk->rk_curr_msgs.lock, mtx_plain);
2264                 cnd_init(&rk->rk_curr_msgs.cnd);
2265                 rk->rk_curr_msgs.max_cnt =
2266                         rk->rk_conf.queue_buffering_max_msgs;
2267                 if ((unsigned long long)rk->rk_conf.
2268                     queue_buffering_max_kbytes * 1024 >
2269                     (unsigned long long)SIZE_MAX) {
2270                         rk->rk_curr_msgs.max_size = SIZE_MAX;
2271                         rd_kafka_log(rk, LOG_WARNING, "QUEUESIZE",
2272                                      "queue.buffering.max.kbytes adjusted "
2273                                      "to system SIZE_MAX limit %"PRIusz" bytes",
2274                                      rk->rk_curr_msgs.max_size);
2275                 } else {
2276                         rk->rk_curr_msgs.max_size =
2277                                 (size_t)rk->rk_conf.
2278                                 queue_buffering_max_kbytes * 1024;
2279                 }
2280         }
2281 
2282         if (rd_kafka_assignors_init(rk, errstr, errstr_size) == -1) {
2283                 ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
2284                 ret_errno = EINVAL;
2285                 goto fail;
2286         }
2287 
2288         /* Create Mock cluster */
2289         rd_atomic32_init(&rk->rk_mock.cluster_cnt, 0);
2290         if (rk->rk_conf.mock.broker_cnt > 0) {
2291                 rk->rk_mock.cluster = rd_kafka_mock_cluster_new(
2292                         rk, rk->rk_conf.mock.broker_cnt);
2293 
2294                 if (!rk->rk_mock.cluster) {
2295                         rd_snprintf(errstr, errstr_size,
2296                                     "Failed to create mock cluster, see logs");
2297                         ret_err = RD_KAFKA_RESP_ERR__FAIL;
2298                         ret_errno = EINVAL;
2299                         goto fail;
2300                 }
2301 
2302                 rd_kafka_log(rk, LOG_NOTICE, "MOCK", "Mock cluster enabled: "
2303                              "original bootstrap.servers and security.protocol "
2304                              "ignored and replaced");
2305 
2306                 /* Overwrite bootstrap.servers and connection settings */
2307                 if (rd_kafka_conf_set(&rk->rk_conf, "bootstrap.servers",
2308                                       rd_kafka_mock_cluster_bootstraps(
2309                                               rk->rk_mock.cluster),
2310                                       NULL, 0) != RD_KAFKA_CONF_OK)
2311                         rd_assert(!"failed to replace mock bootstrap.servers");
2312 
2313                 if (rd_kafka_conf_set(&rk->rk_conf, "security.protocol",
2314                                       "plaintext", NULL, 0) != RD_KAFKA_CONF_OK)
2315                         rd_assert(!"failed to reset mock security.protocol");
2316 
2317                 rk->rk_conf.security_protocol = RD_KAFKA_PROTO_PLAINTEXT;
2318         }
2319 
2320 
2321         if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
2322             rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) {
2323                 /* Select SASL provider */
2324                 if (rd_kafka_sasl_select_provider(rk,
2325                                                   errstr, errstr_size) == -1) {
2326                         ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
2327                         ret_errno = EINVAL;
2328                         goto fail;
2329                 }
2330 
2331                 /* Initialize SASL provider */
2332                 if (rd_kafka_sasl_init(rk, errstr, errstr_size) == -1) {
2333                         rk->rk_conf.sasl.provider = NULL;
2334                         ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
2335                         ret_errno = EINVAL;
2336                         goto fail;
2337                 }
2338         }
2339 
2340 #if WITH_SSL
2341         if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL ||
2342             rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) {
2343                 /* Create SSL context */
2344                 if (rd_kafka_ssl_ctx_init(rk, errstr, errstr_size) == -1) {
2345                         ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
2346                         ret_errno = EINVAL;
2347                         goto fail;
2348                 }
2349         }
2350 #endif
2351 
2352         if (type == RD_KAFKA_CONSUMER) {
2353                 rd_kafka_assignment_init(rk);
2354 
2355                 if (RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0) {
2356                         /* Create consumer group handle */
2357                         rk->rk_cgrp = rd_kafka_cgrp_new(rk,
2358                                                         rk->rk_group_id,
2359                                                         rk->rk_client_id);
2360                         rk->rk_consumer.q =
2361                                 rd_kafka_q_keep(rk->rk_cgrp->rkcg_q);
2362                 } else {
2363                         /* Legacy consumer */
2364                         rk->rk_consumer.q = rd_kafka_q_keep(rk->rk_rep);
2365                 }
2366 
2367         } else if (type == RD_KAFKA_PRODUCER) {
2368                 rk->rk_eos.transactional_id =
2369                         rd_kafkap_str_new(rk->rk_conf.eos.transactional_id, -1);
2370         }
2371 
2372 #ifndef _WIN32
2373         /* Block all signals in newly created threads.
2374          * To avoid race condition we block all signals in the calling
2375          * thread, which the new thread will inherit its sigmask from,
2376          * and then restore the original sigmask of the calling thread when
2377          * we're done creating the thread. */
2378         sigemptyset(&oldset);
2379         sigfillset(&newset);
2380 	if (rk->rk_conf.term_sig) {
2381 		struct sigaction sa_term = {
2382 			.sa_handler = rd_kafka_term_sig_handler
2383 		};
2384 		sigaction(rk->rk_conf.term_sig, &sa_term, NULL);
2385 	}
2386         pthread_sigmask(SIG_SETMASK, &newset, &oldset);
2387 #endif
2388 
2389         mtx_lock(&rk->rk_init_lock);
2390 
2391         /* Create background thread and queue if background_event_cb()
2392          * has been configured.
2393          * Do this before creating the main thread since after
2394          * the main thread is created it is no longer trivial to error
2395          * out from rd_kafka_new(). */
2396         if (rk->rk_conf.background_event_cb) {
2397                 /* Hold off background thread until thrd_create() is done. */
2398                 rd_kafka_wrlock(rk);
2399 
2400                 rk->rk_background.q = rd_kafka_q_new(rk);
2401 
2402                 rk->rk_init_wait_cnt++;
2403 
2404                 if ((thrd_create(&rk->rk_background.thread,
2405                                  rd_kafka_background_thread_main, rk)) !=
2406                     thrd_success) {
2407                         rk->rk_init_wait_cnt--;
2408                         ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
2409                         ret_errno = errno;
2410                         if (errstr)
2411                                 rd_snprintf(errstr, errstr_size,
2412                                             "Failed to create background "
2413                                             "thread: %s (%i)",
2414                                             rd_strerror(errno), errno);
2415                         rd_kafka_wrunlock(rk);
2416                         mtx_unlock(&rk->rk_init_lock);
2417 
2418 #ifndef _WIN32
2419                         /* Restore sigmask of caller */
2420                         pthread_sigmask(SIG_SETMASK, &oldset, NULL);
2421 #endif
2422                         goto fail;
2423                 }
2424 
2425                 rd_kafka_wrunlock(rk);
2426         }
2427 
2428 
2429 
2430 	/* Lock handle here to synchronise state, i.e., hold off
2431 	 * the thread until we've finalized the handle. */
2432 	rd_kafka_wrlock(rk);
2433 
2434 	/* Create handler thread */
2435         rk->rk_init_wait_cnt++;
2436 	if ((thrd_create(&rk->rk_thread,
2437 			 rd_kafka_thread_main, rk)) != thrd_success) {
2438                 rk->rk_init_wait_cnt--;
2439                 ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
2440                 ret_errno = errno;
2441 		if (errstr)
2442 			rd_snprintf(errstr, errstr_size,
2443 				    "Failed to create thread: %s (%i)",
2444 				    rd_strerror(errno), errno);
2445 		rd_kafka_wrunlock(rk);
2446                 mtx_unlock(&rk->rk_init_lock);
2447 #ifndef _WIN32
2448                 /* Restore sigmask of caller */
2449                 pthread_sigmask(SIG_SETMASK, &oldset, NULL);
2450 #endif
2451                 goto fail;
2452         }
2453 
2454         rd_kafka_wrunlock(rk);
2455         mtx_unlock(&rk->rk_init_lock);
2456 
2457         /*
2458          * @warning `goto fail` is prohibited past this point
2459          */
2460 
2461         mtx_lock(&rk->rk_internal_rkb_lock);
2462 	rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,
2463 						  RD_KAFKA_PROTO_PLAINTEXT,
2464 						  "", 0, RD_KAFKA_NODEID_UA);
2465         mtx_unlock(&rk->rk_internal_rkb_lock);
2466 
2467 	/* Add initial list of brokers from configuration */
2468 	if (rk->rk_conf.brokerlist) {
2469 		if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
2470 			rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
2471 					"No brokers configured");
2472 	}
2473 
2474 #ifndef _WIN32
2475 	/* Restore sigmask of caller */
2476 	pthread_sigmask(SIG_SETMASK, &oldset, NULL);
2477 #endif
2478 
2479         /* Wait for background threads to fully initialize so that
2480          * the client instance is fully functional at the time it is
2481          * returned from the constructor. */
2482         if (rd_kafka_init_wait(rk, 60*1000) != 0) {
2483                 /* This should never happen unless there is a bug
2484                  * or the OS is not scheduling the background threads.
2485                  * Either case there is no point in handling this gracefully
2486                  * in the current state since the thread joins are likely
2487                  * to hang as well. */
2488                 mtx_lock(&rk->rk_init_lock);
2489                 rd_kafka_log(rk, LOG_CRIT, "INIT",
2490                              "Failed to initialize %s: "
2491                              "%d background thread(s) did not initialize "
2492                              "within 60 seconds",
2493                              rk->rk_name, rk->rk_init_wait_cnt);
2494                 if (errstr)
2495                         rd_snprintf(errstr, errstr_size,
2496                                     "Timed out waiting for "
2497                                     "%d background thread(s) to initialize",
2498                                     rk->rk_init_wait_cnt);
2499                 mtx_unlock(&rk->rk_init_lock);
2500 
2501                 rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
2502                                         EDEADLK);
2503                 return NULL;
2504         }
2505 
2506         rk->rk_initialized = 1;
2507 
2508         bflen = sizeof(builtin_features);
2509         if (rd_kafka_conf_get(&rk->rk_conf, "builtin.features",
2510                               builtin_features, &bflen) !=
2511             RD_KAFKA_CONF_OK)
2512                 rd_snprintf(builtin_features, sizeof(builtin_features), "?");
2513         rd_kafka_dbg(rk, ALL, "INIT",
2514                      "librdkafka v%s (0x%x) %s initialized "
2515                      "(builtin.features %s, %s, debug 0x%x)",
2516                      rd_kafka_version_str(), rd_kafka_version(),
2517                      rk->rk_name,
2518                      builtin_features, BUILT_WITH,
2519                      rk->rk_conf.debug);
2520 
2521         /* Log warnings for deprecated configuration */
2522         rd_kafka_conf_warn(rk);
2523 
2524         /* Debug dump configuration */
2525         if (rk->rk_conf.debug & RD_KAFKA_DBG_CONF) {
2526                 rd_kafka_anyconf_dump_dbg(rk, _RK_GLOBAL,
2527                                        &rk->rk_conf,
2528                                        "Client configuration");
2529                 if (rk->rk_conf.topic_conf)
2530                         rd_kafka_anyconf_dump_dbg(
2531                                 rk, _RK_TOPIC,
2532                                 rk->rk_conf.topic_conf,
2533                                 "Default topic configuration");
2534         }
2535 
2536         /* Free user supplied conf's base pointer on success,
2537          * but not the actual allocated fields since the struct
2538          * will have been copied in its entirety above. */
2539         if (app_conf)
2540                 rd_free(app_conf);
2541         rd_kafka_set_last_error(0, 0);
2542 
2543         return rk;
2544 
2545 fail:
2546         /*
2547          * Error out and clean up
2548          */
2549 
2550         /*
2551          * Tell background thread to terminate and wait for it to return.
2552          */
2553         rd_atomic32_set(&rk->rk_terminate, RD_KAFKA_DESTROY_F_TERMINATE);
2554 
2555         /* Terminate SASL provider */
2556         if (rk->rk_conf.sasl.provider)
2557                 rd_kafka_sasl_term(rk);
2558 
2559         if (rk->rk_background.thread) {
2560                 int res;
2561                 thrd_join(rk->rk_background.thread, &res);
2562                 rd_kafka_q_destroy_owner(rk->rk_background.q);
2563         }
2564 
2565         /* If on_new() interceptors have been called we also need
2566          * to allow interceptor clean-up by calling on_destroy() */
2567         rd_kafka_interceptors_on_destroy(rk);
2568 
2569         /* If rk_conf is a struct-copy of the application configuration
2570          * we need to avoid rk_conf fields from being freed from
2571          * rd_kafka_destroy_internal() since they belong to app_conf.
2572          * However, there are some internal fields, such as interceptors,
2573          * that belong to rk_conf and thus needs to be cleaned up.
2574          * Legacy APIs, sigh.. */
2575         if (app_conf) {
2576                 rd_kafka_assignors_term(rk);
2577                 rd_kafka_interceptors_destroy(&rk->rk_conf);
2578                 memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
2579         }
2580 
2581         rd_kafka_destroy_internal(rk);
2582         rd_kafka_destroy_final(rk);
2583 
2584         rd_kafka_set_last_error(ret_err, ret_errno);
2585 
2586         return NULL;
2587 }
2588 
2589 
2590 
2591 
/**
 * Counts usage of the legacy/simple consumer (rd_kafka_consume_start() and
 * friends). Since that API has no call for stopping the cgrp, we will need
 * to sort that out automatically in the background when all consumption
 * has stopped.
 *
 * Returns 0 if a high-level consumer is already instantiated, in which case
 * a simple consumer cannot co-operate with it, else 1.
 *
 * A rd_kafka_t handle can never migrate from simple to high-level, or
 * vice versa, so we don't need a ..consumer_del().
 */
int rd_kafka_simple_consumer_add (rd_kafka_t *rk) {
        /* A negative count marks the handle as a high-level consumer:
         * the simple/legacy API may not be mixed with it. */
        if (rd_atomic32_get(&rk->rk_simple_cnt) < 0)
                return 0;

        /* Register one more simple-consumer user; the new count is
         * always >= 1 here. */
        return (int)rd_atomic32_add(&rk->rk_simple_cnt, 1);
}
2610 
2611 
2612 
2613 
2614 /**
2615  * rktp fetch is split up in these parts:
2616  *   * application side:
2617  *   * broker side (handled by current leader broker thread for rktp):
2618  *          - the fetch state, initial offset, etc.
2619  *          - fetching messages, updating fetched offset, etc.
2620  *          - offset commits
2621  *
2622  * Communication between the two are:
2623  *    app side -> rdkafka main side: rktp_ops
2624  *    broker thread -> app side: rktp_fetchq
2625  *
2626  * There is no shared state between these threads, instead
2627  * state is communicated through the two op queues, and state synchronization
2628  * is performed by version barriers.
2629  *
2630  */
2631 
2632 static RD_UNUSED
rd_kafka_consume_start0(rd_kafka_topic_t * rkt,int32_t partition,int64_t offset,rd_kafka_q_t * rkq)2633 int rd_kafka_consume_start0 (rd_kafka_topic_t *rkt, int32_t partition,
2634                              int64_t offset, rd_kafka_q_t *rkq) {
2635 	rd_kafka_toppar_t *rktp;
2636 
2637 	if (partition < 0) {
2638 		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
2639 					ESRCH);
2640 		return -1;
2641 	}
2642 
2643         if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
2644 		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
2645                 return -1;
2646         }
2647 
2648 	rd_kafka_topic_wrlock(rkt);
2649 	rktp = rd_kafka_toppar_desired_add(rkt, partition);
2650 	rd_kafka_topic_wrunlock(rkt);
2651 
2652         /* Verify offset */
2653 	if (offset == RD_KAFKA_OFFSET_BEGINNING ||
2654 	    offset == RD_KAFKA_OFFSET_END ||
2655             offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
2656                 /* logical offsets */
2657 
2658 	} else if (offset == RD_KAFKA_OFFSET_STORED) {
2659 		/* offset manager */
2660 
2661                 if (rkt->rkt_conf.offset_store_method ==
2662                     RD_KAFKA_OFFSET_METHOD_BROKER &&
2663                     RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) {
2664                         /* Broker based offsets require a group id. */
2665                         rd_kafka_toppar_destroy(rktp);
2666 			rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
2667 						EINVAL);
2668                         return -1;
2669                 }
2670 
2671 	} else if (offset < 0) {
2672 		rd_kafka_toppar_destroy(rktp);
2673 		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
2674 					EINVAL);
2675 		return -1;
2676 
2677         }
2678 
2679         rd_kafka_toppar_op_fetch_start(rktp, offset, rkq, RD_KAFKA_NO_REPLYQ);
2680 
2681         rd_kafka_toppar_destroy(rktp);
2682 
2683 	rd_kafka_set_last_error(0, 0);
2684 	return 0;
2685 }
2686 
2687 
2688 
2689 
int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition,
                            int64_t offset) {
        /* Resolve possible lightweight app topic to proper topic object. */
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);

        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START",
                     "Start consuming partition %"PRId32, partition);

        /* NULL queue: messages go to the partition's own fetch queue. */
        return rd_kafka_consume_start0(rkt, partition, offset, NULL);
}
2697 
int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition,
                                  int64_t offset, rd_kafka_queue_t *rkqu) {
        /* Resolve possible lightweight app topic to proper topic object
         * and forward fetched messages to the application's queue. */
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);

        return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q);
}
2704 
2705 
2706 
2707 
rd_kafka_consume_stop0(rd_kafka_toppar_t * rktp)2708 static RD_UNUSED int rd_kafka_consume_stop0 (rd_kafka_toppar_t *rktp) {
2709         rd_kafka_q_t *tmpq = NULL;
2710         rd_kafka_resp_err_t err;
2711 
2712         rd_kafka_topic_wrlock(rktp->rktp_rkt);
2713         rd_kafka_toppar_lock(rktp);
2714 	rd_kafka_toppar_desired_del(rktp);
2715         rd_kafka_toppar_unlock(rktp);
2716 	rd_kafka_topic_wrunlock(rktp->rktp_rkt);
2717 
2718         tmpq = rd_kafka_q_new(rktp->rktp_rkt->rkt_rk);
2719 
2720         rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(tmpq, 0));
2721 
2722         /* Synchronisation: Wait for stop reply from broker thread */
2723         err = rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
2724         rd_kafka_q_destroy_owner(tmpq);
2725 
2726 	rd_kafka_set_last_error(err, err ? EINVAL : 0);
2727 
2728 	return err ? -1 : 0;
2729 }
2730 
2731 
int rd_kafka_consume_stop (rd_kafka_topic_t *app_rkt, int32_t partition) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
        rd_kafka_toppar_t *rktp;
        int r;

        if (partition == RD_KAFKA_PARTITION_UA) {
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
                                        EINVAL);
                return -1;
        }

        /* Look the partition up among both known and desired partitions. */
        rd_kafka_topic_wrlock(rkt);
        rktp = rd_kafka_toppar_get(rkt, partition, 0);
        if (!rktp)
                rktp = rd_kafka_toppar_desired_get(rkt, partition);
        rd_kafka_topic_wrunlock(rkt);

        if (!rktp) {
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                                        ESRCH);
                return -1;
        }

        r = rd_kafka_consume_stop0(rktp); /* sets last_error itself */

        rd_kafka_toppar_destroy(rktp); /* refcnt from get()/desired_get() */

        return r;
}
2759 
2760 
2761 
rd_kafka_seek(rd_kafka_topic_t * app_rkt,int32_t partition,int64_t offset,int timeout_ms)2762 rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *app_rkt,
2763                                    int32_t partition,
2764                                    int64_t offset,
2765                                    int timeout_ms) {
2766         rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
2767 	rd_kafka_toppar_t *rktp;
2768         rd_kafka_q_t *tmpq = NULL;
2769         rd_kafka_resp_err_t err;
2770         rd_kafka_replyq_t replyq = RD_KAFKA_NO_REPLYQ;
2771 
2772         /* FIXME: simple consumer check */
2773 
2774 	if (partition == RD_KAFKA_PARTITION_UA)
2775                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2776 
2777 	rd_kafka_topic_rdlock(rkt);
2778 	if (!(rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
2779 	    !(rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
2780 		rd_kafka_topic_rdunlock(rkt);
2781                 return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
2782 	}
2783 	rd_kafka_topic_rdunlock(rkt);
2784 
2785         if (timeout_ms) {
2786                 tmpq = rd_kafka_q_new(rkt->rkt_rk);
2787                 replyq = RD_KAFKA_REPLYQ(tmpq, 0);
2788         }
2789 
2790         if ((err = rd_kafka_toppar_op_seek(rktp, offset, replyq))) {
2791                 if (tmpq)
2792                         rd_kafka_q_destroy_owner(tmpq);
2793                 rd_kafka_toppar_destroy(rktp);
2794                 return err;
2795         }
2796 
2797 	rd_kafka_toppar_destroy(rktp);
2798 
2799         if (tmpq) {
2800                 err = rd_kafka_q_wait_result(tmpq, timeout_ms);
2801                 rd_kafka_q_destroy_owner(tmpq);
2802                 return err;
2803         }
2804 
2805         return RD_KAFKA_RESP_ERR_NO_ERROR;
2806 }
2807 
2808 
/**
 * @brief Seek multiple partitions to their per-partition \c .offset.
 *
 * Per-partition results are stored in each element's \c .err field.
 * With a non-zero \p timeout_ms the call blocks until all seeks have
 * been acknowledged or the timeout expires; with timeout_ms == 0 the
 * seeks are only enqueued (fire-and-forget) and each accepted
 * partition is left with err == RD_KAFKA_RESP_ERR__IN_PROGRESS.
 *
 * @returns NULL on success, else an error object (invalid arguments,
 *          timeout, or instance termination).
 */
rd_kafka_error_t *
rd_kafka_seek_partitions (rd_kafka_t *rk,
                          rd_kafka_topic_partition_list_t *partitions,
                          int timeout_ms) {
        rd_kafka_q_t *tmpq = NULL;
        rd_kafka_topic_partition_t *rktpar;
        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
        int cnt = 0;  /* Number of seek replies still outstanding. */

        if (rk->rk_type != RD_KAFKA_CONSUMER)
                return rd_kafka_error_new(
                        RD_KAFKA_RESP_ERR__INVALID_ARG,
                        "Must only be used on consumer instance");

        if (!partitions || partitions->cnt == 0)
                return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
                                          "partitions must be specified");

        /* Only create a reply queue when the caller wants to block. */
        if (timeout_ms)
                tmpq = rd_kafka_q_new(rk);

        /* Enqueue a seek op for each known partition. */
        RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
                rd_kafka_toppar_t *rktp;
                rd_kafka_resp_err_t err;

                rktp = rd_kafka_toppar_get2(rk,
                                            rktpar->topic,
                                            rktpar->partition,
                                            rd_false/*no-ua-on-miss*/,
                                            rd_false/*no-create-on-miss*/);
                if (!rktp) {
                        rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
                        continue;
                }

                err = rd_kafka_toppar_op_seek(rktp, rktpar->offset,
                                              RD_KAFKA_REPLYQ(tmpq, 0));
                if (err) {
                        rktpar->err = err;
                } else {
                        /* Provisional status until the reply arrives. */
                        rktpar->err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
                        cnt++;
                }

                rd_kafka_toppar_destroy(rktp); /* refcnt from toppar_get2() */
        }

        if (!timeout_ms)
                return NULL;


        /* Wait for one reply per successfully enqueued seek. */
        while (cnt > 0) {
                rd_kafka_op_t *rko;

                rko = rd_kafka_q_pop(tmpq, rd_timeout_remains(abs_timeout), 0);
                if (!rko) {
                        rd_kafka_q_destroy_owner(tmpq);

                        return rd_kafka_error_new(
                                RD_KAFKA_RESP_ERR__TIMED_OUT,
                                "Timed out waiting for %d remaining partition "
                                "seek(s) to finish", cnt);
                }

                if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) {
                        rd_kafka_q_destroy_owner(tmpq);
                        rd_kafka_op_destroy(rko);

                        return rd_kafka_error_new(RD_KAFKA_RESP_ERR__DESTROY,
                                                  "Instance is terminating");
                }

                rd_assert(rko->rko_rktp);

                /* Map the reply back to the caller's list element and
                 * record the per-partition outcome. */
                rktpar = rd_kafka_topic_partition_list_find(
                        partitions,
                        rko->rko_rktp->rktp_rkt->rkt_topic->str,
                        rko->rko_rktp->rktp_partition);
                rd_assert(rktpar);

                rktpar->err = rko->rko_err;

                rd_kafka_op_destroy(rko);

                cnt--;
        }

        rd_kafka_q_destroy_owner(tmpq);

        return NULL;
}
2900 
2901 
2902 
rd_kafka_consume_batch0(rd_kafka_q_t * rkq,int timeout_ms,rd_kafka_message_t ** rkmessages,size_t rkmessages_size)2903 static ssize_t rd_kafka_consume_batch0 (rd_kafka_q_t *rkq,
2904 					int timeout_ms,
2905 					rd_kafka_message_t **rkmessages,
2906 					size_t rkmessages_size) {
2907 	/* Populate application's rkmessages array. */
2908 	return rd_kafka_q_serve_rkmessages(rkq, timeout_ms,
2909 					   rkmessages, rkmessages_size);
2910 }
2911 
2912 
rd_kafka_consume_batch(rd_kafka_topic_t * app_rkt,int32_t partition,int timeout_ms,rd_kafka_message_t ** rkmessages,size_t rkmessages_size)2913 ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
2914 				int timeout_ms,
2915 				rd_kafka_message_t **rkmessages,
2916 				size_t rkmessages_size) {
2917         rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
2918 	rd_kafka_toppar_t *rktp;
2919 	ssize_t cnt;
2920 
2921 	/* Get toppar */
2922 	rd_kafka_topic_rdlock(rkt);
2923 	rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
2924 	if (unlikely(!rktp))
2925 		rktp = rd_kafka_toppar_desired_get(rkt, partition);
2926 	rd_kafka_topic_rdunlock(rkt);
2927 
2928 	if (unlikely(!rktp)) {
2929 		/* No such toppar known */
2930 		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
2931 					ESRCH);
2932 		return -1;
2933 	}
2934 
2935 	/* Populate application's rkmessages array. */
2936 	cnt = rd_kafka_q_serve_rkmessages(rktp->rktp_fetchq, timeout_ms,
2937 					  rkmessages, rkmessages_size);
2938 
2939 	rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */
2940 
2941 	rd_kafka_set_last_error(0, 0);
2942 
2943 	return cnt;
2944 }
2945 
rd_kafka_consume_batch_queue(rd_kafka_queue_t * rkqu,int timeout_ms,rd_kafka_message_t ** rkmessages,size_t rkmessages_size)2946 ssize_t rd_kafka_consume_batch_queue (rd_kafka_queue_t *rkqu,
2947 				      int timeout_ms,
2948 				      rd_kafka_message_t **rkmessages,
2949 				      size_t rkmessages_size) {
2950 	/* Populate application's rkmessages array. */
2951 	return rd_kafka_consume_batch0(rkqu->rkqu_q, timeout_ms,
2952 				       rkmessages, rkmessages_size);
2953 }
2954 
2955 
/**
 * @brief Context passed (as opaque) from rd_kafka_consume_callback*()
 *        to the per-message trampoline rd_kafka_consume_cb().
 */
struct consume_ctx {
	void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);  /* Application callback */
	void *opaque;                                                      /* Application opaque */
};
2960 
2961 
/**
 * Trampoline for application's consume_cb()
 *
 * Outdated and barrier ops are silently dropped; all other ops are
 * handed to the application callback as a message and then destroyed.
 *
 * @returns RD_KAFKA_OP_RES_HANDLED in all cases (the op never escapes).
 */
static rd_kafka_op_res_t
rd_kafka_consume_cb (rd_kafka_t *rk,
                     rd_kafka_q_t *rkq,
                     rd_kafka_op_t *rko,
                     rd_kafka_q_cb_type_t cb_type, void *opaque) {
	struct consume_ctx *ctx = opaque;
	rd_kafka_message_t *rkmessage;

        /* Drop ops with an outdated queue version and internal barrier
         * ops: neither must reach the application. */
        if (unlikely(rd_kafka_op_version_outdated(rko, 0)) ||
            rko->rko_type == RD_KAFKA_OP_BARRIER) {
                rd_kafka_op_destroy(rko);
                return RD_KAFKA_OP_RES_HANDLED;
        }

	/* Wrap the op in an application-facing message. */
	rkmessage = rd_kafka_message_get(rko);

	/* Store offset for the message's partition. */
	rd_kafka_op_offset_store(rk, rko);

        ctx->consume_cb(rkmessage, ctx->opaque);

        /* The message points into the op; destroying the op releases both,
         * so the callback must not retain the message past its return. */
        rd_kafka_op_destroy(rko);

        return RD_KAFKA_OP_RES_HANDLED;
}
2989 
2990 
2991 
2992 static rd_kafka_op_res_t
rd_kafka_consume_callback0(rd_kafka_q_t * rkq,int timeout_ms,int max_cnt,void (* consume_cb)(rd_kafka_message_t * rkmessage,void * opaque),void * opaque)2993 rd_kafka_consume_callback0 (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt,
2994                             void (*consume_cb) (rd_kafka_message_t
2995                                                 *rkmessage,
2996                                                 void *opaque),
2997                             void *opaque) {
2998         struct consume_ctx ctx = { .consume_cb = consume_cb, .opaque = opaque };
2999         rd_kafka_op_res_t res;
3000 
3001         if (timeout_ms)
3002                 rd_kafka_app_poll_blocking(rkq->rkq_rk);
3003 
3004         res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt,
3005                                RD_KAFKA_Q_CB_RETURN,
3006                                rd_kafka_consume_cb, &ctx);
3007 
3008         rd_kafka_app_polled(rkq->rkq_rk);
3009 
3010         return res;
3011 }
3012 
3013 
/**
 * @brief Consume messages from \p app_rkt / \p partition by invoking
 *        \p consume_cb for each message, waiting at most \p timeout_ms.
 *
 * Looks up the toppar (known first, then desired), serves its fetch
 * queue through the callback trampoline and releases the toppar again.
 *
 * @returns the serve result, or -1 if the partition is unknown
 *          (last error set to ERR__UNKNOWN_PARTITION / ESRCH).
 */
int rd_kafka_consume_callback (rd_kafka_topic_t *app_rkt, int32_t partition,
			       int timeout_ms,
			       void (*consume_cb) (rd_kafka_message_t
						   *rkmessage,
						   void *opaque),
			       void *opaque) {
        rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
        rd_kafka_toppar_t *rktp;
        int served;

        /* Find the partition object: first among known partitions,
         * then among desired (application-requested) ones. */
        rd_kafka_topic_rdlock(rkt);
        if (!(rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/)))
                rktp = rd_kafka_toppar_desired_get(rkt, partition);
        rd_kafka_topic_rdunlock(rkt);

        if (unlikely(!rktp)) {
                /* No such toppar known */
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
                                        ESRCH);
                return -1;
        }

        served = rd_kafka_consume_callback0(
                rktp->rktp_fetchq, timeout_ms,
                rkt->rkt_conf.consume_callback_max_msgs,
                consume_cb, opaque);

        rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */

        rd_kafka_set_last_error(0, 0);

        return served;
}
3048 
3049 
3050 
/**
 * @brief Serve queue \p rkqu, invoking \p consume_cb for each message,
 *        waiting at most \p timeout_ms.
 *
 * Queue variant of rd_kafka_consume_callback(); no per-call message
 * count limit is applied (max_cnt 0).
 */
int rd_kafka_consume_callback_queue (rd_kafka_queue_t *rkqu,
				     int timeout_ms,
				     void (*consume_cb) (rd_kafka_message_t
							 *rkmessage,
							 void *opaque),
				     void *opaque) {
	int served;

	served = rd_kafka_consume_callback0(rkqu->rkqu_q, timeout_ms,
					    0 /* unlimited */,
					    consume_cb, opaque);

	return served;
}
3060 
3061 
/**
 * Serve queue 'rkq' and return one message.
 * By serving the queue it will also call any registered callbacks
 * registered for matching events, this includes consumer_cb()
 * in which case no message will be returned.
 *
 * @param rk         client instance.
 * @param rkq        queue to serve and pop from.
 * @param timeout_ms maximum time to wait, 0 for non-blocking.
 *
 * @returns a message on success, else NULL with the thread-local last
 *          error set to ERR__TIMED_OUT (timeout) or ERR__INTR (a
 *          callback yielded).
 */
static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk,
                                              rd_kafka_q_t *rkq,
					      int timeout_ms) {
	rd_kafka_op_t *rko;
	rd_kafka_message_t *rkmessage = NULL;
	rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        /* Flag the app thread as blocking in poll only when we may block. */
        if (timeout_ms)
                rd_kafka_app_poll_blocking(rk);

	rd_kafka_yield_thread = 0;
        while ((rko = rd_kafka_q_pop(rkq,
                                     rd_timeout_remains_us(abs_timeout), 0))) {
                rd_kafka_op_res_t res;

                res = rd_kafka_poll_cb(rk, rkq, rko,
                                       RD_KAFKA_Q_CB_RETURN, NULL);

                /* PASS: op is to be returned to the application as a
                 * message (handled below the loop). */
                if (res == RD_KAFKA_OP_RES_PASS)
                        break;

                if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
                             rd_kafka_yield_thread)) {
                        /* Callback called rd_kafka_yield(), we must
                         * stop dispatching the queue and return. */
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR,
                                                EINTR);
                        rd_kafka_app_polled(rk);
                        return NULL;
                }

                /* Message was handled by callback. */
                continue;
        }

	if (!rko) {
		/* Timeout reached with no op returned. */
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
					ETIMEDOUT);
                rd_kafka_app_polled(rk);
		return NULL;
	}

        /* Only fetched messages and consumer errors may pass through
         * to the application as messages. */
        rd_kafka_assert(rk,
                        rko->rko_type == RD_KAFKA_OP_FETCH ||
                        rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);

	/* Get rkmessage from rko */
	rkmessage = rd_kafka_message_get(rko);

	/* Store offset */
	rd_kafka_op_offset_store(rk, rko);

	rd_kafka_set_last_error(0, 0);

        rd_kafka_app_polled(rk);

	return rkmessage;
}
3127 
rd_kafka_consume(rd_kafka_topic_t * app_rkt,int32_t partition,int timeout_ms)3128 rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt,
3129                                       int32_t partition,
3130 				      int timeout_ms) {
3131         rd_kafka_topic_t *rkt = rd_kafka_topic_proper(app_rkt);
3132 	rd_kafka_toppar_t *rktp;
3133 	rd_kafka_message_t *rkmessage;
3134 
3135 	rd_kafka_topic_rdlock(rkt);
3136 	rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
3137 	if (unlikely(!rktp))
3138 		rktp = rd_kafka_toppar_desired_get(rkt, partition);
3139 	rd_kafka_topic_rdunlock(rkt);
3140 
3141 	if (unlikely(!rktp)) {
3142 		/* No such toppar known */
3143 		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
3144 					ESRCH);
3145 		return NULL;
3146 	}
3147 
3148 	rkmessage = rd_kafka_consume0(rkt->rkt_rk,
3149                                       rktp->rktp_fetchq, timeout_ms);
3150 
3151 	rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */
3152 
3153 	return rkmessage;
3154 }
3155 
3156 
rd_kafka_consume_queue(rd_kafka_queue_t * rkqu,int timeout_ms)3157 rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu,
3158 					    int timeout_ms) {
3159 	return rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms);
3160 }
3161 
3162 
3163 
3164 
rd_kafka_poll_set_consumer(rd_kafka_t * rk)3165 rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk) {
3166         rd_kafka_cgrp_t *rkcg;
3167 
3168         if (!(rkcg = rd_kafka_cgrp_get(rk)))
3169                 return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
3170 
3171         rd_kafka_q_fwd_set(rk->rk_rep, rkcg->rkcg_q);
3172         return RD_KAFKA_RESP_ERR_NO_ERROR;
3173 }
3174 
3175 
3176 
3177 
rd_kafka_consumer_poll(rd_kafka_t * rk,int timeout_ms)3178 rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,
3179                                             int timeout_ms) {
3180         rd_kafka_cgrp_t *rkcg;
3181 
3182         if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) {
3183                 rd_kafka_message_t *rkmessage = rd_kafka_message_new();
3184                 rkmessage->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
3185                 return rkmessage;
3186         }
3187 
3188         return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms);
3189 }
3190 
3191 
/**
 * @brief Close the consumer: asynchronously terminate the consumer group
 *        and serve its queue (rebalance callbacks, etc.) until the
 *        termination reply arrives, unless close is quenched by the
 *        no_consumer_close destroy flag.
 *
 * @returns ERR__UNKNOWN_GROUP if \p rk is not a consumer,
 *          ERR__FATAL if a fatal error was raised, else the error code
 *          carried by the cgrp TERMINATE reply (defaults to
 *          ERR__TIMED_OUT if no reply was received).
 */
rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk) {
        rd_kafka_cgrp_t *rkcg;
        rd_kafka_op_t *rko;
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
	rd_kafka_q_t *rkq;

        if (!(rkcg = rd_kafka_cgrp_get(rk)))
                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;

        /* If a fatal error has been raised and this is an
         * explicit consumer_close() from the application we return
         * a fatal error. Otherwise let the "silent" no_consumer_close
         * logic be performed to clean up properly. */
        if (rd_kafka_fatal_error_code(rk) &&
            !rd_kafka_destroy_flags_no_consumer_close(rk))
                return RD_KAFKA_RESP_ERR__FATAL;

        rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Closing consumer");

	/* Redirect cgrp queue to our temporary queue to make sure
	 * all posted ops (e.g., rebalance callbacks) are served by
	 * this function. */
	rkq = rd_kafka_q_new(rk);
	rd_kafka_q_fwd_set(rkcg->rkcg_q, rkq);

        rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0)); /* async */

        /* Disable the queue if termination is immediate or the user
         * does not want the blocking consumer_close() behaviour, this will
         * cause any ops posted for this queue (such as rebalance) to
         * be destroyed.
         */
        if (rd_kafka_destroy_flags_no_consumer_close(rk)) {
                rd_kafka_dbg(rk, CONSUMER, "CLOSE",
                             "Disabling and purging temporary queue to quench "
                             "close events");
                rd_kafka_q_disable(rkq);
                /* Purge ops already enqueued */
                rd_kafka_q_purge(rkq);
        } else {
                rd_kafka_dbg(rk, CONSUMER, "CLOSE",
                             "Waiting for close events");
                /* Serve the temporary queue until the cgrp's TERMINATE
                 * reply arrives, which carries the final close result. */
                while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) {
                        rd_kafka_op_res_t res;
                        if ((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
                            RD_KAFKA_OP_TERMINATE) {
                                err = rko->rko_err;
                                rd_kafka_op_destroy(rko);
                                break;
                        }
                        res = rd_kafka_poll_cb(rk, rkq, rko,
                                               RD_KAFKA_Q_CB_RETURN, NULL);
                        if (res == RD_KAFKA_OP_RES_PASS)
                                rd_kafka_op_destroy(rko);
                        /* Ignore YIELD, we need to finish */
                }
        }

        /* Undo the forwarding set up above. */
        rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL);

        rd_kafka_q_destroy_owner(rkq);

        rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Consumer closed");

        return err;
}
3258 
3259 
3260 
/**
 * @brief Fetch the committed offsets for \p partitions from the group
 *        coordinator, blocking for at most \p timeout_ms.
 *
 * Results are written back into \p partitions; offsets are first reset
 * to RD_KAFKA_OFFSET_INVALID. Retries internally on transport errors
 * or while waiting for the coordinator, within the timeout.
 *
 * @returns ERR_NO_ERROR on success, ERR__INVALID_ARG if \p partitions
 *          is NULL, ERR__UNKNOWN_GROUP if not a consumer,
 *          ERR__TIMED_OUT on timeout, or another error code.
 */
rd_kafka_resp_err_t
rd_kafka_committed (rd_kafka_t *rk,
		    rd_kafka_topic_partition_list_t *partitions,
		    int timeout_ms) {
        rd_kafka_q_t *rkq;
        rd_kafka_resp_err_t err;
        rd_kafka_cgrp_t *rkcg;
	rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        if (!partitions)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        if (!(rkcg = rd_kafka_cgrp_get(rk)))
                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;

	/* Set default offsets. */
	rd_kafka_topic_partition_list_reset_offsets(partitions,
                                                    RD_KAFKA_OFFSET_INVALID);

	/* Temporary reply queue for the OFFSET_FETCH op. */
	rkq = rd_kafka_q_new(rk);

        do {
                rd_kafka_op_t *rko;
		int state_version = rd_kafka_brokers_get_state_version(rk);

                rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
		rd_kafka_op_set_replyq(rko, rkq, NULL);

                /* Issue #827
                 * Copy partition list to avoid use-after-free if we time out
                 * here, the app frees the list, and then cgrp starts
                 * processing the op. */
		rko->rko_u.offset_fetch.partitions =
                        rd_kafka_topic_partition_list_copy(partitions);
                rko->rko_u.offset_fetch.require_stable =
                        rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED;
		rko->rko_u.offset_fetch.do_free = 1;

                if (!rd_kafka_q_enq(rkcg->rkcg_ops, rko)) {
                        /* cgrp is terminating, op was destroyed. */
                        err = RD_KAFKA_RESP_ERR__DESTROY;
                        break;
                }

                /* Wait for the reply (or remaining timeout). */
                rko = rd_kafka_q_pop(rkq,
                                     rd_timeout_remains_us(abs_timeout), 0);
                if (rko) {
                        if (!(err = rko->rko_err))
                                rd_kafka_topic_partition_list_update(
                                        partitions,
                                        rko->rko_u.offset_fetch.partitions);
                        else if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
				    err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
				   !rd_kafka_brokers_wait_state_change(
					   rk, state_version,
					   rd_timeout_remains(abs_timeout)))
				err = RD_KAFKA_RESP_ERR__TIMED_OUT;

                        rd_kafka_op_destroy(rko);
                } else
                        err = RD_KAFKA_RESP_ERR__TIMED_OUT;
        } while (err == RD_KAFKA_RESP_ERR__TRANSPORT ||
		 err == RD_KAFKA_RESP_ERR__WAIT_COORD);

        rd_kafka_q_destroy_owner(rkq);

        return err;
}
3328 
3329 
3330 
3331 rd_kafka_resp_err_t
rd_kafka_position(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * partitions)3332 rd_kafka_position (rd_kafka_t *rk,
3333 		   rd_kafka_topic_partition_list_t *partitions) {
3334  	int i;
3335 
3336 	for (i = 0 ; i < partitions->cnt ; i++) {
3337 		rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
3338 		rd_kafka_toppar_t *rktp;
3339 
3340 		if (!(rktp = rd_kafka_toppar_get2(rk, rktpar->topic,
3341 						    rktpar->partition, 0, 1))) {
3342 			rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3343 			rktpar->offset = RD_KAFKA_OFFSET_INVALID;
3344 			continue;
3345 		}
3346 
3347 		rd_kafka_toppar_lock(rktp);
3348 		rktpar->offset = rktp->rktp_app_offset;
3349 		rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
3350 		rd_kafka_toppar_unlock(rktp);
3351 		rd_kafka_toppar_destroy(rktp);
3352 	}
3353 
3354         return RD_KAFKA_RESP_ERR_NO_ERROR;
3355 }
3356 
3357 
3358 
/**
 * @brief Shared state between rd_kafka_query_watermark_offsets() and its
 *        response callback. Allocated on the querying thread's stack;
 *        the callback must not touch it after ERR__DESTROY.
 */
struct _query_wmark_offsets_state {
	rd_kafka_resp_err_t err;  /* Outcome; __IN_PROGRESS while waiting */
	const char *topic;        /* Queried topic name */
	int32_t partition;        /* Queried partition */
	int64_t offsets[2];       /* Low and high watermark (order not fixed) */
	int     offidx;  /* next offset to set from response */
	rd_ts_t ts_end;  /* Absolute query deadline */
	int     state_version;  /* Broker state version */
};
3368 
/**
 * @brief ListOffsets response handler for query_watermark_offsets().
 *
 * Records one watermark per invocation into state->offsets[] and marks
 * the query done (or failed) once both responses have been seen.
 * Retries the request on transport errors while brokers change state
 * within the deadline.
 */
static void rd_kafka_query_wmark_offsets_resp_cb (rd_kafka_t *rk,
						  rd_kafka_broker_t *rkb,
						  rd_kafka_resp_err_t err,
						  rd_kafka_buf_t *rkbuf,
						  rd_kafka_buf_t *request,
						  void *opaque) {
	struct _query_wmark_offsets_state *state;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_topic_partition_t *rktpar;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' has gone out of scope when query_watermark..()
                 * timed out and returned to the caller. */
                return;
        }

        state = opaque;

        offsets = rd_kafka_topic_partition_list_new(1);
        err = rd_kafka_handle_ListOffsets(rk, rkb, err, rkbuf, request,
                                          offsets, NULL);
        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
                rd_kafka_topic_partition_list_destroy(offsets);
                return; /* Retrying */
        }

	/* Retry if no broker connection is available yet. */
	if (err == RD_KAFKA_RESP_ERR__TRANSPORT &&
	    rkb &&
	    rd_kafka_brokers_wait_state_change(
		    rkb->rkb_rk, state->state_version,
		    rd_timeout_remains(state->ts_end))) {
		/* Retry */
		state->state_version = rd_kafka_brokers_get_state_version(rk);
		request->rkbuf_retries = 0;
		if (rd_kafka_buf_retry(rkb, request)) {
                        rd_kafka_topic_partition_list_destroy(offsets);
                        return; /* Retry in progress */
                }
		/* FALLTHRU */
	}

        /* Partition not seen in response. */
        if (!(rktpar = rd_kafka_topic_partition_list_find(offsets,
                                                          state->topic,
                                                          state->partition)))
                err = RD_KAFKA_RESP_ERR__BAD_MSG;
        else if (rktpar->err)
                err = rktpar->err;
        else
                state->offsets[state->offidx] = rktpar->offset;

        state->offidx++;

        /* Finalize on error or once both watermarks have been received. */
        if (err || state->offidx == 2) /* Error or Done */
                state->err = err;

        rd_kafka_topic_partition_list_destroy(offsets);
}
3428 
3429 
/**
 * @brief Query the cluster for the low and high watermark offsets of
 *        \p topic / \p partition, blocking for at most \p timeout_ms.
 *
 * Sends two ListOffsets requests to the partition leader (one for
 * BEGINNING, one for END; see KAFKA-1588 note below) and serves the
 * reply queue until both responses arrive or an error/timeout occurs.
 *
 * @param low  output: low watermark offset.
 * @param high output: high watermark offset.
 *
 * @returns ERR_NO_ERROR on success, else the leader-query or response
 *          error code.
 */
rd_kafka_resp_err_t
rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
                                  int32_t partition,
                                  int64_t *low, int64_t *high, int timeout_ms) {
        rd_kafka_q_t *rkq;
        struct _query_wmark_offsets_state state;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_kafka_topic_partition_list_t *partitions;
        rd_kafka_topic_partition_t *rktpar;
        struct rd_kafka_partition_leader *leader;
        rd_list_t leaders;
        rd_kafka_resp_err_t err;

        partitions = rd_kafka_topic_partition_list_new(1);
        rktpar = rd_kafka_topic_partition_list_add(partitions,
                                                   topic, partition);

        rd_list_init(&leaders, partitions->cnt,
                     (void *)rd_kafka_partition_leader_destroy);

        /* Resolve the partition leader (may block up to timeout_ms). */
        err = rd_kafka_topic_partition_list_query_leaders(rk, partitions,
                                                          &leaders, timeout_ms);
        if (err) {
                         rd_list_destroy(&leaders);
                         rd_kafka_topic_partition_list_destroy(partitions);
                         return err;
        }

        leader = rd_list_elem(&leaders, 0);

        rkq = rd_kafka_q_new(rk);

        /* Due to KAFKA-1588 we need to send a request for each wanted offset,
         * in this case one for the low watermark and one for the high. */
        state.topic = topic;
        state.partition = partition;
        state.offsets[0] = RD_KAFKA_OFFSET_BEGINNING;
        state.offsets[1] = RD_KAFKA_OFFSET_END;
        state.offidx = 0;
        state.err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
        state.ts_end = ts_end;
        state.state_version = rd_kafka_brokers_get_state_version(rk);


        rktpar->offset =  RD_KAFKA_OFFSET_BEGINNING;
        rd_kafka_ListOffsetsRequest(leader->rkb, partitions,
                                    RD_KAFKA_REPLYQ(rkq, 0),
                                    rd_kafka_query_wmark_offsets_resp_cb,
                                    &state);

        rktpar->offset =  RD_KAFKA_OFFSET_END;
        rd_kafka_ListOffsetsRequest(leader->rkb, partitions,
                                    RD_KAFKA_REPLYQ(rkq, 0),
                                    rd_kafka_query_wmark_offsets_resp_cb,
                                    &state);

        rd_kafka_topic_partition_list_destroy(partitions);
        rd_list_destroy(&leaders);

        /* Wait for reply (or timeout) */
        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
               rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
                                rd_kafka_poll_cb, NULL) !=
               RD_KAFKA_OP_RES_YIELD)
                ;

        rd_kafka_q_destroy_owner(rkq);

        if (state.err)
                return state.err;
        else if (state.offidx != 2)
                return RD_KAFKA_RESP_ERR__FAIL;

        /* We are not certain about the returned order. */
        if (state.offsets[0] < state.offsets[1]) {
                *low = state.offsets[0];
                *high  = state.offsets[1];
        } else {
                *low = state.offsets[1];
                *high = state.offsets[0];
        }

        /* If partition is empty only one offset (the last) will be returned. */
        if (*low < 0 && *high >= 0)
                *low = *high;

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
3518 
3519 
3520 rd_kafka_resp_err_t
rd_kafka_get_watermark_offsets(rd_kafka_t * rk,const char * topic,int32_t partition,int64_t * low,int64_t * high)3521 rd_kafka_get_watermark_offsets (rd_kafka_t *rk, const char *topic,
3522 				int32_t partition,
3523 				int64_t *low, int64_t *high) {
3524 	rd_kafka_toppar_t *rktp;
3525 
3526 	rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, 1);
3527 	if (!rktp)
3528 		return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3529 
3530 	rd_kafka_toppar_lock(rktp);
3531 	*low = rktp->rktp_lo_offset;
3532 	*high = rktp->rktp_hi_offset;
3533 	rd_kafka_toppar_unlock(rktp);
3534 
3535 	rd_kafka_toppar_destroy(rktp);
3536 
3537 	return RD_KAFKA_RESP_ERR_NO_ERROR;
3538 }
3539 
3540 
/**
 * @brief get_offsets_for_times() state shared with its response callback.
 *        Allocated on the querying thread's stack; the callback must not
 *        touch it after ERR__DESTROY.
 */
struct _get_offsets_for_times {
        rd_kafka_topic_partition_list_t *results;  /* Accumulated results */
        rd_kafka_resp_err_t err;   /* First error encountered, if any */
        int wait_reply;            /* Number of outstanding leader replies */
        int state_version;         /* Broker state version (for retries) */
        rd_ts_t ts_end;            /* Absolute query deadline */
};
3551 
/**
 * @brief Handle OffsetRequest responses
 *
 * Merges per-leader ListOffsets results into state->results, retrying
 * on transport errors while brokers change state within the deadline,
 * and decrements the outstanding-reply counter when done.
 */
static void rd_kafka_get_offsets_for_times_resp_cb (rd_kafka_t *rk,
                                                  rd_kafka_broker_t *rkb,
                                                  rd_kafka_resp_err_t err,
                                                  rd_kafka_buf_t *rkbuf,
                                                  rd_kafka_buf_t *request,
                                                  void *opaque) {
        struct _get_offsets_for_times *state;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' has gone out of scope when offsets_for_times()
                 * timed out and returned to the caller. */
                return;
        }

        state = opaque;

        err = rd_kafka_handle_ListOffsets(rk, rkb, err, rkbuf, request,
                                          state->results, NULL);
        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
                return; /* Retrying */

        /* Retry if no broker connection is available yet. */
        if (err == RD_KAFKA_RESP_ERR__TRANSPORT &&
            rkb &&
            rd_kafka_brokers_wait_state_change(
                    rkb->rkb_rk, state->state_version,
                    rd_timeout_remains(state->ts_end))) {
                /* Retry */
                state->state_version = rd_kafka_brokers_get_state_version(rk);
                request->rkbuf_retries = 0;
                if (rd_kafka_buf_retry(rkb, request))
                        return; /* Retry in progress */
                /* FALLTHRU */
        }

        /* Keep only the first error encountered. */
        if (err && !state->err)
                state->err = err;

        state->wait_reply--;
}
3595 
3596 
/**
 * @brief Look up the offsets for the timestamps set on each element of
 *        \p offsets, blocking for at most \p timeout_ms.
 *
 * Resolves partition leaders, fans out one ListOffsets request per
 * leader and serves the reply queue until all replies arrive or the
 * timeout expires. On success the queried entries in \p offsets are
 * updated in place.
 *
 * @returns ERR_NO_ERROR on success, ERR__INVALID_ARG for an empty list,
 *          ERR__TIMED_OUT if not all replies arrived, or another error.
 */
rd_kafka_resp_err_t
rd_kafka_offsets_for_times (rd_kafka_t *rk,
                            rd_kafka_topic_partition_list_t *offsets,
                            int timeout_ms) {
        rd_kafka_q_t *rkq;
        struct _get_offsets_for_times state = RD_ZERO_INIT;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_list_t leaders;
        int i;
        rd_kafka_resp_err_t err;
        struct rd_kafka_partition_leader *leader;
        int tmout;

        if (offsets->cnt == 0)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        rd_list_init(&leaders, offsets->cnt,
                     (void *)rd_kafka_partition_leader_destroy);

        /* Resolve each partition's leader (may block up to timeout_ms). */
        err = rd_kafka_topic_partition_list_query_leaders(rk, offsets, &leaders,
                                                          timeout_ms);
        if (err) {
                rd_list_destroy(&leaders);
                return err;
        }


        rkq = rd_kafka_q_new(rk);

        state.wait_reply = 0;
        state.results = rd_kafka_topic_partition_list_new(offsets->cnt);

        /* For each leader send a request for its partitions */
        RD_LIST_FOREACH(leader, &leaders, i) {
                state.wait_reply++;
                rd_kafka_ListOffsetsRequest(
                        leader->rkb, leader->partitions,
                        RD_KAFKA_REPLYQ(rkq, 0),
                        rd_kafka_get_offsets_for_times_resp_cb,
                        &state);
        }

        rd_list_destroy(&leaders);

        /* Wait for reply (or timeout) */
        while (state.wait_reply > 0 &&
               !rd_timeout_expired((tmout = rd_timeout_remains(ts_end))))
                rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
                                 rd_kafka_poll_cb, NULL);

        rd_kafka_q_destroy_owner(rkq);

        /* Outstanding replies after the deadline count as a timeout. */
        if (state.wait_reply > 0 && !state.err)
                state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;

        /* Then update the queried partitions. */
        if (!state.err)
                rd_kafka_topic_partition_list_update(offsets, state.results);

        rd_kafka_topic_partition_list_destroy(state.results);

        return state.err;
}
3660 
3661 
3662 /**
3663  * @brief rd_kafka_poll() (and similar) op callback handler.
3664  *        Will either call registered callback depending on cb_type and op type
3665  *        or return op to application, if applicable (e.g., fetch message).
3666  *
3667  * @returns RD_KAFKA_OP_RES_HANDLED if op was handled, else one of the
3668  *          other res types (such as OP_RES_PASS).
3669  *
3670  * @locality any thread that serves op queues
3671  */
3672 rd_kafka_op_res_t
rd_kafka_poll_cb(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko,rd_kafka_q_cb_type_t cb_type,void * opaque)3673 rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
3674                   rd_kafka_q_cb_type_t cb_type, void *opaque) {
3675 	rd_kafka_msg_t *rkm;
3676         rd_kafka_op_res_t res = RD_KAFKA_OP_RES_HANDLED;
3677 
3678         /* Special handling for events based on cb_type */
3679         if (cb_type == RD_KAFKA_Q_CB_EVENT &&
3680             rd_kafka_event_setup(rk, rko)) {
3681                 /* Return-as-event requested. */
3682                 return RD_KAFKA_OP_RES_PASS; /* Return as event */
3683         }
3684 
3685         switch ((int)rko->rko_type)
3686         {
3687         case RD_KAFKA_OP_FETCH:
3688                 if (!rk->rk_conf.consume_cb ||
3689                     cb_type == RD_KAFKA_Q_CB_RETURN ||
3690                     cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
3691                         return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
3692                 else {
3693                         struct consume_ctx ctx = {
3694                                 .consume_cb = rk->rk_conf.consume_cb,
3695                                 .opaque = rk->rk_conf.opaque };
3696 
3697                         return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx);
3698                 }
3699                 break;
3700 
3701         case RD_KAFKA_OP_REBALANCE:
3702                 if (rk->rk_conf.rebalance_cb)
3703                         rk->rk_conf.rebalance_cb(
3704                                 rk, rko->rko_err,
3705                                 rko->rko_u.rebalance.partitions,
3706                                 rk->rk_conf.opaque);
3707                 else {
3708                         /** If EVENT_REBALANCE is enabled but rebalance_cb
3709                          *  isn't, we need to perform a dummy assign for the
3710                          *  application. This might happen during termination
3711                          *  with consumer_close() */
3712                         rd_kafka_dbg(rk, CGRP, "UNASSIGN",
3713                                      "Forcing unassign of %d partition(s)",
3714                                      rko->rko_u.rebalance.partitions ?
3715                                      rko->rko_u.rebalance.partitions->cnt : 0);
3716                         rd_kafka_assign(rk, NULL);
3717                 }
3718                 break;
3719 
3720         case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
3721 		if (!rko->rko_u.offset_commit.cb)
3722 			return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
3723 		rko->rko_u.offset_commit.cb(
3724                         rk, rko->rko_err,
3725 			rko->rko_u.offset_commit.partitions,
3726 			rko->rko_u.offset_commit.opaque);
3727                 break;
3728 
3729         case RD_KAFKA_OP_FETCH_STOP|RD_KAFKA_OP_REPLY:
3730                 /* Reply from toppar FETCH_STOP */
3731                 rd_kafka_assignment_partition_stopped(rk, rko->rko_rktp);
3732                 break;
3733 
3734         case RD_KAFKA_OP_CONSUMER_ERR:
3735                 /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER):
3736                  *   Consumer errors are returned to the application
3737                  *   as rkmessages, not error callbacks.
3738                  *
3739                  * rd_kafka_poll() (_Q_CB_GLOBAL):
3740                  *   convert to ERR op (fallthru)
3741                  */
3742                 if (cb_type == RD_KAFKA_Q_CB_RETURN ||
3743                     cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) {
3744                         /* return as message_t to application */
3745                         return RD_KAFKA_OP_RES_PASS;
3746                 }
3747 		/* FALLTHRU */
3748 
3749 	case RD_KAFKA_OP_ERR:
3750 		if (rk->rk_conf.error_cb)
3751 			rk->rk_conf.error_cb(rk, rko->rko_err,
3752 					     rko->rko_u.err.errstr,
3753                                              rk->rk_conf.opaque);
3754                 else
3755                         rd_kafka_log(rk, LOG_ERR, "ERROR",
3756                                      "%s: %s",
3757                                      rk->rk_name,
3758                                      rko->rko_u.err.errstr);
3759                 break;
3760 
3761 	case RD_KAFKA_OP_DR:
3762 		/* Delivery report:
3763 		 * call application DR callback for each message. */
3764 		while ((rkm = TAILQ_FIRST(&rko->rko_u.dr.msgq.rkmq_msgs))) {
3765                         rd_kafka_message_t *rkmessage;
3766 
3767 			TAILQ_REMOVE(&rko->rko_u.dr.msgq.rkmq_msgs,
3768 				     rkm, rkm_link);
3769 
3770                         rkmessage = rd_kafka_message_get_from_rkm(rko, rkm);
3771 
3772                         if (likely(rk->rk_conf.dr_msg_cb != NULL)) {
3773                                 rk->rk_conf.dr_msg_cb(rk, rkmessage,
3774                                                       rk->rk_conf.opaque);
3775 
3776                         } else if (rk->rk_conf.dr_cb) {
3777                                 rk->rk_conf.dr_cb(rk,
3778                                                   rkmessage->payload,
3779                                                   rkmessage->len,
3780                                                   rkmessage->err,
3781                                                   rk->rk_conf.opaque,
3782                                                   rkmessage->_private);
3783                         } else if (rk->rk_drmode == RD_KAFKA_DR_MODE_EVENT) {
3784                                 rd_kafka_log(rk, LOG_WARNING, "DRDROP",
3785                                              "Dropped delivery report for "
3786                                              "message to "
3787                                              "%s [%"PRId32"] (%s) with "
3788                                              "opaque %p: flush() or poll() "
3789                                              "should not be called when "
3790                                              "EVENT_DR is enabled",
3791                                              rd_kafka_topic_name(rkmessage->
3792                                                                  rkt),
3793                                              rkmessage->partition,
3794                                              rd_kafka_err2name(rkmessage->err),
3795                                              rkmessage->_private);
3796                         } else {
3797                                 rd_assert(!*"BUG: neither a delivery report "
3798                                           "callback or EVENT_DR flag set");
3799                         }
3800 
3801                         rd_kafka_msg_destroy(rk, rkm);
3802 
3803                         if (unlikely(rd_kafka_yield_thread)) {
3804                                 /* Callback called yield(),
3805                                  * re-enqueue the op (if there are any
3806                                  * remaining messages). */
3807                                 if (!TAILQ_EMPTY(&rko->rko_u.dr.msgq.
3808                                                  rkmq_msgs))
3809                                         rd_kafka_q_reenq(rkq, rko);
3810                                 else
3811                                         rd_kafka_op_destroy(rko);
3812                                 return RD_KAFKA_OP_RES_YIELD;
3813                         }
3814 		}
3815 
3816 		rd_kafka_msgq_init(&rko->rko_u.dr.msgq);
3817 
3818 		break;
3819 
3820 	case RD_KAFKA_OP_THROTTLE:
3821 		if (rk->rk_conf.throttle_cb)
3822 			rk->rk_conf.throttle_cb(rk, rko->rko_u.throttle.nodename,
3823 						rko->rko_u.throttle.nodeid,
3824 						rko->rko_u.throttle.
3825 						throttle_time,
3826 						rk->rk_conf.opaque);
3827 		break;
3828 
3829 	case RD_KAFKA_OP_STATS:
3830 		/* Statistics */
3831 		if (rk->rk_conf.stats_cb &&
3832 		    rk->rk_conf.stats_cb(rk, rko->rko_u.stats.json,
3833                                          rko->rko_u.stats.json_len,
3834 					 rk->rk_conf.opaque) == 1)
3835 			rko->rko_u.stats.json = NULL; /* Application wanted json ptr */
3836 		break;
3837 
3838         case RD_KAFKA_OP_LOG:
3839                 if (likely(rk->rk_conf.log_cb &&
3840                            rk->rk_conf.log_level >= rko->rko_u.log.level))
3841                         rk->rk_conf.log_cb(rk,
3842                                            rko->rko_u.log.level,
3843                                            rko->rko_u.log.fac,
3844                                            rko->rko_u.log.str);
3845                 break;
3846 
3847         case RD_KAFKA_OP_TERMINATE:
3848                 /* nop: just a wake-up */
3849                 break;
3850 
3851         case RD_KAFKA_OP_CREATETOPICS:
3852         case RD_KAFKA_OP_DELETETOPICS:
3853         case RD_KAFKA_OP_CREATEPARTITIONS:
3854         case RD_KAFKA_OP_ALTERCONFIGS:
3855         case RD_KAFKA_OP_DESCRIBECONFIGS:
3856         case RD_KAFKA_OP_DELETERECORDS:
3857         case RD_KAFKA_OP_DELETEGROUPS:
3858         case RD_KAFKA_OP_ADMIN_FANOUT:
3859                 /* Calls op_destroy() from worker callback,
3860                  * when the time comes. */
3861                 res = rd_kafka_op_call(rk, rkq, rko);
3862                 break;
3863 
3864         case RD_KAFKA_OP_ADMIN_RESULT:
3865                 if (cb_type == RD_KAFKA_Q_CB_RETURN ||
3866                     cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
3867                         return RD_KAFKA_OP_RES_PASS; /* Don't handle here */
3868 
3869                 /* Op is silently destroyed below */
3870                 break;
3871 
3872         case RD_KAFKA_OP_TXN:
3873                 /* Must only be handled by rdkafka main thread */
3874                 rd_assert(thrd_is_current(rk->rk_thread));
3875                 res = rd_kafka_op_call(rk, rkq, rko);
3876                 break;
3877 
3878         case RD_KAFKA_OP_BARRIER:
3879                 break;
3880 
3881         case RD_KAFKA_OP_PURGE:
3882                 rd_kafka_purge(rk, rko->rko_u.purge.flags);
3883                 break;
3884 
3885         default:
3886                 rd_kafka_assert(rk, !*"cant handle op type");
3887                 break;
3888         }
3889 
3890         if (res == RD_KAFKA_OP_RES_HANDLED)
3891                 rd_kafka_op_destroy(rko);
3892 
3893         return res;
3894 }
3895 
/**
 * @brief Serve the handle's main reply queue, invoking the registered
 *        application callbacks for each op.
 *
 * @returns the number of ops/events served.
 */
int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) {
        int served;

        /* Flag the application as blocking in poll for the duration
         * of the wait. */
        if (timeout_ms)
                rd_kafka_app_poll_blocking(rk);

        served = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0,
                                  RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb,
                                  NULL);

        rd_kafka_app_polled(rk);

        return served;
}
3909 
3910 
rd_kafka_queue_poll(rd_kafka_queue_t * rkqu,int timeout_ms)3911 rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms) {
3912         rd_kafka_op_t *rko;
3913 
3914         if (timeout_ms)
3915                 rd_kafka_app_poll_blocking(rkqu->rkqu_rk);
3916 
3917         rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0,
3918                                    RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);
3919 
3920         rd_kafka_app_polled(rkqu->rkqu_rk);
3921 
3922         if (!rko)
3923                 return NULL;
3924 
3925         return rko;
3926 }
3927 
/**
 * @brief Serve the given queue, invoking the registered application
 *        callbacks for each op.
 *
 * @returns the number of ops/events served.
 */
int rd_kafka_queue_poll_callback (rd_kafka_queue_t *rkqu, int timeout_ms) {
        int served;

        /* Flag the application as blocking in poll for the duration
         * of the wait. */
        if (timeout_ms)
                rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

        served = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
                                  RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb,
                                  NULL);

        rd_kafka_app_polled(rkqu->rkqu_rk);

        return served;
}
3941 
3942 
3943 
/**
 * @brief Dump the state of a single partition (toppar) to \p fp for
 *        debugging: topic/partition, current broker and leader, refcount,
 *        queue depths and cumulative transmit counters.
 *
 * @param indent string prepended to each output line.
 */
static void rd_kafka_toppar_dump (FILE *fp, const char *indent,
				  rd_kafka_toppar_t *rktp) {

	fprintf(fp, "%s%.*s [%"PRId32"] broker %s, "
                "leader_id %s\n",
		indent,
		RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
		rktp->rktp_partition,
		rktp->rktp_broker ?
		rktp->rktp_broker->rkb_name : "none",
                rktp->rktp_leader ?
                rktp->rktp_leader->rkb_name : "none");
	fprintf(fp,
		"%s refcnt %i\n"
		"%s msgq:      %i messages\n"
		"%s xmit_msgq: %i messages\n"
		"%s total:     %"PRIu64" messages, %"PRIu64" bytes\n",
		indent, rd_refcnt_get(&rktp->rktp_refcnt),
		indent, rktp->rktp_msgq.rkmq_msg_cnt,
		indent, rktp->rktp_xmit_msgq.rkmq_msg_cnt,
                indent, rd_atomic64_get(&rktp->rktp_c.tx_msgs),
                rd_atomic64_get(&rktp->rktp_c.tx_msg_bytes));
}
3967 
/**
 * @brief Dump the state of a single broker handle to \p fp for debugging:
 *        name, node id, state, refcount, buffer queue depths, cumulative
 *        tx/rx counters, and all partitions delegated to this broker.
 *
 * @param locks If non-zero, hold the broker lock while reading its state.
 *              Pass 0 when called from a context that must not take locks
 *              (e.g., crash dump).
 */
static void rd_kafka_broker_dump (FILE *fp, rd_kafka_broker_t *rkb, int locks) {
	rd_kafka_toppar_t *rktp;

        if (locks)
                rd_kafka_broker_lock(rkb);
        fprintf(fp, " rd_kafka_broker_t %p: %s NodeId %"PRId32
                " in state %s (for %.3fs)\n",
                rkb, rkb->rkb_name, rkb->rkb_nodeid,
                rd_kafka_broker_state_names[rkb->rkb_state],
                /* rkb_ts_state is in microseconds; convert to seconds. */
                rkb->rkb_ts_state ?
                (float)(rd_clock() - rkb->rkb_ts_state) / 1000000.0f :
                0.0f);
        fprintf(fp, "  refcnt %i\n", rd_refcnt_get(&rkb->rkb_refcnt));
        fprintf(fp, "  outbuf_cnt: %i waitresp_cnt: %i\n",
                rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
                rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt));
        fprintf(fp,
                "  %"PRIu64 " messages sent, %"PRIu64" bytes, "
                "%"PRIu64" errors, %"PRIu64" timeouts\n"
                "  %"PRIu64 " messages received, %"PRIu64" bytes, "
                "%"PRIu64" errors\n"
                "  %"PRIu64 " messageset transmissions were retried\n",
                rd_atomic64_get(&rkb->rkb_c.tx), rd_atomic64_get(&rkb->rkb_c.tx_bytes),
                rd_atomic64_get(&rkb->rkb_c.tx_err), rd_atomic64_get(&rkb->rkb_c.req_timeouts),
                rd_atomic64_get(&rkb->rkb_c.rx), rd_atomic64_get(&rkb->rkb_c.rx_bytes),
                rd_atomic64_get(&rkb->rkb_c.rx_err),
                rd_atomic64_get(&rkb->rkb_c.tx_retries));

        fprintf(fp, "  %i toppars:\n", rkb->rkb_toppar_cnt);
        TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink)
                rd_kafka_toppar_dump(fp, "   ", rktp);
        if (locks) {
                rd_kafka_broker_unlock(rkb);
        }
}
4003 
4004 
/**
 * @brief Dump the full state of the client handle and all of its
 *        sub-objects (brokers, consumer group, topics, partitions,
 *        metadata cache) to \p fp for debugging.
 *
 * @param locks If non-zero, acquire the relevant locks while reading
 *              shared state. Pass 0 from crash/assert paths where locks
 *              may already be held.
 */
static void rd_kafka_dump0 (FILE *fp, rd_kafka_t *rk, int locks) {
	rd_kafka_broker_t *rkb;
	rd_kafka_topic_t *rkt;
        rd_kafka_toppar_t *rktp;
        int i;
	unsigned int tot_cnt;
	size_t tot_size;

	rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);

	if (locks)
                rd_kafka_rdlock(rk);
#if ENABLE_DEVEL
        fprintf(fp, "rd_kafka_op_cnt: %d\n", rd_atomic32_get(&rd_kafka_op_cnt));
#endif
	fprintf(fp, "rd_kafka_t %p: %s\n", rk, rk->rk_name);

	fprintf(fp, " producer.msg_cnt %u (%"PRIusz" bytes)\n",
		tot_cnt, tot_size);
	fprintf(fp, " rk_rep reply queue: %i ops\n",
		rd_kafka_q_len(rk->rk_rep));

	fprintf(fp, " brokers:\n");
        /* The internal broker is protected by its own mutex. */
        if (locks)
                mtx_lock(&rk->rk_internal_rkb_lock);
        if (rk->rk_internal_rkb)
                rd_kafka_broker_dump(fp, rk->rk_internal_rkb, locks);
        if (locks)
                mtx_unlock(&rk->rk_internal_rkb_lock);

	TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_broker_dump(fp, rkb, locks);
	}

        fprintf(fp, " cgrp:\n");
        if (rk->rk_cgrp) {
                rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
                fprintf(fp, "  %.*s in state %s, flags 0x%x\n",
                        RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                        rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                        rkcg->rkcg_flags);
                fprintf(fp, "   coord_id %"PRId32", broker %s\n",
                        rkcg->rkcg_coord_id,
                        rkcg->rkcg_curr_coord ?
                        rd_kafka_broker_name(rkcg->rkcg_curr_coord):"(none)");

                fprintf(fp, "  toppars:\n");
                RD_LIST_FOREACH(rktp, &rkcg->rkcg_toppars, i) {
                        fprintf(fp, "   %.*s [%"PRId32"] in state %s\n",
                                RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                rktp->rktp_partition,
                                rd_kafka_fetch_states[rktp->rktp_fetch_state]);
                }
        }

	fprintf(fp, " topics:\n");
	TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
		fprintf(fp, "  %.*s with %"PRId32" partitions, state %s, "
                        "refcnt %i\n",
			RD_KAFKAP_STR_PR(rkt->rkt_topic),
			rkt->rkt_partition_cnt,
                        rd_kafka_topic_state_names[rkt->rkt_state],
                        rd_refcnt_get(&rkt->rkt_refcnt));
		if (rkt->rkt_ua)
			rd_kafka_toppar_dump(fp, "   ", rkt->rkt_ua);
                /* FIX: the condition was previously inverted
                 * (rd_list_empty()), which printed an empty
                 * "desired partitions:" header when the list was empty
                 * and nothing at all when there were entries to list. */
                if (!rd_list_empty(&rkt->rkt_desp)) {
                        fprintf(fp, "   desired partitions:");
                        RD_LIST_FOREACH(rktp, &rkt->rkt_desp,  i)
                                fprintf(fp, " %"PRId32, rktp->rktp_partition);
                        fprintf(fp, "\n");
                }
	}

        fprintf(fp, "\n");
        rd_kafka_metadata_cache_dump(fp, rk);

        if (locks)
                rd_kafka_rdunlock(rk);
}
4084 
/**
 * @brief Public entry point: dump handle state to \p fp with locking.
 *        A NULL \p rk is a no-op.
 */
void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) {
        if (!rk)
                return;

        rd_kafka_dump0(fp, rk, 1/*locks*/);
}
4089 
4090 
4091 
rd_kafka_name(const rd_kafka_t * rk)4092 const char *rd_kafka_name (const rd_kafka_t *rk) {
4093 	return rk->rk_name;
4094 }
4095 
rd_kafka_type(const rd_kafka_t * rk)4096 rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk) {
4097         return rk->rk_type;
4098 }
4099 
4100 
rd_kafka_memberid(const rd_kafka_t * rk)4101 char *rd_kafka_memberid (const rd_kafka_t *rk) {
4102 	rd_kafka_op_t *rko;
4103 	rd_kafka_cgrp_t *rkcg;
4104 	char *memberid;
4105 
4106 	if (!(rkcg = rd_kafka_cgrp_get(rk)))
4107 		return NULL;
4108 
4109 	rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_NAME);
4110 	if (!rko)
4111 		return NULL;
4112 	memberid = rko->rko_u.name.str;
4113 	rko->rko_u.name.str = NULL;
4114 	rd_kafka_op_destroy(rko);
4115 
4116 	return memberid;
4117 }
4118 
4119 
/**
 * @brief Return the cluster id, waiting up to \p timeout_ms for it to
 *        become known through a metadata response.
 *
 * @returns a newly allocated string (caller must free), or NULL on
 *          timeout or if the broker does not report a ClusterId.
 */
char *rd_kafka_clusterid (rd_kafka_t *rk, int timeout_ms) {
        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        /* ClusterId is returned in Metadata >=V2 responses and
         * cached on the rk. If no cached value is available
         * it means no metadata has been received yet, or we're
         * using a lower protocol version
         * (e.g., lack of api.version.request=true). */

        while (1) {
                int remains_ms;

                rd_kafka_rdlock(rk);

                if (rk->rk_clusterid) {
                        /* Cached clusterid available. */
                        char *ret = rd_strdup(rk->rk_clusterid);
                        rd_kafka_rdunlock(rk);
                        return ret;
                } else if (rk->rk_ts_metadata > 0) {
                        /* Metadata received but no clusterid,
                         * this probably means the broker is too old
                         * or api.version.request=false. */
                        rd_kafka_rdunlock(rk);
                        return NULL;
                }

                rd_kafka_rdunlock(rk);

                /* Wait for up to timeout_ms for a metadata refresh,
                 * if permitted by application. */
                remains_ms = rd_timeout_remains(abs_timeout);
                if (rd_timeout_expired(remains_ms))
                        return NULL;

                rd_kafka_metadata_cache_wait_change(rk, remains_ms);
        }

        /* NOTREACHED: the loop only exits through the returns above. */
        return NULL;
}
4160 
4161 
/**
 * @brief Return the controller broker id, waiting up to \p timeout_ms
 *        for it to become known through a metadata response.
 *
 * @returns the controller broker id, or -1 on timeout or if the broker
 *          does not report a ControllerId.
 */
int32_t rd_kafka_controllerid (rd_kafka_t *rk, int timeout_ms) {
        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        /* ControllerId is returned in Metadata >=V1 responses and
         * cached on the rk. If no cached value is available
         * it means no metadata has been received yet, or we're
         * using a lower protocol version
         * (e.g., lack of api.version.request=true). */

        while (1) {
                int remains_ms;
                int version;

                /* Snapshot the broker state version before reading so a
                 * concurrent state change is not missed while waiting. */
                version = rd_kafka_brokers_get_state_version(rk);

                rd_kafka_rdlock(rk);

                if (rk->rk_controllerid != -1) {
                        /* Cached controllerid available. */
                        rd_kafka_rdunlock(rk);
                        return rk->rk_controllerid;
                } else if (rk->rk_ts_metadata > 0) {
                        /* Metadata received but no controllerid,
                         * this probably means the broker is too old
                         * or api.version.request=false. */
                        rd_kafka_rdunlock(rk);
                        return -1;
                }

                rd_kafka_rdunlock(rk);

                /* Wait for up to timeout_ms for a metadata refresh,
                 * if permitted by application. */
                remains_ms = rd_timeout_remains(abs_timeout);
                if (rd_timeout_expired(remains_ms))
                        return -1;

                rd_kafka_brokers_wait_state_change(rk, version, remains_ms);
        }

        /* NOTREACHED: the loop only exits through the returns above. */
        return -1;
}
4204 
4205 
rd_kafka_opaque(const rd_kafka_t * rk)4206 void *rd_kafka_opaque (const rd_kafka_t *rk) {
4207         return rk->rk_conf.opaque;
4208 }
4209 
4210 
/**
 * @returns the number of outstanding messages and ops: in-flight/queued
 *          messages plus the main reply queue, plus the background
 *          queue if one is enabled.
 */
int rd_kafka_outq_len (rd_kafka_t *rk) {
        int len = rd_kafka_curr_msgs_cnt(rk);

        len += rd_kafka_q_len(rk->rk_rep);

        if (rk->rk_background.q)
                len += rd_kafka_q_len(rk->rk_background.q);

        return len;
}
4215 
4216 
/**
 * @brief Wait (up to \p timeout_ms) until all outstanding produced
 *        messages have been delivered (or failed) and their delivery
 *        reports served.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR when everything was flushed,
 *          RD_KAFKA_RESP_ERR__TIMED_OUT if messages remain,
 *          RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED for non-producer handles.
 */
rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms) {
        unsigned int msg_cnt = 0;

	if (rk->rk_type != RD_KAFKA_PRODUCER)
		return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;

        rd_kafka_yield_thread = 0;

        /* Set flushing flag on the producer for the duration of the
         * flush() call. This tells producer_serve() that the linger.ms
         * time should be considered immediate. */
        rd_atomic32_add(&rk->rk_flushing, 1);

         /* Wake up all broker threads to trigger the produce_serve() call.
          * If this flush() call finishes before the broker wakes up
          * then no flushing will be performed by that broker thread. */
        rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_UP);

        if (rk->rk_drmode == RD_KAFKA_DR_MODE_EVENT) {
                /* Application wants delivery reports as events rather
                 * than callbacks, we must thus not serve this queue
                 * with rd_kafka_poll() since that would trigger non-existent
                 * delivery report callbacks, which would result
                 * in the delivery reports being dropped.
                 * Instead we rely on the application to serve the event
                 * queue in another thread, so all we do here is wait
                 * for the current message count to reach zero. */
                rd_kafka_curr_msgs_wait_zero(rk, timeout_ms, &msg_cnt);

        } else {
                /* Standard poll interface.
                 *
                 * First poll call is non-blocking for the case
                 * where timeout_ms==RD_POLL_NOWAIT to make sure poll is
                 * called at least once. */
                rd_ts_t ts_end = rd_timeout_init(timeout_ms);
                int tmout = RD_POLL_NOWAIT;
                int qlen = 0;

                /* Poll in 10ms-capped slices until both the reply queue
                 * and the outstanding message count are drained, the
                 * timeout expires, or a callback yields. */
                do {
                        rd_kafka_poll(rk, tmout);
                        qlen = rd_kafka_q_len(rk->rk_rep);
                        msg_cnt = rd_kafka_curr_msgs_cnt(rk);
                } while (qlen + msg_cnt > 0 &&
                         !rd_kafka_yield_thread &&
                         (tmout = rd_timeout_remains_limit(ts_end, 10)) !=
                         RD_POLL_NOWAIT);

                msg_cnt += qlen;
        }

        /* Clear the flushing flag (counter): linger.ms applies again. */
        rd_atomic32_sub(&rk->rk_flushing, 1);

        return msg_cnt > 0 ? RD_KAFKA_RESP_ERR__TIMED_OUT :
                RD_KAFKA_RESP_ERR_NO_ERROR;
}
4273 
4274 /**
4275  * @brief Purge the partition message queue (according to \p purge_flags) for
4276  *        all toppars.
4277  *
4278  * This is a necessity to avoid the race condition when a purge() is scheduled
4279  * shortly in-between an rktp has been created but before it has been
4280  * joined to a broker handler thread.
4281  *
4282  * The rktp_xmit_msgq is handled by the broker-thread purge.
4283  *
4284  * @returns the number of messages purged.
4285  *
4286  * @locks_required rd_kafka_*lock()
4287  * @locks_acquired rd_kafka_topic_rdlock()
4288  */
4289 static int
rd_kafka_purge_toppars(rd_kafka_t * rk,int purge_flags)4290 rd_kafka_purge_toppars (rd_kafka_t *rk, int purge_flags) {
4291         rd_kafka_topic_t *rkt;
4292         int cnt = 0;
4293 
4294         TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
4295                 rd_kafka_toppar_t *rktp;
4296                 int i;
4297 
4298                 rd_kafka_topic_rdlock(rkt);
4299                 for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
4300                         cnt += rd_kafka_toppar_purge_queues(
4301                                 rkt->rkt_p[i], purge_flags, rd_false/*!xmit*/);
4302 
4303                 RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i)
4304                         cnt += rd_kafka_toppar_purge_queues(
4305                                 rktp, purge_flags, rd_false/*!xmit*/);
4306 
4307                 if (rkt->rkt_ua)
4308                         cnt += rd_kafka_toppar_purge_queues(
4309                                 rkt->rkt_ua, purge_flags, rd_false/*!xmit*/);
4310                 rd_kafka_topic_rdunlock(rkt);
4311         }
4312 
4313         return cnt;
4314 }
4315 
4316 
/**
 * @brief Purge messages from the producer's queues according to
 *        \p purge_flags (queue, in-flight, non-blocking).
 *
 * Purges partition queues directly, then asks each broker thread to
 * purge its own queues, waiting for their replies unless
 * RD_KAFKA_PURGE_F_NON_BLOCKING is set.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
 *          RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED for non-producer handles,
 *          RD_KAFKA_RESP_ERR__INVALID_ARG on unknown flags.
 */
rd_kafka_resp_err_t rd_kafka_purge (rd_kafka_t *rk, int purge_flags) {
        rd_kafka_broker_t *rkb;
        rd_kafka_q_t *tmpq = NULL;
        int waitcnt = 0;

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;

        /* Check that future flags are not passed */
        if ((purge_flags & ~RD_KAFKA_PURGE_F_MASK) != 0)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        /* Nothing to purge */
        if (!purge_flags)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        /* Set up a reply queue to wait for broker thread signalling
         * completion, unless non-blocking. */
        if (!(purge_flags & RD_KAFKA_PURGE_F_NON_BLOCKING))
                tmpq = rd_kafka_q_new(rk);

        rd_kafka_rdlock(rk);

        /* Purge msgq for all toppars. */
        rd_kafka_purge_toppars(rk, purge_flags);

        /* Send purge request to all broker threads */
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_broker_purge_queues(rkb, purge_flags,
                                             RD_KAFKA_REPLYQ(tmpq, 0));
                waitcnt++;
        }

        rd_kafka_rdunlock(rk);


        if (tmpq) {
                /* Wait for responses, one per broker thread signalled. */
                while (waitcnt-- > 0)
                        rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);

                rd_kafka_q_destroy_owner(tmpq);
        }

        /* Purge messages for the UA(-1) partitions (which are not
         * handled by a broker thread) */
        if (purge_flags & RD_KAFKA_PURGE_F_QUEUE)
                rd_kafka_purge_ua_toppar_queues(rk);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
4368 
4369 
4370 
4371 
4372 /**
4373  * @returns a csv string of purge flags in thread-local storage
4374  */
rd_kafka_purge_flags2str(int flags)4375 const char *rd_kafka_purge_flags2str (int flags) {
4376         static const char *names[] = {
4377                 "queue",
4378                 "inflight",
4379                 "non-blocking",
4380                 NULL
4381         };
4382         static RD_TLS char ret[64];
4383 
4384         return rd_flags2str(ret, sizeof(ret), names, flags);
4385 }
4386 
4387 
rd_kafka_version(void)4388 int rd_kafka_version (void) {
4389 	return RD_KAFKA_VERSION;
4390 }
4391 
/**
 * @brief Build (once per thread) and return a human-readable version
 *        string, preferring the git version when compiled in, else
 *        deriving "maj.min.patch[-preN|-RCN]" from rd_kafka_version().
 *
 * @returns a string in thread-local storage; do not free.
 */
const char *rd_kafka_version_str (void) {
	static RD_TLS char ret[128];
	size_t of = 0, r;

	/* Already built for this thread. */
	if (*ret)
		return ret;

#ifdef LIBRDKAFKA_GIT_VERSION
	if (*LIBRDKAFKA_GIT_VERSION) {
		/* Strip a leading 'v' from the git tag, if present. */
		of = rd_snprintf(ret, sizeof(ret), "%s",
				 *LIBRDKAFKA_GIT_VERSION == 'v' ?
                                 LIBRDKAFKA_GIT_VERSION+1 :
                                 LIBRDKAFKA_GIT_VERSION);
		/* rd_snprintf returns the would-be length; clamp on
		 * truncation. */
		if (of > sizeof(ret))
			of = sizeof(ret);
	}
#endif

/* Append to ret at offset \p of, clamping \p of at the buffer size
 * so subsequent appends become no-ops once full. */
#define _my_sprintf(...) do {						\
		r = rd_snprintf(ret+of, sizeof(ret)-of, __VA_ARGS__);	\
		if (r > sizeof(ret)-of)					\
			r = sizeof(ret)-of;				\
		of += r;						\
	} while(0)

	if (of == 0) {
		/* No git version: decode the 0xMMmmrrpp version int. */
		int ver = rd_kafka_version();
		int prel = (ver & 0xff);
		_my_sprintf("%i.%i.%i",
			    (ver >> 24) & 0xff,
			    (ver >> 16) & 0xff,
			    (ver >> 8) & 0xff);
		if (prel != 0xff) {
			/* pre-builds below 200 are just running numbers,
			 * above 200 are RC numbers. */
			if (prel <= 200)
				_my_sprintf("-pre%d", prel);
			else
				_my_sprintf("-RC%d", prel - 200);
		}
	}

#if ENABLE_DEVEL
	_my_sprintf("-devel");
#endif

#if WITHOUT_OPTIMIZATION
	_my_sprintf("-O0");
#endif

	return ret;
}
4444 
4445 
4446 /**
4447  * Assert trampoline to print some debugging information on crash.
4448  */
4449 void
4450 RD_NORETURN
rd_kafka_crash(const char * file,int line,const char * function,rd_kafka_t * rk,const char * reason)4451 rd_kafka_crash (const char *file, int line, const char *function,
4452                 rd_kafka_t *rk, const char *reason) {
4453         fprintf(stderr, "*** %s:%i:%s: %s ***\n",
4454                 file, line, function, reason);
4455         if (rk)
4456                 rd_kafka_dump0(stderr, rk, 0/*no locks*/);
4457         abort();
4458 }
4459 
4460 
4461 
4462 
4463 
/* Shared state for the list/describe groups requests, passed as the
 * opaque to the broker response callbacks. */
struct list_groups_state {
        rd_kafka_q_t *q;                      /* Reply queue */
        rd_kafka_resp_err_t err;              /* First/last error seen */
        int wait_cnt;                         /* Outstanding request count */
        const char *desired_group;            /* Group filter, or NULL */
        struct rd_kafka_group_list *grplist;  /* Accumulated result list */
        int grplist_size;                     /* Allocated size of
                                               * grplist->groups */
};
4472 
rd_kafka_DescribeGroups_resp_cb(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * reply,rd_kafka_buf_t * request,void * opaque)4473 static void rd_kafka_DescribeGroups_resp_cb (rd_kafka_t *rk,
4474 					     rd_kafka_broker_t *rkb,
4475                                              rd_kafka_resp_err_t err,
4476                                              rd_kafka_buf_t *reply,
4477                                              rd_kafka_buf_t *request,
4478                                              void *opaque) {
4479         struct list_groups_state *state;
4480         const int log_decode_errors = LOG_ERR;
4481         int cnt;
4482 
4483         if (err == RD_KAFKA_RESP_ERR__DESTROY) {
4484                 /* 'state' has gone out of scope due to list_groups()
4485                  * timing out and returning. */
4486                 return;
4487         }
4488 
4489         state = opaque;
4490         state->wait_cnt--;
4491 
4492         if (err)
4493                 goto err;
4494 
4495         rd_kafka_buf_read_i32(reply, &cnt);
4496 
4497         while (cnt-- > 0) {
4498                 int16_t ErrorCode;
4499                 rd_kafkap_str_t Group, GroupState, ProtoType, Proto;
4500                 int MemberCnt;
4501                 struct rd_kafka_group_info *gi;
4502 
4503                 if (state->grplist->group_cnt == state->grplist_size) {
4504                         /* Grow group array */
4505                         state->grplist_size *= 2;
4506                         state->grplist->groups =
4507                                 rd_realloc(state->grplist->groups,
4508                                            state->grplist_size *
4509                                            sizeof(*state->grplist->groups));
4510                 }
4511 
4512                 gi = &state->grplist->groups[state->grplist->group_cnt++];
4513                 memset(gi, 0, sizeof(*gi));
4514 
4515                 rd_kafka_buf_read_i16(reply, &ErrorCode);
4516                 rd_kafka_buf_read_str(reply, &Group);
4517                 rd_kafka_buf_read_str(reply, &GroupState);
4518                 rd_kafka_buf_read_str(reply, &ProtoType);
4519                 rd_kafka_buf_read_str(reply, &Proto);
4520                 rd_kafka_buf_read_i32(reply, &MemberCnt);
4521 
4522                 if (MemberCnt > 100000) {
4523                         err = RD_KAFKA_RESP_ERR__BAD_MSG;
4524                         goto err;
4525                 }
4526 
4527                 rd_kafka_broker_lock(rkb);
4528                 gi->broker.id = rkb->rkb_nodeid;
4529                 gi->broker.host = rd_strdup(rkb->rkb_origname);
4530                 gi->broker.port = rkb->rkb_port;
4531                 rd_kafka_broker_unlock(rkb);
4532 
4533                 gi->err = ErrorCode;
4534                 gi->group = RD_KAFKAP_STR_DUP(&Group);
4535                 gi->state = RD_KAFKAP_STR_DUP(&GroupState);
4536                 gi->protocol_type = RD_KAFKAP_STR_DUP(&ProtoType);
4537                 gi->protocol = RD_KAFKAP_STR_DUP(&Proto);
4538 
4539                 if (MemberCnt > 0)
4540                         gi->members =
4541                                 rd_malloc(MemberCnt * sizeof(*gi->members));
4542 
4543                 while (MemberCnt-- > 0) {
4544                         rd_kafkap_str_t MemberId, ClientId, ClientHost;
4545                         rd_kafkap_bytes_t Meta, Assignment;
4546                         struct rd_kafka_group_member_info *mi;
4547 
4548                         mi = &gi->members[gi->member_cnt++];
4549                         memset(mi, 0, sizeof(*mi));
4550 
4551                         rd_kafka_buf_read_str(reply, &MemberId);
4552                         rd_kafka_buf_read_str(reply, &ClientId);
4553                         rd_kafka_buf_read_str(reply, &ClientHost);
4554                         rd_kafka_buf_read_bytes(reply, &Meta);
4555                         rd_kafka_buf_read_bytes(reply, &Assignment);
4556 
4557                         mi->member_id = RD_KAFKAP_STR_DUP(&MemberId);
4558                         mi->client_id = RD_KAFKAP_STR_DUP(&ClientId);
4559                         mi->client_host = RD_KAFKAP_STR_DUP(&ClientHost);
4560 
4561                         if (RD_KAFKAP_BYTES_LEN(&Meta) == 0) {
4562                                 mi->member_metadata_size = 0;
4563                                 mi->member_metadata = NULL;
4564                         } else {
4565                                 mi->member_metadata_size =
4566                                         RD_KAFKAP_BYTES_LEN(&Meta);
4567                                 mi->member_metadata =
4568                                         rd_memdup(Meta.data,
4569                                                   mi->member_metadata_size);
4570                         }
4571 
4572                         if (RD_KAFKAP_BYTES_LEN(&Assignment) == 0) {
4573                                 mi->member_assignment_size = 0;
4574                                 mi->member_assignment = NULL;
4575                         } else {
4576                                 mi->member_assignment_size =
4577                                         RD_KAFKAP_BYTES_LEN(&Assignment);
4578                                 mi->member_assignment =
4579                                         rd_memdup(Assignment.data,
4580                                                   mi->member_assignment_size);
4581                         }
4582                 }
4583         }
4584 
4585 err:
4586         state->err = err;
4587         return;
4588 
4589  err_parse:
4590         state->err = reply->rkbuf_err;
4591 }
4592 
/**
 * @brief Handle a ListGroups response from a single broker.
 *
 * Collects the group names reported by this broker (optionally filtered
 * down to \c state->desired_group) and issues a follow-up
 * DescribeGroupsRequest for the matching names on the same reply queue.
 *
 * @param opaque Points to the waiting caller's list_groups_state,
 *               except when \p err is RD_KAFKA_RESP_ERR__DESTROY.
 */
static void rd_kafka_ListGroups_resp_cb (rd_kafka_t *rk,
					 rd_kafka_broker_t *rkb,
                                         rd_kafka_resp_err_t err,
                                         rd_kafka_buf_t *reply,
                                         rd_kafka_buf_t *request,
                                         void *opaque) {
        struct list_groups_state *state;
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode;
        char **grps = NULL;
        int cnt, grpcnt, i = 0;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' is no longer in scope because
                 * list_groups() timed out and returned to the caller.
                 * We must not touch anything here but simply return. */
                return;
        }

        state = opaque;

        /* One less outstanding broker request. */
        state->wait_cnt--;

        if (err)
                goto err;

        rd_kafka_buf_read_i16(reply, &ErrorCode);
        if (ErrorCode) {
                err = ErrorCode;
                goto err;
        }

        /* Number of groups in this broker's response. */
        rd_kafka_buf_read_i32(reply, &cnt);

        /* When a single group is desired, at most one entry of this
         * response can match it. */
        if (state->desired_group)
                grpcnt = 1;
        else
                grpcnt = cnt;

        /* Nothing to collect.  err is 0 here, so any error previously
         * recorded in state->err by another broker is left untouched. */
        if (cnt == 0 || grpcnt == 0)
                return;

        grps = rd_malloc(sizeof(*grps) * grpcnt);

        while (cnt-- > 0) {
                rd_kafkap_str_t grp, proto;

                rd_kafka_buf_read_str(reply, &grp);
                rd_kafka_buf_read_str(reply, &proto);

                /* Skip groups not matching the requested name (if any). */
                if (state->desired_group &&
                    rd_kafkap_str_cmp_str(&grp, state->desired_group))
                        continue;

                grps[i++] = RD_KAFKAP_STR_DUP(&grp);

                if (i == grpcnt)
                        break;
        }

        if (i > 0) {
                /* Describe the collected groups; the response is handled
                 * by rd_kafka_DescribeGroups_resp_cb on the same queue,
                 * so bump wait_cnt for the new outstanding request. */
                state->wait_cnt++;
                rd_kafka_DescribeGroupsRequest(rkb,
                                               (const char **)grps, i,
                                               RD_KAFKA_REPLYQ(state->q, 0),
                                               rd_kafka_DescribeGroups_resp_cb,
                                               state);

                /* Free the duplicated names now that the request has been
                 * constructed.  NOTE(review): presumes the request
                 * serializes/copies the names -- confirm in
                 * rd_kafka_DescribeGroupsRequest(). */
                while (i-- > 0)
                        rd_free(grps[i]);
        }


        rd_free(grps);

err:
        state->err = err;
        return;

 err_parse:
        /* Jumped to by the rd_kafka_buf_read_*() macros on a
         * malformed/truncated response buffer. */
        if (grps)
                rd_free(grps);
        state->err = reply->rkbuf_err;
}
4677 
/**
 * @brief List all consumer groups in the cluster, or describe a single
 *        group if \p group is non-NULL.
 *
 * Queries each known broker (with an assigned node id) for its groups
 * and serves the reply queue until all responses have arrived or the
 * timeout expires.
 *
 * @param group      If non-NULL, only return this group.
 * @param grplistp   On success (or partial success) receives the result
 *                   list, which the caller must free with
 *                   rd_kafka_group_list_destroy().
 * @param timeout_ms Maximum total time to wait for broker responses.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
 *          RD_KAFKA_RESP_ERR__PARTIAL if the timeout hit but some groups
 *          were already collected (\p grplistp is still set),
 *          RD_KAFKA_RESP_ERR__TIMED_OUT if nothing was collected in time,
 *          RD_KAFKA_RESP_ERR__TRANSPORT if no usable brokers were found,
 *          or another error reported by a broker response.
 */
rd_kafka_resp_err_t
rd_kafka_list_groups (rd_kafka_t *rk, const char *group,
                      const struct rd_kafka_group_list **grplistp,
                      int timeout_ms) {
        rd_kafka_broker_t *rkb;
        int rkb_cnt = 0;
        struct list_groups_state state = RD_ZERO_INIT;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
	int state_version = rd_kafka_brokers_get_state_version(rk);

        /* Wait until metadata has been fetched from cluster so
         * that we have a full broker list.
	 * This state only happens during initial client setup, after that
	 * there'll always be a cached metadata copy. */
        rd_kafka_rdlock(rk);
        while (!rk->rk_ts_metadata) {
                rd_kafka_rdunlock(rk);

		if (!rd_kafka_brokers_wait_state_change(
			    rk, state_version, rd_timeout_remains(ts_end)))
                        return RD_KAFKA_RESP_ERR__TIMED_OUT;

                rd_kafka_rdlock(rk);
        }

        state.q = rd_kafka_q_new(rk);
        state.desired_group = group;
        state.grplist = rd_calloc(1, sizeof(*state.grplist));
        /* A single-group query needs room for at most one entry,
         * otherwise start at 32; the DescribeGroups response handler
         * grows the array on demand. */
        state.grplist_size = group ? 1 : 32;

        state.grplist->groups = rd_malloc(state.grplist_size *
                                          sizeof(*state.grplist->groups));

        /* Query each broker for its list of groups */
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                /* Skip brokers without an assigned node id and
                 * logical brokers. */
                rd_kafka_broker_lock(rkb);
                if (rkb->rkb_nodeid == -1 || RD_KAFKA_BROKER_IS_LOGICAL(rkb)) {
                        rd_kafka_broker_unlock(rkb);
                        continue;
                }
                rd_kafka_broker_unlock(rkb);

                state.wait_cnt++;
                rkb_cnt++;
                rd_kafka_ListGroupsRequest(rkb,
                                           RD_KAFKA_REPLYQ(state.q, 0),
                                           rd_kafka_ListGroups_resp_cb,
                                           &state);
        }
        rd_kafka_rdunlock(rk);

        if (rkb_cnt == 0) {
                state.err = RD_KAFKA_RESP_ERR__TRANSPORT;

        } else {
                int remains;

                /* Serve the reply queue until all outstanding requests
                 * (ListGroups plus any follow-up DescribeGroups) have
                 * been answered or the timeout expires. */
                while (state.wait_cnt > 0 &&
                       !rd_timeout_expired((remains =
                                            rd_timeout_remains(ts_end)))) {
                        rd_kafka_q_serve(state.q, remains, 0,
                                         RD_KAFKA_Q_CB_CALLBACK,
                                         rd_kafka_poll_cb, NULL);
                        /* Ignore yields */
                }
        }

        rd_kafka_q_destroy_owner(state.q);

        /* Timed out with requests still outstanding: return what we have
         * (partial) or report the timeout. */
        if (state.wait_cnt > 0 && !state.err) {
                if (state.grplist->group_cnt == 0)
                        state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
                else {
                        *grplistp = state.grplist;
                        return RD_KAFKA_RESP_ERR__PARTIAL;
                }
        }

        if (state.err)
                rd_kafka_group_list_destroy(state.grplist);
        else
                *grplistp = state.grplist;

        return state.err;
}
4763 
4764 
rd_kafka_group_list_destroy(const struct rd_kafka_group_list * grplist0)4765 void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist0) {
4766         struct rd_kafka_group_list *grplist =
4767                 (struct rd_kafka_group_list *)grplist0;
4768 
4769         while (grplist->group_cnt-- > 0) {
4770                 struct rd_kafka_group_info *gi;
4771                 gi = &grplist->groups[grplist->group_cnt];
4772 
4773                 if (gi->broker.host)
4774                         rd_free(gi->broker.host);
4775                 if (gi->group)
4776                         rd_free(gi->group);
4777                 if (gi->state)
4778                         rd_free(gi->state);
4779                 if (gi->protocol_type)
4780                         rd_free(gi->protocol_type);
4781                 if (gi->protocol)
4782                         rd_free(gi->protocol);
4783 
4784                 while (gi->member_cnt-- > 0) {
4785                         struct rd_kafka_group_member_info *mi;
4786                         mi = &gi->members[gi->member_cnt];
4787 
4788                         if (mi->member_id)
4789                                 rd_free(mi->member_id);
4790                         if (mi->client_id)
4791                                 rd_free(mi->client_id);
4792                         if (mi->client_host)
4793                                 rd_free(mi->client_host);
4794                         if (mi->member_metadata)
4795                                 rd_free(mi->member_metadata);
4796                         if (mi->member_assignment)
4797                                 rd_free(mi->member_assignment);
4798                 }
4799 
4800                 if (gi->members)
4801                         rd_free(gi->members);
4802         }
4803 
4804         if (grplist->groups)
4805                 rd_free(grplist->groups);
4806 
4807         rd_free(grplist);
4808 }
4809 
4810 
4811 
rd_kafka_get_debug_contexts(void)4812 const char *rd_kafka_get_debug_contexts(void) {
4813 	return RD_KAFKA_DEBUG_CONTEXTS;
4814 }
4815 
4816 
/**
 * @brief Check whether \p path refers to an existing directory.
 *
 * @returns 1 if \p path exists and is a directory, else 0.
 */
int rd_kafka_path_is_dir (const char *path) {
#ifdef _WIN32
        struct _stat st;
        if (_stat(path, &st) != 0)
                return 0;
        return (st.st_mode & S_IFDIR) ? 1 : 0;
#else
        struct stat st;
        if (stat(path, &st) != 0)
                return 0;
        return S_ISDIR(st.st_mode) ? 1 : 0;
#endif
}
4826 
4827 
4828 /**
4829  * @returns true if directory is empty or can't be accessed, else false.
4830  */
rd_kafka_dir_is_empty(const char * path)4831 rd_bool_t rd_kafka_dir_is_empty (const char *path) {
4832 #if _WIN32
4833         /* FIXME: Unsupported */
4834         return rd_true;
4835 #else
4836         DIR *dir;
4837         struct dirent *d;
4838 #if defined(__sun)
4839         struct stat st;
4840         int ret = 0;
4841 #endif
4842 
4843         dir = opendir(path);
4844         if (!dir)
4845                 return rd_true;
4846 
4847         while ((d = readdir(dir))) {
4848 
4849                 if (!strcmp(d->d_name, ".") ||
4850                     !strcmp(d->d_name, ".."))
4851                         continue;
4852 
4853 #if defined(__sun)
4854                 ret = stat(d->d_name, &st);
4855                 if (ret != 0) {
4856                     return rd_true; // Can't be accessed
4857                 }
4858                 if (S_ISREG(st.st_mode) || S_ISDIR(st.st_mode) ||
4859                     S_ISLNK(st.st_mode)) {
4860 #else
4861                 if (d->d_type == DT_REG || d->d_type == DT_LNK ||
4862                     d->d_type == DT_DIR) {
4863 #endif
4864                         closedir(dir);
4865                         return rd_false;
4866                 }
4867         }
4868 
4869         closedir(dir);
4870         return rd_true;
4871 #endif
4872 }
4873 
4874 
4875 void *rd_kafka_mem_malloc (rd_kafka_t *rk, size_t size) {
4876         return rd_malloc(size);
4877 }
4878 
4879 void *rd_kafka_mem_calloc(rd_kafka_t *rk, size_t num, size_t size) {
4880         return rd_calloc(num, size);
4881 }
4882 
4883 void rd_kafka_mem_free (rd_kafka_t *rk, void *ptr) {
4884         rd_free(ptr);
4885 }
4886 
4887 
/**
 * @brief Returns the current value of the system errno variable
 *        (thread-local on modern platforms).
 */
int rd_kafka_errno (void) {
        const int last_errno = errno;
        return last_errno;
}
4891 
/**
 * @brief Run librdkafka's built-in unit tests.
 *
 * @returns the result of rd_unittest() (presumably the failure count;
 *          confirm against rdunittest.c).
 */
int rd_kafka_unittest (void) {
        int result = rd_unittest();
        return result;
}
4895