1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2013, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #ifndef _RDKAFKA_INT_H_
30 #define _RDKAFKA_INT_H_
31
32 #ifndef _MSC_VER
33 #define _GNU_SOURCE /* for strndup() */
34 #else
35 typedef int mode_t;
36 #endif
37 #include <fcntl.h>
38
39
40 #include "rdsysqueue.h"
41
42 #include "rdkafka.h"
43 #include "rd.h"
44 #include "rdlog.h"
45 #include "rdtime.h"
46 #include "rdaddr.h"
47 #include "rdinterval.h"
48 #include "rdavg.h"
49 #include "rdlist.h"
50
51 #if WITH_SSL
52 #include <openssl/ssl.h>
53 #endif
54
55
56
57
/* Private typedefs for the internal topic and handle representations. */
typedef struct rd_kafka_itopic_s rd_kafka_itopic_t;
typedef struct rd_ikafka_s rd_ikafka_t;


/**
 * @brief Assert \p cond or crash the process through rd_kafka_crash(),
 *        logging file, line, function and the failed condition.
 *        \p rk may be NULL.
 */
#define rd_kafka_assert(rk, cond) do { \
        if (unlikely(!(cond))) \
                rd_kafka_crash(__FILE__,__LINE__, __FUNCTION__, \
                               (rk), "assert: " # cond); \
        } while (0)


/**
 * @brief Print \p reason (with handle and location context) and abort.
 *        Does not return.
 */
void
RD_NORETURN
rd_kafka_crash (const char *file, int line, const char *function,
                rd_kafka_t *rk, const char *reason);


/* Forward declarations */
struct rd_kafka_s;
struct rd_kafka_itopic_s;
struct rd_kafka_msg_s;
struct rd_kafka_broker_s;
struct rd_kafka_toppar_s;

/* Reference-counted shared-pointer types for toppar and topic objects. */
typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_toppar_s) shptr_rd_kafka_toppar_t;
typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_itopic_s) shptr_rd_kafka_itopic_t;
84
85
86
87 #include "rdkafka_op.h"
88 #include "rdkafka_queue.h"
89 #include "rdkafka_msg.h"
90 #include "rdkafka_proto.h"
91 #include "rdkafka_buf.h"
92 #include "rdkafka_pattern.h"
93 #include "rdkafka_conf.h"
94 #include "rdkafka_transport.h"
95 #include "rdkafka_timer.h"
96 #include "rdkafka_assignor.h"
97 #include "rdkafka_metadata.h"
98 #include "rdkafka_mock.h"
99 #include "rdkafka_partition.h"
100 #include "rdkafka_coord.h"
101 #include "rdkafka_mock.h"
102
103 /**
104 * Protocol level sanity
105 */
106 #define RD_KAFKAP_BROKERS_MAX 10000
107 #define RD_KAFKAP_TOPICS_MAX 1000000
108 #define RD_KAFKAP_PARTITIONS_MAX 100000
109
110
111 #define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0)
112
113
114
115
116 /**
117 * @enum Idempotent Producer state
118 */
119 typedef enum {
120 RD_KAFKA_IDEMP_STATE_INIT, /**< Initial state */
121 RD_KAFKA_IDEMP_STATE_TERM, /**< Instance is terminating */
122 RD_KAFKA_IDEMP_STATE_FATAL_ERROR, /**< A fatal error has been raised */
123 RD_KAFKA_IDEMP_STATE_REQ_PID, /**< Request new PID */
124 RD_KAFKA_IDEMP_STATE_WAIT_TRANSPORT, /**< Waiting for coordinator to
125 * become available. */
126 RD_KAFKA_IDEMP_STATE_WAIT_PID, /**< PID requested, waiting for reply */
127 RD_KAFKA_IDEMP_STATE_ASSIGNED, /**< New PID assigned */
128 RD_KAFKA_IDEMP_STATE_DRAIN_RESET, /**< Wait for outstanding
129 * ProduceRequests to finish
130 * before resetting and
131 * re-requesting a new PID. */
132 RD_KAFKA_IDEMP_STATE_DRAIN_BUMP, /**< Wait for outstanding
133 * ProduceRequests to finish
134 * before bumping the current
135 * epoch. */
136 } rd_kafka_idemp_state_t;
137
138 /**
139 * @returns the idemp_state_t string representation
140 */
141 static RD_UNUSED const char *
rd_kafka_idemp_state2str(rd_kafka_idemp_state_t state)142 rd_kafka_idemp_state2str (rd_kafka_idemp_state_t state) {
143 static const char *names[] = {
144 "Init",
145 "Terminate",
146 "FatalError",
147 "RequestPID",
148 "WaitTransport",
149 "WaitPID",
150 "Assigned",
151 "DrainReset",
152 "DrainBump"
153 };
154 return names[state];
155 }
156
157
158
159
160 /**
161 * @enum Transactional Producer state
162 */
163 typedef enum {
164 /**< Initial state */
165 RD_KAFKA_TXN_STATE_INIT,
166 /**< Awaiting PID to be acquired by rdkafka_idempotence.c */
167 RD_KAFKA_TXN_STATE_WAIT_PID,
168 /**< PID acquired, but application has not made a successful
169 * init_transactions() call. */
170 RD_KAFKA_TXN_STATE_READY_NOT_ACKED,
171 /**< PID acquired, no active transaction. */
172 RD_KAFKA_TXN_STATE_READY,
173 /**< begin_transaction() has been called. */
174 RD_KAFKA_TXN_STATE_IN_TRANSACTION,
175 /**< commit_transaction() has been called. */
176 RD_KAFKA_TXN_STATE_BEGIN_COMMIT,
177 /**< commit_transaction() has been called and all outstanding
178 * messages, partitions, and offsets have been sent. */
179 RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION,
180 /**< abort_transaction() has been called. */
181 RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION,
182 /**< An abortable error has occurred. */
183 RD_KAFKA_TXN_STATE_ABORTABLE_ERROR,
184 /* A fatal error has occured. */
185 RD_KAFKA_TXN_STATE_FATAL_ERROR
186 } rd_kafka_txn_state_t;
187
188
189 /**
190 * @returns the txn_state_t string representation
191 */
192 static RD_UNUSED const char *
rd_kafka_txn_state2str(rd_kafka_txn_state_t state)193 rd_kafka_txn_state2str (rd_kafka_txn_state_t state) {
194 static const char *names[] = {
195 "Init",
196 "WaitPID",
197 "ReadyNotAcked",
198 "Ready",
199 "InTransaction",
200 "BeginCommit",
201 "CommittingTransaction",
202 "AbortingTransaction",
203 "AbortableError",
204 "FatalError"
205 };
206 return names[state];
207 }
208
209
210
211
212
213 /**
214 * Kafka handle, internal representation of the application's rd_kafka_t.
215 */
216
217 typedef RD_SHARED_PTR_TYPE(shptr_rd_ikafka_s, rd_ikafka_t) shptr_rd_ikafka_t;
218
struct rd_kafka_s {
        rd_kafka_q_t *rk_rep; /* kafka -> application reply queue */
        rd_kafka_q_t *rk_ops; /* any -> rdkafka main thread ops */

        TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers;
        rd_list_t rk_broker_by_id; /* Fast id lookups. */
        rd_atomic32_t rk_broker_cnt;
        /** Number of brokers in state >= UP */
        rd_atomic32_t rk_broker_up_cnt;
        /** Number of logical brokers in state >= UP, this is a sub-set
         *  of rk_broker_up_cnt. */
        rd_atomic32_t rk_logical_broker_up_cnt;
        /** Number of brokers that are down, only includes brokers
         *  that have had at least one connection attempt. */
        rd_atomic32_t rk_broker_down_cnt;
        /** Logical brokers currently without an address.
         *  Used for calculating ERR__ALL_BROKERS_DOWN. */
        rd_atomic32_t rk_broker_addrless_cnt;

        mtx_t rk_internal_rkb_lock;
        rd_kafka_broker_t *rk_internal_rkb;

        /* Broadcasting of broker state changes to wake up
         * functions waiting for a state change. */
        cnd_t rk_broker_state_change_cnd;
        mtx_t rk_broker_state_change_lock;
        int rk_broker_state_change_version;
        /* List of (rd_kafka_enq_once_t*) objects waiting for broker
         * state changes. Protected by rk_broker_state_change_lock. */
        rd_list_t rk_broker_state_change_waiters; /**< (rd_kafka_enq_once_t*) */

        TAILQ_HEAD(, rd_kafka_itopic_s) rk_topics;
        int rk_topic_cnt;

        struct rd_kafka_cgrp_s *rk_cgrp;

        rd_kafka_conf_t rk_conf;
        rd_kafka_q_t *rk_logq; /* Log queue if `log.queue` set */
        char rk_name[128];
        rd_kafkap_str_t *rk_client_id;
        rd_kafkap_str_t *rk_group_id; /* Consumer group id */

        rd_atomic32_t rk_terminate; /**< Set to RD_KAFKA_DESTROY_F_..
                                     * flags when the instance
                                     * is being destroyed.
                                     * The value set is the
                                     * destroy flags from
                                     * rd_kafka_destroy*() and
                                     * the two internal flags shown
                                     * below.
                                     *
                                     * Order:
                                     * 1. user_flags | .._F_DESTROY_CALLED
                                     *    is set in rd_kafka_destroy*().
                                     * 2. consumer_close() is called
                                     *    for consumers.
                                     * 3. .._F_TERMINATE is set to
                                     *    signal all background threads
                                     *    to terminate.
                                     */

#define RD_KAFKA_DESTROY_F_TERMINATE 0x1 /**< Internal flag to make sure
                                          * rk_terminate is set to non-zero
                                          * value even if user passed
                                          * no destroy flags. */
#define RD_KAFKA_DESTROY_F_DESTROY_CALLED 0x2 /**< Application has called
                                               * ..destroy*() and we've
                                               * begun the termination
                                               * process.
                                               * This flag is needed to avoid
                                               * rk_terminate from being
                                               * 0 when destroy_flags()
                                               * is called with flags=0
                                               * and prior to _F_TERMINATE
                                               * has been set. */
#define RD_KAFKA_DESTROY_F_IMMEDIATE 0x4 /**< Immediate non-blocking
                                          * destruction without waiting
                                          * for all resources
                                          * to be cleaned up.
                                          * WARNING: Memory and resource
                                          * leaks possible.
                                          * This flag automatically sets
                                          * .._NO_CONSUMER_CLOSE. */


        rwlock_t rk_lock;
        rd_kafka_type_t rk_type;
        struct timeval rk_tv_state_change;

        rd_atomic64_t rk_ts_last_poll; /**< Timestamp of last application
                                        * consumer_poll() call
                                        * (or equivalent).
                                        * Used to enforce
                                        * max.poll.interval.ms.
                                        * Only relevant for consumer. */
        /* First fatal error. */
        struct {
                rd_atomic32_t err; /**< rd_kafka_resp_err_t */
                char *errstr; /**< Protected by rk_lock */
                int cnt; /**< Number of errors raised, only
                          * the first one is stored. */
        } rk_fatal;

        rd_atomic32_t rk_last_throttle; /* Last throttle_time_ms value
                                         * from broker. */

        /* Locks: rd_kafka_*lock() */
        rd_ts_t rk_ts_metadata; /* Timestamp of most recent
                                 * metadata. */

        struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
        rd_ts_t rk_ts_full_metadata; /* Timestamp of .. */
        struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */

        char *rk_clusterid; /* ClusterId from metadata */
        int32_t rk_controllerid; /* ControllerId from metadata */

        /** Producer: Delivery report mode */
        enum {
                RD_KAFKA_DR_MODE_NONE, /**< No delivery reports */
                RD_KAFKA_DR_MODE_CB, /**< Delivery reports through callback */
                RD_KAFKA_DR_MODE_EVENT, /**< Delivery reports through event API*/
        } rk_drmode;

        /* Simple consumer count:
         *  >0: Running in legacy / Simple Consumer mode,
         *   0: No consumers running
         *  <0: Running in High level consumer mode */
        rd_atomic32_t rk_simple_cnt;

        /**
         * Exactly Once Semantics and Idempotent Producer
         *
         * @locks rk_lock
         */
        struct {
                /*
                 * Idempotence
                 */
                rd_kafka_idemp_state_t idemp_state; /**< Idempotent Producer
                                                     * state */
                rd_ts_t ts_idemp_state;/**< Last state change */
                rd_kafka_pid_t pid; /**< Current Producer ID and Epoch */
                int epoch_cnt; /**< Number of times pid/epoch changed */
                rd_atomic32_t inflight_toppar_cnt; /**< Current number of
                                                    * toppars with inflight
                                                    * requests. */
                rd_kafka_timer_t pid_tmr; /**< PID FSM timer */

                /*
                 * Transactions
                 *
                 * All field access is from the rdkafka main thread,
                 * unless a specific lock is mentioned in the doc string.
                 *
                 */
                rd_atomic32_t txn_may_enq; /**< Transaction state allows
                                            * application to enqueue
                                            * (produce) messages. */

                rd_kafkap_str_t *transactional_id; /**< transactional.id */
                rd_kafka_txn_state_t txn_state; /**< Transactional state.
                                                 * @locks rk_lock */
                rd_ts_t ts_txn_state; /**< Last state change.
                                       * @locks rk_lock */
                rd_kafka_broker_t *txn_coord; /**< Transaction coordinator,
                                               * this is a logical broker.*/
                rd_kafka_broker_t *txn_curr_coord; /**< Current actual coord
                                                    * broker.
                                                    * This is only used to
                                                    * check if the coord
                                                    * changes. */
                rd_kafka_broker_monitor_t txn_coord_mon; /**< Monitor for
                                                          * coordinator to
                                                          * take action when
                                                          * the broker state
                                                          * changes. */

                /** Blocking transactional API application call
                 * currently being handled, its state, reply queue and how
                 * to handle timeout.
                 * Only one transactional API call is allowed at any time.
                 * Protected by the rk_lock. */
                struct {
                        char name[64]; /**< API name, e.g.,
                                        * SendOffsetsToTransaction */
                        rd_kafka_timer_t tmr; /**< Timeout timer, the timeout
                                               * is specified by the app. */

                        int flags; /**< Flags */
#define RD_KAFKA_TXN_CURR_API_F_ABORT_ON_TIMEOUT 0x1 /**< Set state to abortable
                                                      * error on timeout,
                                                      * i.e., fail the txn,
                                                      * and set txn_requires_abort
                                                      * on the returned error.
                                                      */
#define RD_KAFKA_TXN_CURR_API_F_RETRIABLE_ON_TIMEOUT 0x2 /**< Set retriable flag
                                                          * on the error
                                                          * on timeout. */
#define RD_KAFKA_TXN_CURR_API_F_FOR_REUSE 0x4 /**< Do not reset the
                                               * current API when it
                                               * completes successfully
                                               * Instead keep it alive
                                               * and allow reuse with
                                               * .._F_REUSE, blocking
                                               * any non-F_REUSE
                                               * curr API calls. */
#define RD_KAFKA_TXN_CURR_API_F_REUSE 0x8 /**< Reuse/continue with
                                           * current API state.
                                           * This is used for
                                           * multi-stage APIs,
                                           * such as txn commit. */
                } txn_curr_api;

                /** Copy (and reference) of the original init_transactions(),
                 * but out-lives the timeout of the curr API.
                 * This is used as the reply queue for when the
                 * black box idempotent producer has acquired the
                 * initial PID (or fails to do so).
                 * Since that acquisition may take longer than the
                 * init_transactions() API timeout this extra reference
                 * needs to be kept around.
                 * If the originating init_transactions() call has timed
                 * out and returned this queue reference simply points
                 * to a disabled queue that will discard any ops enqueued.
                 *
                 * @locks rk_lock
                 */
                rd_kafka_q_t *txn_init_rkq;

                int txn_req_cnt; /**< Number of transaction
                                  * requests sent.
                                  * This is incremented when a
                                  * AddPartitionsToTxn or
                                  * AddOffsetsToTxn request
                                  * has been sent for the
                                  * current transaction,
                                  * to keep track of
                                  * whether the broker is
                                  * aware of the current
                                  * transaction and thus
                                  * requires an EndTxn request
                                  * on abort or not. */

                /** Timer to trigger registration of pending partitions */
                rd_kafka_timer_t txn_register_parts_tmr;

                /** Lock for txn_pending_rktps and txn_waitresp_rktps */
                mtx_t txn_pending_lock;

                /** Partitions pending being added to transaction. */
                rd_kafka_toppar_tqhead_t txn_pending_rktps;

                /** Partitions in-flight added to transaction. */
                rd_kafka_toppar_tqhead_t txn_waitresp_rktps;

                /** Partitions added and registered to transaction. */
                rd_kafka_toppar_tqhead_t txn_rktps;

                /** Current transaction error. */
                rd_kafka_resp_err_t txn_err;

                /** Current transaction error string, if any. */
                char *txn_errstr;

                /** Last InitProducerIdRequest error. */
                rd_kafka_resp_err_t txn_init_err;

                /** Waiting for transaction coordinator query response */
                rd_bool_t txn_wait_coord;

                /** Transaction coordinator query timer */
                rd_kafka_timer_t txn_coord_tmr;
        } rk_eos;

        /**
         * Coordinator cache.
         *
         * @locks none
         * @locality rdkafka main thread
         */
        rd_kafka_coord_cache_t rk_coord_cache; /**< Coordinator cache */

        TAILQ_HEAD(, rd_kafka_coord_req_s) rk_coord_reqs; /**< Coordinator
                                                           * requests */

        const rd_kafkap_bytes_t *rk_null_bytes;

        /* Producer message-count/size bookkeeping, used to enforce
         * queue.buffering.max.messages and queue.buffering.max.kbytes,
         * see rd_kafka_curr_msgs_add() et al below. */
        struct {
                mtx_t lock; /* Protects access to this struct */
                cnd_t cnd; /* For waking up blocking injectors */
                unsigned int cnt; /* Current message count */
                size_t size; /* Current message size sum */
                unsigned int max_cnt; /* Max limit */
                size_t max_size; /* Max limit */
        } rk_curr_msgs;

        rd_kafka_timers_t rk_timers;
        thrd_t rk_thread;

        int rk_initialized; /**< Will be > 0 when the rd_kafka_t
                             * instance has been fully initialized. */

        int rk_init_wait_cnt; /**< Number of background threads that
                               * need to finish initialization. */
        cnd_t rk_init_cnd; /**< Cond-var used to wait for main thread
                            * to finish its initialization before
                            * before rd_kafka_new() returns. */
        mtx_t rk_init_lock; /**< Lock for rk_init_wait and _cmd */

        /**
         * Background thread and queue,
         * enabled by setting `background_event_cb()`.
         */
        struct {
                rd_kafka_q_t *q; /**< Queue served by background thread. */
                thrd_t thread; /**< Background thread. */
                int calling; /**< Indicates whether the event callback
                              * is being called, reset back to 0
                              * when the callback returns.
                              * This can be used for troubleshooting
                              * purposes. */
        } rk_background;


        /*
         * Logs, events or actions to rate limit / suppress
         */
        struct {
                /** Log: No brokers support Idempotent Producer */
                rd_interval_t no_idemp_brokers;

                /** Sparse connections: randomly select broker
                 * to bring up. This interval should allow
                 * for a previous connection to be established,
                 * which varies between different environments:
                 * Use 10 < reconnect.backoff.jitter.ms / 2 < 1000.
                 */
                rd_interval_t sparse_connect_random;
                /** Lock for sparse_connect_random */
                mtx_t sparse_connect_lock;

                /** Broker metadata refresh interval:
                 * this is rate-limiting the number of topic-less
                 * broker/cluster metadata refreshes when there are no
                 * topics to refresh.
                 * Will be refreshed every topic.metadata.refresh.interval.ms
                 * but no more often than every 10s.
                 * No locks: only accessed by rdkafka main thread. */
                rd_interval_t broker_metadata_refresh;
        } rk_suppress;

        struct {
                void *handle; /**< Provider-specific handle struct pointer.
                               * Typically assigned in provider's .init() */
        } rk_sasl;

        /* Test mocks */
        struct {
                rd_kafka_mock_cluster_t *cluster; /**< Mock cluster, created
                                                   * by test.mock.num.brokers
                                                   */
                rd_atomic32_t cluster_cnt; /**< Total number of mock
                                            * clusters, created either
                                            * through
                                            * test.mock.num.brokers
                                            * or mock_cluster_new().
                                            */

        } rk_mock;
};
590
/* Handle-level read/write lock wrappers for rk_lock. */
#define rd_kafka_wrlock(rk) rwlock_wrlock(&(rk)->rk_lock)
#define rd_kafka_rdlock(rk) rwlock_rdlock(&(rk)->rk_lock)
#define rd_kafka_rdunlock(rk) rwlock_rdunlock(&(rk)->rk_lock)
#define rd_kafka_wrunlock(rk) rwlock_wrunlock(&(rk)->rk_lock)
595
596
597 /**
598 * @brief Add \p cnt messages and of total size \p size bytes to the
599 * internal bookkeeping of current message counts.
600 * If the total message count or size after add would exceed the
601 * configured limits \c queue.buffering.max.messages and
602 * \c queue.buffering.max.kbytes then depending on the value of
603 * \p block the function either blocks until enough space is available
604 * if \p block is 1, else immediately returns
605 * RD_KAFKA_RESP_ERR__QUEUE_FULL.
606 *
607 * @param rdmtx If non-null and \p block is set and blocking is to ensue,
608 * then unlock this mutex for the duration of the blocking
609 * and then reacquire with a read-lock.
610 */
611 static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_curr_msgs_add(rd_kafka_t * rk,unsigned int cnt,size_t size,int block,rwlock_t * rdlock)612 rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size,
613 int block, rwlock_t *rdlock) {
614
615 if (rk->rk_type != RD_KAFKA_PRODUCER)
616 return RD_KAFKA_RESP_ERR_NO_ERROR;
617
618 mtx_lock(&rk->rk_curr_msgs.lock);
619 while (unlikely(rk->rk_curr_msgs.cnt + cnt >
620 rk->rk_curr_msgs.max_cnt ||
621 (unsigned long long)(rk->rk_curr_msgs.size + size) >
622 (unsigned long long)rk->rk_curr_msgs.max_size)) {
623 if (!block) {
624 mtx_unlock(&rk->rk_curr_msgs.lock);
625 return RD_KAFKA_RESP_ERR__QUEUE_FULL;
626 }
627
628 if (rdlock)
629 rwlock_rdunlock(rdlock);
630
631 cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);
632
633 if (rdlock)
634 rwlock_rdlock(rdlock);
635
636 }
637
638 rk->rk_curr_msgs.cnt += cnt;
639 rk->rk_curr_msgs.size += size;
640 mtx_unlock(&rk->rk_curr_msgs.lock);
641
642 return RD_KAFKA_RESP_ERR_NO_ERROR;
643 }
644
645
646 /**
647 * @brief Subtract \p cnt messages of total size \p size from the
648 * current bookkeeping and broadcast a wakeup on the condvar
649 * for any waiting & blocking threads.
650 */
651 static RD_INLINE RD_UNUSED void
rd_kafka_curr_msgs_sub(rd_kafka_t * rk,unsigned int cnt,size_t size)652 rd_kafka_curr_msgs_sub (rd_kafka_t *rk, unsigned int cnt, size_t size) {
653 int broadcast = 0;
654
655 if (rk->rk_type != RD_KAFKA_PRODUCER)
656 return;
657
658 mtx_lock(&rk->rk_curr_msgs.lock);
659 rd_kafka_assert(NULL,
660 rk->rk_curr_msgs.cnt >= cnt &&
661 rk->rk_curr_msgs.size >= size);
662
663 /* If the subtraction would pass one of the thresholds
664 * broadcast a wake-up to any waiting listeners. */
665 if ((rk->rk_curr_msgs.cnt - cnt == 0) ||
666 (rk->rk_curr_msgs.cnt >= rk->rk_curr_msgs.max_cnt &&
667 rk->rk_curr_msgs.cnt - cnt < rk->rk_curr_msgs.max_cnt) ||
668 (rk->rk_curr_msgs.size >= rk->rk_curr_msgs.max_size &&
669 rk->rk_curr_msgs.size - size < rk->rk_curr_msgs.max_size))
670 broadcast = 1;
671
672 rk->rk_curr_msgs.cnt -= cnt;
673 rk->rk_curr_msgs.size -= size;
674
675 if (unlikely(broadcast))
676 cnd_broadcast(&rk->rk_curr_msgs.cnd);
677
678 mtx_unlock(&rk->rk_curr_msgs.lock);
679 }
680
681 static RD_INLINE RD_UNUSED void
rd_kafka_curr_msgs_get(rd_kafka_t * rk,unsigned int * cntp,size_t * sizep)682 rd_kafka_curr_msgs_get (rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) {
683 if (rk->rk_type != RD_KAFKA_PRODUCER) {
684 *cntp = 0;
685 *sizep = 0;
686 return;
687 }
688
689 mtx_lock(&rk->rk_curr_msgs.lock);
690 *cntp = rk->rk_curr_msgs.cnt;
691 *sizep = rk->rk_curr_msgs.size;
692 mtx_unlock(&rk->rk_curr_msgs.lock);
693 }
694
695 static RD_INLINE RD_UNUSED int
rd_kafka_curr_msgs_cnt(rd_kafka_t * rk)696 rd_kafka_curr_msgs_cnt (rd_kafka_t *rk) {
697 int cnt;
698 if (rk->rk_type != RD_KAFKA_PRODUCER)
699 return 0;
700
701 mtx_lock(&rk->rk_curr_msgs.lock);
702 cnt = rk->rk_curr_msgs.cnt;
703 mtx_unlock(&rk->rk_curr_msgs.lock);
704
705 return cnt;
706 }
707
708 /**
709 * @brief Wait until \p tspec for curr_msgs to reach 0.
710 *
711 * @returns remaining curr_msgs
712 */
713 static RD_INLINE RD_UNUSED int
rd_kafka_curr_msgs_wait_zero(rd_kafka_t * rk,const struct timespec * tspec)714 rd_kafka_curr_msgs_wait_zero (rd_kafka_t *rk, const struct timespec *tspec) {
715 int cnt;
716
717 mtx_lock(&rk->rk_curr_msgs.lock);
718 while ((cnt = rk->rk_curr_msgs.cnt) > 0) {
719 cnd_timedwait_abs(&rk->rk_curr_msgs.cnd,
720 &rk->rk_curr_msgs.lock,
721 tspec);
722 }
723 mtx_unlock(&rk->rk_curr_msgs.lock);
724
725 return cnt;
726 }
727
/* Final destructor: frees the rd_kafka_t handle's resources. */
void rd_kafka_destroy_final (rd_kafka_t *rk);

/* One-time process-wide initialization. */
void rd_kafka_global_init (void);
731
732 /**
733 * @returns true if \p rk handle is terminating.
734 *
735 * @remark If consumer_close() is called from destroy*() it will be
736 * called prior to _F_TERMINATE being set and will thus not
737 * be able to use rd_kafka_terminating() to know it is shutting down.
738 * That code should instead just check that rk_terminate is non-zero
739 * (the _F_DESTROY_CALLED flag will be set).
740 */
741 #define rd_kafka_terminating(rk) (rd_atomic32_get(&(rk)->rk_terminate) & \
742 RD_KAFKA_DESTROY_F_TERMINATE)
743
744 /**
745 * @returns the destroy flags set matching \p flags, which might be
746 * a subset of the flags.
747 */
748 #define rd_kafka_destroy_flags_check(rk,flags) \
749 (rd_atomic32_get(&(rk)->rk_terminate) & (flags))
750
751 /**
752 * @returns true if no consumer callbacks, or standard consumer_close
753 * behaviour, should be triggered. */
754 #define rd_kafka_destroy_flags_no_consumer_close(rk) \
755 rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE)
756
757 #define rd_kafka_is_simple_consumer(rk) \
758 (rd_atomic32_get(&(rk)->rk_simple_cnt) > 0)
759 int rd_kafka_simple_consumer_add (rd_kafka_t *rk);
760
761
762 /**
763 * @returns true if idempotency is enabled (producer only).
764 */
765 #define rd_kafka_is_idempotent(rk) ((rk)->rk_conf.eos.idempotence)
766
767 /**
768 * @returns true if the producer is transactional (producer only).
769 */
770 #define rd_kafka_is_transactional(rk) \
771 ((rk)->rk_conf.eos.transactional_id != NULL)
772
773
774 #define RD_KAFKA_PURGE_F_ABORT_TXN 0x100 /**< Internal flag used when
775 * aborting transaction */
776 #define RD_KAFKA_PURGE_F_MASK 0x107
777 const char *rd_kafka_purge_flags2str (int flags);
778
779
780 #include "rdkafka_topic.h"
781 #include "rdkafka_partition.h"
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796 /**
797 * Debug contexts
798 */
799 #define RD_KAFKA_DBG_GENERIC 0x1
800 #define RD_KAFKA_DBG_BROKER 0x2
801 #define RD_KAFKA_DBG_TOPIC 0x4
802 #define RD_KAFKA_DBG_METADATA 0x8
803 #define RD_KAFKA_DBG_FEATURE 0x10
804 #define RD_KAFKA_DBG_QUEUE 0x20
805 #define RD_KAFKA_DBG_MSG 0x40
806 #define RD_KAFKA_DBG_PROTOCOL 0x80
807 #define RD_KAFKA_DBG_CGRP 0x100
808 #define RD_KAFKA_DBG_SECURITY 0x200
809 #define RD_KAFKA_DBG_FETCH 0x400
810 #define RD_KAFKA_DBG_INTERCEPTOR 0x800
811 #define RD_KAFKA_DBG_PLUGIN 0x1000
812 #define RD_KAFKA_DBG_CONSUMER 0x2000
813 #define RD_KAFKA_DBG_ADMIN 0x4000
814 #define RD_KAFKA_DBG_EOS 0x8000
815 #define RD_KAFKA_DBG_MOCK 0x10000
816 #define RD_KAFKA_DBG_ALL 0xfffff
817 #define RD_KAFKA_DBG_NONE 0x0
818
819 void rd_kafka_log0(const rd_kafka_conf_t *conf,
820 const rd_kafka_t *rk, const char *extra, int level,
821 const char *fac, const char *fmt, ...) RD_FORMAT(printf,
822 6, 7);
823
/* Log a message at \p level with facility \p fac for handle \p rk. */
#define rd_kafka_log(rk,level,fac,...) \
        rd_kafka_log0(&rk->rk_conf, rk, NULL, level, fac, __VA_ARGS__)
/* Debug-log, emitted only if debug context \p ctx is enabled in rk_conf. */
#define rd_kafka_dbg(rk,ctx,fac,...) do { \
        if (unlikely((rk)->rk_conf.debug & (RD_KAFKA_DBG_ ## ctx))) \
                rd_kafka_log0(&rk->rk_conf,rk,NULL, \
                              LOG_DEBUG,fac,__VA_ARGS__); \
        } while (0)

/* dbg() not requiring an rk, just the conf object, for early logging */
#define rd_kafka_dbg0(conf,ctx,fac,...) do { \
        if (unlikely((conf)->debug & (RD_KAFKA_DBG_ ## ctx))) \
                rd_kafka_log0(conf,NULL,NULL, \
                              LOG_DEBUG,fac,__VA_ARGS__); \
        } while (0)

/* NOTE: The local copy of _logname is needed due to rkb_logname_lock
 * lock-ordering when logging another broker's name in the message. */
#define rd_rkb_log(rkb,level,fac,...) do { \
        char _logname[RD_KAFKA_NODENAME_SIZE]; \
        mtx_lock(&(rkb)->rkb_logname_lock); \
        rd_strlcpy(_logname, rkb->rkb_logname, sizeof(_logname)); \
        mtx_unlock(&(rkb)->rkb_logname_lock); \
        rd_kafka_log0(&(rkb)->rkb_rk->rk_conf, \
                      (rkb)->rkb_rk, _logname, \
                      level, fac, __VA_ARGS__); \
        } while (0)

/* Per-broker debug-log, gated on the handle's debug contexts. */
#define rd_rkb_dbg(rkb,ctx,fac,...) do { \
        if (unlikely((rkb)->rkb_rk->rk_conf.debug & \
                     (RD_KAFKA_DBG_ ## ctx))) { \
                rd_rkb_log(rkb, LOG_DEBUG, fac, __VA_ARGS__); \
        } \
        } while (0)
857
858
859
860 extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
861
862 static RD_UNUSED RD_INLINE
rd_kafka_set_last_error(rd_kafka_resp_err_t err,int errnox)863 rd_kafka_resp_err_t rd_kafka_set_last_error (rd_kafka_resp_err_t err,
864 int errnox) {
865 if (errnox) {
866 /* MSVC:
867 * This is the correct way to set errno on Windows,
868 * but it is still pointless due to different errnos in
869 * in different runtimes:
870 * https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/
871 * errno is thus highly deprecated, and buggy, on Windows
872 * when using librdkafka as a dynamically loaded DLL. */
873 rd_set_errno(errnox);
874 }
875 rd_kafka_last_error_code = err;
876 return err;
877 }
878
879
/* Raise a fatal error (stored in rk_fatal; only the first is kept) with a
 * printf-style reason. \p do_lock selects whether rk_lock is acquired. */
int rd_kafka_set_fatal_error0 (rd_kafka_t *rk, rd_dolock_t do_lock,
                               rd_kafka_resp_err_t err,
                               const char *fmt, ...) RD_FORMAT(printf, 4, 5);
#define rd_kafka_set_fatal_error(rk,err,fmt,...) \
        rd_kafka_set_fatal_error0(rk, RD_DO_LOCK, err, fmt, __VA_ARGS__)
885
886 static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_fatal_error_code(rd_kafka_t * rk)887 rd_kafka_fatal_error_code (rd_kafka_t *rk) {
888 /* This is an optimization to avoid an atomic read which are costly
889 * on some platforms:
890 * Fatal errors are currently only raised by the idempotent producer
891 * and static consumers (group.instance.id). */
892 if ((rk->rk_type == RD_KAFKA_PRODUCER && rk->rk_conf.eos.idempotence) ||
893 (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_conf.group_instance_id))
894 return rd_atomic32_get(&rk->rk_fatal.err);
895
896 return RD_KAFKA_RESP_ERR_NO_ERROR;
897 }
898
899
/* Number of currently live librdkafka threads (process-wide). */
extern rd_atomic32_t rd_kafka_thread_cnt_curr;
/* Thread-local thread name. */
extern char RD_TLS rd_kafka_thread_name[64];

/* Set the current thread's name (printf-style). */
void rd_kafka_set_thread_name (const char *fmt, ...);
/* Set the current thread's system-level name (printf-style). */
void rd_kafka_set_thread_sysname (const char *fmt, ...);

int rd_kafka_path_is_dir (const char *path);
rd_bool_t rd_kafka_dir_is_empty (const char *path);

/* Op serving callback used when polling queues, see rdkafka.c. */
rd_kafka_op_res_t
rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                  rd_kafka_q_cb_type_t cb_type, void *opaque);

rd_kafka_resp_err_t rd_kafka_subscribe_rkt (rd_kafka_itopic_t *rkt);
914
915
916 /**
917 * @returns the number of milliseconds the maximum poll interval
918 * was exceeded, or 0 if not exceeded.
919 *
920 * @remark Only relevant for high-level consumer.
921 *
922 * @locality any
923 * @locks none
924 */
925 static RD_INLINE RD_UNUSED int
rd_kafka_max_poll_exceeded(rd_kafka_t * rk)926 rd_kafka_max_poll_exceeded (rd_kafka_t *rk) {
927 rd_ts_t last_poll;
928 int exceeded;
929
930 if (rk->rk_type != RD_KAFKA_CONSUMER)
931 return 0;
932
933 last_poll = rd_atomic64_get(&rk->rk_ts_last_poll);
934
935 /* Application is blocked in librdkafka function, see
936 * rd_kafka_app_poll_blocking(). */
937 if (last_poll == INT64_MAX)
938 return 0;
939
940 exceeded = (int)((rd_clock() - last_poll) / 1000ll) -
941 rk->rk_conf.max_poll_interval_ms;
942
943 if (unlikely(exceeded > 0))
944 return exceeded;
945
946 return 0;
947 }
948
949 /**
950 * @brief Call on entry to blocking polling function to indicate
951 * that the application is blocked waiting for librdkafka
952 * and that max.poll.interval.ms should not be enforced.
953 *
954 * Call app_polled() Upon return from the function calling
955 * this function to register the application's last time of poll.
956 *
957 * @remark Only relevant for high-level consumer.
958 *
959 * @locality any
960 * @locks none
961 */
962 static RD_INLINE RD_UNUSED void
rd_kafka_app_poll_blocking(rd_kafka_t * rk)963 rd_kafka_app_poll_blocking (rd_kafka_t *rk) {
964 if (rk->rk_type == RD_KAFKA_CONSUMER)
965 rd_atomic64_set(&rk->rk_ts_last_poll, INT64_MAX);
966 }
967
968 /**
969 * @brief Set the last application poll time to now.
970 *
971 * @remark Only relevant for high-level consumer.
972 *
973 * @locality any
974 * @locks none
975 */
976 static RD_INLINE RD_UNUSED void
rd_kafka_app_polled(rd_kafka_t * rk)977 rd_kafka_app_polled (rd_kafka_t *rk) {
978 if (rk->rk_type == RD_KAFKA_CONSUMER)
979 rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock());
980 }
981
982
983 /**
984 * rdkafka_background.c
985 */
986 int rd_kafka_background_thread_main (void *arg);
987
988 #endif /* _RDKAFKA_INT_H_ */
989