/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2013, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _RDKAFKA_INT_H_
#define _RDKAFKA_INT_H_

#ifndef _MSC_VER
#define _GNU_SOURCE  /* for strndup() */
#else
typedef int mode_t;
#endif
#include <fcntl.h>


#include "rdsysqueue.h"

#include "rdkafka.h"
#include "rd.h"
#include "rdlog.h"
#include "rdtime.h"
#include "rdaddr.h"
#include "rdinterval.h"
#include "rdavg.h"
#include "rdlist.h"

#if WITH_SSL
#include <openssl/ssl.h>
#endif




typedef struct rd_kafka_itopic_s rd_kafka_itopic_t;
typedef struct rd_ikafka_s rd_ikafka_t;


#define rd_kafka_assert(rk, cond) do {                                  \
                if (unlikely(!(cond)))                                  \
                        rd_kafka_crash(__FILE__,__LINE__, __FUNCTION__, \
                                       (rk), "assert: " # cond);        \
        } while (0)


void
RD_NORETURN
rd_kafka_crash (const char *file, int line, const char *function,
                rd_kafka_t *rk, const char *reason);


/* Forward declarations */
struct rd_kafka_s;
struct rd_kafka_itopic_s;
struct rd_kafka_msg_s;
struct rd_kafka_broker_s;
struct rd_kafka_toppar_s;

typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_toppar_s) shptr_rd_kafka_toppar_t;
typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_itopic_s) shptr_rd_kafka_itopic_t;



#include "rdkafka_op.h"
#include "rdkafka_queue.h"
#include "rdkafka_msg.h"
#include "rdkafka_proto.h"
#include "rdkafka_buf.h"
#include "rdkafka_pattern.h"
#include "rdkafka_conf.h"
#include "rdkafka_transport.h"
#include "rdkafka_timer.h"
#include "rdkafka_assignor.h"
#include "rdkafka_metadata.h"
#include "rdkafka_mock.h"
#include "rdkafka_partition.h"
#include "rdkafka_coord.h"

/**
 * Protocol level sanity
 */
#define RD_KAFKAP_BROKERS_MAX     10000
#define RD_KAFKAP_TOPICS_MAX      1000000
#define RD_KAFKAP_PARTITIONS_MAX  100000


#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF)  ((OFF) < 0)
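
/* Illustrative note: the logical offsets defined in rdkafka.h are all
 * negative, e.g. RD_KAFKA_OFFSET_BEGINNING (-2), RD_KAFKA_OFFSET_END (-1),
 * RD_KAFKA_OFFSET_STORED (-1000) and RD_KAFKA_OFFSET_INVALID (-1001),
 * so they all satisfy RD_KAFKA_OFFSET_IS_LOGICAL(), while any absolute
 * broker offset (>= 0) does not:
 *
 *   if (RD_KAFKA_OFFSET_IS_LOGICAL(offset))
 *           ;  // must first be resolved to an absolute offset
 *   else
 *           ;  // can be used directly as a fetch position
 */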




/**
 * @enum Idempotent Producer state
 */
typedef enum {
        RD_KAFKA_IDEMP_STATE_INIT,      /**< Initial state */
        RD_KAFKA_IDEMP_STATE_TERM,      /**< Instance is terminating */
        RD_KAFKA_IDEMP_STATE_FATAL_ERROR, /**< A fatal error has been raised */
        RD_KAFKA_IDEMP_STATE_REQ_PID,   /**< Request new PID */
        RD_KAFKA_IDEMP_STATE_WAIT_TRANSPORT, /**< Waiting for coordinator to
                                              *   become available. */
        RD_KAFKA_IDEMP_STATE_WAIT_PID,  /**< PID requested, waiting for reply */
        RD_KAFKA_IDEMP_STATE_ASSIGNED,  /**< New PID assigned */
        RD_KAFKA_IDEMP_STATE_DRAIN_RESET, /**< Wait for outstanding
                                           *   ProduceRequests to finish
                                           *   before resetting and
                                           *   re-requesting a new PID. */
        RD_KAFKA_IDEMP_STATE_DRAIN_BUMP, /**< Wait for outstanding
                                          *   ProduceRequests to finish
                                          *   before bumping the current
                                          *   epoch. */
} rd_kafka_idemp_state_t;
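
/*
 * Sketch of the typical (happy path) idempotent producer state progression,
 * inferred from the state descriptions above:
 *
 *   INIT -> REQ_PID -> WAIT_TRANSPORT -> WAIT_PID -> ASSIGNED
 *
 * When the PID needs to be reset or the epoch bumped, the FSM first passes
 * through DRAIN_RESET or DRAIN_BUMP to let outstanding ProduceRequests
 * finish before requesting a new PID or bumping the epoch.
 */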

/**
 * @returns the idemp_state_t string representation
 */
static RD_UNUSED const char *
rd_kafka_idemp_state2str (rd_kafka_idemp_state_t state) {
        static const char *names[] = {
                "Init",
                "Terminate",
                "FatalError",
                "RequestPID",
                "WaitTransport",
                "WaitPID",
                "Assigned",
                "DrainReset",
                "DrainBump"
        };
        return names[state];
}




/**
 * @enum Transactional Producer state
 */
typedef enum {
        /**< Initial state */
        RD_KAFKA_TXN_STATE_INIT,
        /**< Awaiting PID to be acquired by rdkafka_idempotence.c */
        RD_KAFKA_TXN_STATE_WAIT_PID,
        /**< PID acquired, but application has not made a successful
         *   init_transactions() call. */
        RD_KAFKA_TXN_STATE_READY_NOT_ACKED,
        /**< PID acquired, no active transaction. */
        RD_KAFKA_TXN_STATE_READY,
        /**< begin_transaction() has been called. */
        RD_KAFKA_TXN_STATE_IN_TRANSACTION,
        /**< commit_transaction() has been called. */
        RD_KAFKA_TXN_STATE_BEGIN_COMMIT,
        /**< commit_transaction() has been called and all outstanding
         *   messages, partitions, and offsets have been sent. */
        RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION,
        /**< abort_transaction() has been called. */
        RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION,
        /**< An abortable error has occurred. */
        RD_KAFKA_TXN_STATE_ABORTABLE_ERROR,
        /**< A fatal error has occurred. */
        RD_KAFKA_TXN_STATE_FATAL_ERROR
} rd_kafka_txn_state_t;
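
/*
 * Sketch of the typical transactional producer state progression, inferred
 * from the state descriptions above:
 *
 *   INIT -> WAIT_PID -> READY_NOT_ACKED -> READY
 *        -> IN_TRANSACTION -> BEGIN_COMMIT -> COMMITTING_TRANSACTION -> READY
 *
 * abort_transaction() takes the state through ABORTING_TRANSACTION back to
 * READY, an abortable error parks the state in ABORTABLE_ERROR until the
 * application aborts the transaction, and a fatal error ends in FATAL_ERROR.
 */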


/**
 * @returns the txn_state_t string representation
 */
static RD_UNUSED const char *
rd_kafka_txn_state2str (rd_kafka_txn_state_t state) {
        static const char *names[] = {
                "Init",
                "WaitPID",
                "ReadyNotAcked",
                "Ready",
                "InTransaction",
                "BeginCommit",
                "CommittingTransaction",
                "AbortingTransaction",
                "AbortableError",
                "FatalError"
        };
        return names[state];
}





/**
 * Kafka handle, internal representation of the application's rd_kafka_t.
 */

typedef RD_SHARED_PTR_TYPE(shptr_rd_ikafka_s, rd_ikafka_t) shptr_rd_ikafka_t;

struct rd_kafka_s {
        rd_kafka_q_t *rk_rep;   /* kafka -> application reply queue */
        rd_kafka_q_t *rk_ops;   /* any -> rdkafka main thread ops */

        TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers;
        rd_list_t                  rk_broker_by_id; /* Fast id lookups. */
        rd_atomic32_t              rk_broker_cnt;
        /**< Number of brokers in state >= UP */
        rd_atomic32_t              rk_broker_up_cnt;
        /**< Number of logical brokers in state >= UP, this is a subset
         *   of rk_broker_up_cnt. */
        rd_atomic32_t              rk_logical_broker_up_cnt;
        /**< Number of brokers that are down, only includes brokers
         *   that have had at least one connection attempt. */
        rd_atomic32_t              rk_broker_down_cnt;
        /**< Logical brokers currently without an address.
         *   Used for calculating ERR__ALL_BROKERS_DOWN. */
        rd_atomic32_t              rk_broker_addrless_cnt;

        mtx_t                      rk_internal_rkb_lock;
        rd_kafka_broker_t         *rk_internal_rkb;

        /* Broadcasting of broker state changes to wake up
         * functions waiting for a state change. */
        cnd_t                      rk_broker_state_change_cnd;
        mtx_t                      rk_broker_state_change_lock;
        int                        rk_broker_state_change_version;
        /* List of (rd_kafka_enq_once_t*) objects waiting for broker
         * state changes. Protected by rk_broker_state_change_lock. */
        rd_list_t rk_broker_state_change_waiters; /**< (rd_kafka_enq_once_t*) */

        TAILQ_HEAD(, rd_kafka_itopic_s)  rk_topics;
        int              rk_topic_cnt;

        struct rd_kafka_cgrp_s *rk_cgrp;

        rd_kafka_conf_t  rk_conf;
        rd_kafka_q_t    *rk_logq;          /* Log queue if `log.queue` set */
        char             rk_name[128];
        rd_kafkap_str_t *rk_client_id;
        rd_kafkap_str_t *rk_group_id;    /* Consumer group id */

        rd_atomic32_t    rk_terminate;   /**< Set to RD_KAFKA_DESTROY_F_..
                                          *   flags when the instance
                                          *   is being destroyed.
                                          *   The value set is the
                                          *   destroy flags from
                                          *   rd_kafka_destroy*() and
                                          *   the two internal flags shown
                                          *   below.
                                          *
                                          * Order:
                                          * 1. user_flags | .._F_DESTROY_CALLED
                                          *    is set in rd_kafka_destroy*().
                                          * 2. consumer_close() is called
                                          *    for consumers.
                                          * 3. .._F_TERMINATE is set to
                                          *    signal all background threads
                                          *    to terminate.
                                          */

#define RD_KAFKA_DESTROY_F_TERMINATE 0x1 /**< Internal flag to make sure
                                          *   rk_terminate is set to a non-zero
                                          *   value even if the user passed
                                          *   no destroy flags. */
#define RD_KAFKA_DESTROY_F_DESTROY_CALLED 0x2 /**< Application has called
                                               *   ..destroy*() and we've
                                               *   begun the termination
                                               *   process.
                                               *   This flag is needed to avoid
                                               *   rk_terminate being 0 when
                                               *   destroy_flags() is called
                                               *   with flags=0 and before
                                               *   _F_TERMINATE has been set. */
#define RD_KAFKA_DESTROY_F_IMMEDIATE 0x4     /**< Immediate non-blocking
                                              *   destruction without waiting
                                              *   for all resources
                                              *   to be cleaned up.
                                              *   WARNING: Memory and resource
                                              *            leaks possible.
                                              *   This flag automatically sets
                                              *   .._NO_CONSUMER_CLOSE. */
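
/* Illustration of how rk_terminate is composed during termination:
 * rd_kafka_destroy_flags(rk, flags) first sets rk_terminate to
 * flags | RD_KAFKA_DESTROY_F_DESTROY_CALLED (0x2); once the termination
 * sequence starts signalling the background threads,
 * RD_KAFKA_DESTROY_F_TERMINATE (0x1) is OR:ed in as well, which is when
 * rd_kafka_terminating() (defined further below) starts returning true. */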


        rwlock_t         rk_lock;
        rd_kafka_type_t  rk_type;
        struct timeval   rk_tv_state_change;

        rd_atomic64_t    rk_ts_last_poll;   /**< Timestamp of last application
                                             *   consumer_poll() call
                                             *   (or equivalent).
                                             *   Used to enforce
                                             *   max.poll.interval.ms.
                                             *   Only relevant for consumer. */
        /* First fatal error. */
        struct {
                rd_atomic32_t err; /**< rd_kafka_resp_err_t */
                char *errstr;      /**< Protected by rk_lock */
                int cnt;           /**< Number of errors raised, only
                                    *   the first one is stored. */
        } rk_fatal;

        rd_atomic32_t    rk_last_throttle;  /* Last throttle_time_ms value
                                             * from broker. */

        /* Locks: rd_kafka_*lock() */
        rd_ts_t          rk_ts_metadata;    /* Timestamp of most recent
                                             * metadata. */

        struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
        rd_ts_t          rk_ts_full_metadata;       /* Timestamp of .. */
        struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */

        char            *rk_clusterid;      /* ClusterId from metadata */
        int32_t          rk_controllerid;   /* ControllerId from metadata */

        /**< Producer: Delivery report mode */
        enum {
                RD_KAFKA_DR_MODE_NONE,  /**< No delivery reports */
                RD_KAFKA_DR_MODE_CB,    /**< Delivery reports through callback */
                RD_KAFKA_DR_MODE_EVENT, /**< Delivery reports through event API*/
        } rk_drmode;

        /* Simple consumer count:
         *  >0: Running in legacy / Simple Consumer mode,
         *   0: No consumers running
         *  <0: Running in High level consumer mode */
        rd_atomic32_t    rk_simple_cnt;

        /**
         * Exactly Once Semantics and Idempotent Producer
         *
         * @locks rk_lock
         */
        struct {
                /*
                 * Idempotence
                 */
                rd_kafka_idemp_state_t idemp_state; /**< Idempotent Producer
                                                     *   state */
                rd_ts_t ts_idemp_state;/**< Last state change */
                rd_kafka_pid_t pid;    /**< Current Producer ID and Epoch */
                int epoch_cnt;         /**< Number of times pid/epoch changed */
                rd_atomic32_t inflight_toppar_cnt; /**< Current number of
                                                    *   toppars with inflight
                                                    *   requests. */
                rd_kafka_timer_t pid_tmr; /**< PID FSM timer */

                /*
                 * Transactions
                 *
                 * All field access is from the rdkafka main thread,
                 * unless a specific lock is mentioned in the doc string.
                 *
                 */
                rd_atomic32_t txn_may_enq;      /**< Transaction state allows
                                                 *   application to enqueue
                                                 *   (produce) messages. */

                rd_kafkap_str_t *transactional_id; /**< transactional.id */
                rd_kafka_txn_state_t txn_state; /**< Transactional state.
                                                 *   @locks rk_lock */
                rd_ts_t ts_txn_state;           /**< Last state change.
                                                 *   @locks rk_lock */
                rd_kafka_broker_t *txn_coord;   /**< Transaction coordinator,
                                                 *   this is a logical broker.*/
                rd_kafka_broker_t *txn_curr_coord; /**< Current actual coord
                                                    *   broker.
                                                    *   This is only used to
                                                    *   check if the coord
                                                    *   changes. */
                rd_kafka_broker_monitor_t txn_coord_mon; /**< Monitor for
                                                          *   coordinator to
                                                          *   take action when
                                                          *   the broker state
                                                          *   changes. */

                /**< Blocking transactional API application call
                 *   currently being handled, its state, reply queue and how
                 *   to handle timeout.
                 *   Only one transactional API call is allowed at any time.
                 *   Protected by the rk_lock. */
                struct {
                        char name[64];        /**< API name, e.g.,
                                               *   SendOffsetsToTransaction */
                        rd_kafka_timer_t tmr; /**< Timeout timer, the timeout
                                               *   is specified by the app. */

                        int flags;            /**< Flags */
#define RD_KAFKA_TXN_CURR_API_F_ABORT_ON_TIMEOUT 0x1 /**< Set state to abortable
                                                      *   error on timeout,
                                                      *   i.e., fail the txn,
                                                      *   and set txn_requires_abort
                                                      *   on the returned error.
                                                      */
#define RD_KAFKA_TXN_CURR_API_F_RETRIABLE_ON_TIMEOUT 0x2 /**< Set retriable flag
                                                          *   on the error
                                                          *   on timeout. */
#define RD_KAFKA_TXN_CURR_API_F_FOR_REUSE 0x4        /**< Do not reset the
                                                      *   current API when it
                                                      *   completes successfully.
                                                      *   Instead keep it alive
                                                      *   and allow reuse with
                                                      *   .._F_REUSE, blocking
                                                      *   any non-F_REUSE
                                                      *   curr API calls. */
#define RD_KAFKA_TXN_CURR_API_F_REUSE     0x8        /**< Reuse/continue with
                                                      *   current API state.
                                                      *   This is used for
                                                      *   multi-stage APIs,
                                                      *   such as txn commit. */
                } txn_curr_api;

                /**< Copy (and reference) of the original init_transactions()
                 *   reply queue, which outlives the timeout of the curr API.
                 *   This is used as the reply queue for when the
                 *   black box idempotent producer has acquired the
                 *   initial PID (or fails to do so).
                 *   Since that acquisition may take longer than the
                 *   init_transactions() API timeout this extra reference
                 *   needs to be kept around.
                 *   If the originating init_transactions() call has timed
                 *   out and returned, this queue reference simply points
                 *   to a disabled queue that will discard any ops enqueued.
                 *
                 *   @locks rk_lock
                 */
                rd_kafka_q_t *txn_init_rkq;

                int txn_req_cnt;                /**< Number of transaction
                                                 *   requests sent.
                                                 *   This is incremented when an
                                                 *   AddPartitionsToTxn or
                                                 *   AddOffsetsToTxn request
                                                 *   has been sent for the
                                                 *   current transaction,
                                                 *   to keep track of
                                                 *   whether the broker is
                                                 *   aware of the current
                                                 *   transaction and thus
                                                 *   requires an EndTxn request
                                                 *   on abort or not. */

                /**< Timer to trigger registration of pending partitions */
                rd_kafka_timer_t         txn_register_parts_tmr;

                /**< Lock for txn_pending_rktps and txn_waitresp_rktps */
                mtx_t                    txn_pending_lock;

                /**< Partitions pending being added to transaction. */
                rd_kafka_toppar_tqhead_t txn_pending_rktps;

                /**< Partitions in-flight added to transaction. */
                rd_kafka_toppar_tqhead_t txn_waitresp_rktps;

                /**< Partitions added and registered to transaction. */
                rd_kafka_toppar_tqhead_t txn_rktps;

                /**< Current transaction error. */
                rd_kafka_resp_err_t txn_err;

                /**< Current transaction error string, if any. */
                char               *txn_errstr;

                /**< Last InitProducerIdRequest error. */
                rd_kafka_resp_err_t txn_init_err;

                /**< Waiting for transaction coordinator query response */
                rd_bool_t           txn_wait_coord;

                /**< Transaction coordinator query timer */
                rd_kafka_timer_t    txn_coord_tmr;
        } rk_eos;

        /**<
         * Coordinator cache.
         *
         * @locks none
         * @locality rdkafka main thread
         */
        rd_kafka_coord_cache_t   rk_coord_cache; /**< Coordinator cache */

        TAILQ_HEAD(, rd_kafka_coord_req_s) rk_coord_reqs; /**< Coordinator
                                                           *   requests */

        const rd_kafkap_bytes_t *rk_null_bytes;

        struct {
                mtx_t lock;       /* Protects access to this struct */
                cnd_t cnd;        /* For waking up blocking injectors */
                unsigned int cnt; /* Current message count */
                size_t size;      /* Current message size sum */
                unsigned int max_cnt; /* Max limit */
                size_t max_size; /* Max limit */
        } rk_curr_msgs;

        rd_kafka_timers_t rk_timers;
        thrd_t rk_thread;

        int rk_initialized;       /**< Will be > 0 when the rd_kafka_t
                                   *   instance has been fully initialized. */

        int   rk_init_wait_cnt;   /**< Number of background threads that
                                   *   need to finish initialization. */
        cnd_t rk_init_cnd;        /**< Cond-var used to wait for the main
                                   *   thread to finish its initialization
                                   *   before rd_kafka_new() returns. */
        mtx_t rk_init_lock;       /**< Lock for rk_init_wait_cnt and
                                   *   rk_init_cnd. */

        /**
         * Background thread and queue,
         * enabled by setting `background_event_cb()`.
         */
        struct {
                rd_kafka_q_t *q;  /**< Queue served by background thread. */
                thrd_t thread;    /**< Background thread. */
                int calling;      /**< Indicates whether the event callback
                                   *   is being called, reset back to 0
                                   *   when the callback returns.
                                   *   This can be used for troubleshooting
                                   *   purposes. */
        } rk_background;


        /*
         * Logs, events or actions to rate limit / suppress
         */
        struct {
                /**< Log: No brokers support Idempotent Producer */
                rd_interval_t no_idemp_brokers;

                /**< Sparse connections: randomly select broker
                 *   to bring up. This interval should allow
                 *   for a previous connection to be established,
                 *   which varies between different environments:
                 *   Use 10 < reconnect.backoff.jitter.ms / 2 < 1000.
                 */
                rd_interval_t sparse_connect_random;
                /**< Lock for sparse_connect_random */
                mtx_t         sparse_connect_lock;

                /**< Broker metadata refresh interval:
                 *   this is rate-limiting the number of topic-less
                 *   broker/cluster metadata refreshes when there are no
                 *   topics to refresh.
                 *   Will be refreshed every topic.metadata.refresh.interval.ms
                 *   but no more often than every 10s.
                 *   No locks: only accessed by rdkafka main thread. */
                rd_interval_t broker_metadata_refresh;
        } rk_suppress;

        struct {
                void *handle; /**< Provider-specific handle struct pointer.
                               *   Typically assigned in provider's .init() */
        } rk_sasl;

        /* Test mocks */
        struct {
                rd_kafka_mock_cluster_t *cluster; /**< Mock cluster, created
                                                   *   by test.mock.num.brokers
                                                   */
                rd_atomic32_t cluster_cnt;        /**< Total number of mock
                                                   *   clusters, created either
                                                   *   through
                                                   *   test.mock.num.brokers
                                                   *   or mock_cluster_new().
                                                   */

        } rk_mock;
};

#define rd_kafka_wrlock(rk)      rwlock_wrlock(&(rk)->rk_lock)
#define rd_kafka_rdlock(rk)      rwlock_rdlock(&(rk)->rk_lock)
#define rd_kafka_rdunlock(rk)    rwlock_rdunlock(&(rk)->rk_lock)
#define rd_kafka_wrunlock(rk)    rwlock_wrunlock(&(rk)->rk_lock)


/**
 * @brief Add \p cnt messages of total size \p size bytes to the
 *        internal bookkeeping of current message counts.
 *        If the total message count or size after the addition would exceed
 *        the configured limits \c queue.buffering.max.messages and
 *        \c queue.buffering.max.kbytes then, depending on the value of
 *        \p block, the function either blocks until enough space is available
 *        (if \p block is 1) or immediately returns
 *        RD_KAFKA_RESP_ERR__QUEUE_FULL.
 *
 * @param rdlock If non-null and \p block is set and blocking is to ensue,
 *               then unlock this rwlock for the duration of the blocking
 *               and then reacquire with a read-lock.
 */
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size,
                        int block, rwlock_t *rdlock) {

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        mtx_lock(&rk->rk_curr_msgs.lock);
        while (unlikely(rk->rk_curr_msgs.cnt + cnt >
                        rk->rk_curr_msgs.max_cnt ||
                        (unsigned long long)(rk->rk_curr_msgs.size + size) >
                        (unsigned long long)rk->rk_curr_msgs.max_size)) {
                if (!block) {
                        mtx_unlock(&rk->rk_curr_msgs.lock);
                        return RD_KAFKA_RESP_ERR__QUEUE_FULL;
                }

                if (rdlock)
                        rwlock_rdunlock(rdlock);

                cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);

                if (rdlock)
                        rwlock_rdlock(rdlock);

        }

        rk->rk_curr_msgs.cnt  += cnt;
        rk->rk_curr_msgs.size += size;
        mtx_unlock(&rk->rk_curr_msgs.lock);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
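
/*
 * Usage sketch (hypothetical caller, for illustration): the produce path
 * would reserve room for a message up-front and release the reservation
 * again if the enqueue fails, e.g.:
 *
 *   err = rd_kafka_curr_msgs_add(rk, 1, len,
 *                                !!(msgflags & RD_KAFKA_MSG_F_BLOCK), NULL);
 *   if (err)
 *           return err;                         // queue.buffering.max.* hit
 *   ...
 *   if (enqueue_failed)
 *           rd_kafka_curr_msgs_sub(rk, 1, len); // undo the reservation
 */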


/**
 * @brief Subtract \p cnt messages of total size \p size from the
 *        current bookkeeping and broadcast a wakeup on the condvar
 *        for any waiting & blocking threads.
 */
static RD_INLINE RD_UNUSED void
rd_kafka_curr_msgs_sub (rd_kafka_t *rk, unsigned int cnt, size_t size) {
        int broadcast = 0;

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return;

        mtx_lock(&rk->rk_curr_msgs.lock);
        rd_kafka_assert(NULL,
                        rk->rk_curr_msgs.cnt >= cnt &&
                        rk->rk_curr_msgs.size >= size);

        /* If the subtraction would pass one of the thresholds
         * broadcast a wake-up to any waiting listeners. */
        if ((rk->rk_curr_msgs.cnt - cnt == 0) ||
            (rk->rk_curr_msgs.cnt >= rk->rk_curr_msgs.max_cnt &&
             rk->rk_curr_msgs.cnt - cnt < rk->rk_curr_msgs.max_cnt) ||
            (rk->rk_curr_msgs.size >= rk->rk_curr_msgs.max_size &&
             rk->rk_curr_msgs.size - size < rk->rk_curr_msgs.max_size))
                broadcast = 1;

        rk->rk_curr_msgs.cnt  -= cnt;
        rk->rk_curr_msgs.size -= size;

        if (unlikely(broadcast))
                cnd_broadcast(&rk->rk_curr_msgs.cnd);

        mtx_unlock(&rk->rk_curr_msgs.lock);
}

static RD_INLINE RD_UNUSED void
rd_kafka_curr_msgs_get (rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) {
        if (rk->rk_type != RD_KAFKA_PRODUCER) {
                *cntp = 0;
                *sizep = 0;
                return;
        }

        mtx_lock(&rk->rk_curr_msgs.lock);
        *cntp = rk->rk_curr_msgs.cnt;
        *sizep = rk->rk_curr_msgs.size;
        mtx_unlock(&rk->rk_curr_msgs.lock);
}

static RD_INLINE RD_UNUSED int
rd_kafka_curr_msgs_cnt (rd_kafka_t *rk) {
        int cnt;
        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return 0;

        mtx_lock(&rk->rk_curr_msgs.lock);
        cnt = rk->rk_curr_msgs.cnt;
        mtx_unlock(&rk->rk_curr_msgs.lock);

        return cnt;
}

/**
 * @brief Wait until \p tspec for curr_msgs to reach 0.
 *
 * @returns remaining curr_msgs
 */
static RD_INLINE RD_UNUSED int
rd_kafka_curr_msgs_wait_zero (rd_kafka_t *rk, const struct timespec *tspec) {
        int cnt;

        mtx_lock(&rk->rk_curr_msgs.lock);
        while ((cnt = rk->rk_curr_msgs.cnt) > 0) {
                /* Stop waiting when the absolute timeout is reached and
                 * return the remaining count, rather than spinning. */
                if (cnd_timedwait_abs(&rk->rk_curr_msgs.cnd,
                                      &rk->rk_curr_msgs.lock,
                                      tspec) == thrd_timedout)
                        break;
        }
        mtx_unlock(&rk->rk_curr_msgs.lock);

        return cnt;
}

void rd_kafka_destroy_final (rd_kafka_t *rk);

void rd_kafka_global_init (void);

/**
 * @returns true if \p rk handle is terminating.
 *
 * @remark If consumer_close() is called from destroy*() it will be
 *         called prior to _F_TERMINATE being set and will thus not
 *         be able to use rd_kafka_terminating() to know it is shutting down.
 *         That code should instead just check that rk_terminate is non-zero
 *         (the _F_DESTROY_CALLED flag will be set).
 */
#define rd_kafka_terminating(rk) (rd_atomic32_get(&(rk)->rk_terminate) & \
                                  RD_KAFKA_DESTROY_F_TERMINATE)

/**
 * @returns the destroy flags set matching \p flags, which might be
 *          a subset of the flags.
 */
#define rd_kafka_destroy_flags_check(rk,flags) \
        (rd_atomic32_get(&(rk)->rk_terminate) & (flags))
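
/* Illustration of the remark above: code running as part of destroy*()
 * (e.g. the consumer close path) should check
 *   rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_DESTROY_CALLED)
 * rather than rd_kafka_terminating(rk), since _F_TERMINATE is only set
 * later in the termination sequence. */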

/**
 * @returns true if no consumer callbacks, or standard consumer_close
 *          behaviour, should be triggered. */
#define rd_kafka_destroy_flags_no_consumer_close(rk) \
        rd_kafka_destroy_flags_check(rk, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE)

#define rd_kafka_is_simple_consumer(rk) \
        (rd_atomic32_get(&(rk)->rk_simple_cnt) > 0)
int rd_kafka_simple_consumer_add (rd_kafka_t *rk);


/**
 * @returns true if idempotency is enabled (producer only).
 */
#define rd_kafka_is_idempotent(rk) ((rk)->rk_conf.eos.idempotence)

/**
 * @returns true if the producer is transactional (producer only).
 */
#define rd_kafka_is_transactional(rk)                   \
        ((rk)->rk_conf.eos.transactional_id != NULL)


#define RD_KAFKA_PURGE_F_ABORT_TXN 0x100  /**< Internal flag used when
                                           *   aborting transaction */
#define RD_KAFKA_PURGE_F_MASK 0x107
const char *rd_kafka_purge_flags2str (int flags);


#include "rdkafka_topic.h"
#include "rdkafka_partition.h"














/**
 * Debug contexts
 */
#define RD_KAFKA_DBG_GENERIC        0x1
#define RD_KAFKA_DBG_BROKER         0x2
#define RD_KAFKA_DBG_TOPIC          0x4
#define RD_KAFKA_DBG_METADATA       0x8
#define RD_KAFKA_DBG_FEATURE        0x10
#define RD_KAFKA_DBG_QUEUE          0x20
#define RD_KAFKA_DBG_MSG            0x40
#define RD_KAFKA_DBG_PROTOCOL       0x80
#define RD_KAFKA_DBG_CGRP           0x100
#define RD_KAFKA_DBG_SECURITY       0x200
#define RD_KAFKA_DBG_FETCH          0x400
#define RD_KAFKA_DBG_INTERCEPTOR    0x800
#define RD_KAFKA_DBG_PLUGIN         0x1000
#define RD_KAFKA_DBG_CONSUMER       0x2000
#define RD_KAFKA_DBG_ADMIN          0x4000
#define RD_KAFKA_DBG_EOS            0x8000
#define RD_KAFKA_DBG_MOCK           0x10000
#define RD_KAFKA_DBG_ALL            0xfffff
#define RD_KAFKA_DBG_NONE           0x0

void rd_kafka_log0(const rd_kafka_conf_t *conf,
                   const rd_kafka_t *rk, const char *extra, int level,
                   const char *fac, const char *fmt, ...) RD_FORMAT(printf,
                                                                    6, 7);

#define rd_kafka_log(rk,level,fac,...) \
        rd_kafka_log0(&rk->rk_conf, rk, NULL, level, fac, __VA_ARGS__)
#define rd_kafka_dbg(rk,ctx,fac,...) do {                               \
                if (unlikely((rk)->rk_conf.debug & (RD_KAFKA_DBG_ ## ctx))) \
                        rd_kafka_log0(&rk->rk_conf,rk,NULL,             \
                                      LOG_DEBUG,fac,__VA_ARGS__);       \
        } while (0)

/* dbg() not requiring an rk, just the conf object, for early logging */
#define rd_kafka_dbg0(conf,ctx,fac,...) do {                            \
                if (unlikely((conf)->debug & (RD_KAFKA_DBG_ ## ctx)))   \
                        rd_kafka_log0(conf,NULL,NULL,                   \
                                      LOG_DEBUG,fac,__VA_ARGS__);       \
        } while (0)
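
/* Usage sketch (illustrative): the debug contexts above map to the `debug`
 * configuration property, so with e.g. debug=broker,protocol set,
 *
 *   rd_kafka_dbg(rk, BROKER, "CONNECT", "Connecting to %s", nodename);
 *
 * checks rk_conf.debug & RD_KAFKA_DBG_BROKER and, if set, emits a LOG_DEBUG
 * level rd_kafka_log0() call with facility "CONNECT". (`nodename` is just a
 * placeholder variable here.) */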

/* NOTE: The local copy of _logname is needed due to rkb_logname_lock
 *       lock-ordering when logging another broker's name in the message. */
#define rd_rkb_log(rkb,level,fac,...) do {                              \
                char _logname[RD_KAFKA_NODENAME_SIZE];                  \
                mtx_lock(&(rkb)->rkb_logname_lock);                     \
                rd_strlcpy(_logname, rkb->rkb_logname, sizeof(_logname)); \
                mtx_unlock(&(rkb)->rkb_logname_lock);                   \
                rd_kafka_log0(&(rkb)->rkb_rk->rk_conf,                  \
                              (rkb)->rkb_rk, _logname,                  \
                              level, fac, __VA_ARGS__);                 \
        } while (0)

#define rd_rkb_dbg(rkb,ctx,fac,...) do {                                \
                if (unlikely((rkb)->rkb_rk->rk_conf.debug &             \
                             (RD_KAFKA_DBG_ ## ctx))) {                 \
                        rd_rkb_log(rkb, LOG_DEBUG, fac, __VA_ARGS__);   \
                }                                                       \
        } while (0)



extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;

static RD_UNUSED RD_INLINE
rd_kafka_resp_err_t rd_kafka_set_last_error (rd_kafka_resp_err_t err,
                                             int errnox) {
        if (errnox) {
                /* MSVC:
                 * This is the correct way to set errno on Windows,
                 * but it is still pointless due to different errnos in
                 * different runtimes:
                 * https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/
                 * errno is thus highly deprecated, and buggy, on Windows
                 * when using librdkafka as a dynamically loaded DLL. */
                rd_set_errno(errnox);
        }
        rd_kafka_last_error_code = err;
        return err;
}
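
/* Typical usage sketch (illustrative): public API entry points return the
 * error both as a return value and through rd_kafka_last_error(), e.g.:
 *
 *   if (!topic)
 *           return rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
 *                                          EINVAL);
 */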


int rd_kafka_set_fatal_error0 (rd_kafka_t *rk, rd_dolock_t do_lock,
                               rd_kafka_resp_err_t err,
                               const char *fmt, ...) RD_FORMAT(printf, 4, 5);
#define rd_kafka_set_fatal_error(rk,err,fmt,...)                        \
        rd_kafka_set_fatal_error0(rk, RD_DO_LOCK, err, fmt, __VA_ARGS__)

static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_fatal_error_code (rd_kafka_t *rk) {
        /* This is an optimization to avoid an atomic read, which is costly
         * on some platforms:
         * Fatal errors are currently only raised by the idempotent producer
         * and static consumers (group.instance.id). */
        if ((rk->rk_type == RD_KAFKA_PRODUCER && rk->rk_conf.eos.idempotence) ||
            (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_conf.group_instance_id))
                return rd_atomic32_get(&rk->rk_fatal.err);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


extern rd_atomic32_t rd_kafka_thread_cnt_curr;
extern char RD_TLS rd_kafka_thread_name[64];

void rd_kafka_set_thread_name (const char *fmt, ...);
void rd_kafka_set_thread_sysname (const char *fmt, ...);

int rd_kafka_path_is_dir (const char *path);
rd_bool_t rd_kafka_dir_is_empty (const char *path);

rd_kafka_op_res_t
rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                  rd_kafka_q_cb_type_t cb_type, void *opaque);

rd_kafka_resp_err_t rd_kafka_subscribe_rkt (rd_kafka_itopic_t *rkt);


/**
 * @returns the number of milliseconds the maximum poll interval
 *          was exceeded, or 0 if not exceeded.
 *
 * @remark Only relevant for high-level consumer.
 *
 * @locality any
 * @locks none
 */
static RD_INLINE RD_UNUSED int
rd_kafka_max_poll_exceeded (rd_kafka_t *rk) {
        rd_ts_t last_poll;
        int exceeded;

        if (rk->rk_type != RD_KAFKA_CONSUMER)
                return 0;

        last_poll = rd_atomic64_get(&rk->rk_ts_last_poll);

        /* Application is blocked in librdkafka function, see
         * rd_kafka_app_poll_blocking(). */
        if (last_poll == INT64_MAX)
                return 0;

        exceeded = (int)((rd_clock() - last_poll) / 1000ll) -
                rk->rk_conf.max_poll_interval_ms;

        if (unlikely(exceeded > 0))
                return exceeded;

        return 0;
}
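
/* Worked example: with max.poll.interval.ms=300000 and the last application
 * poll 301 seconds ago, rd_clock() - last_poll is 301,000,000 us, so
 * exceeded = 301000 - 300000 = 1000, i.e. the poll interval was exceeded
 * by one second. */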

/**
 * @brief Call on entry to a blocking polling function to indicate
 *        that the application is blocked waiting for librdkafka
 *        and that max.poll.interval.ms should not be enforced.
 *
 *        Call app_polled() upon return from the function calling
 *        this function to register the application's last poll time.
 *
 * @remark Only relevant for high-level consumer.
 *
 * @locality any
 * @locks none
 */
static RD_INLINE RD_UNUSED void
rd_kafka_app_poll_blocking (rd_kafka_t *rk) {
        if (rk->rk_type == RD_KAFKA_CONSUMER)
                rd_atomic64_set(&rk->rk_ts_last_poll, INT64_MAX);
}

/**
 * @brief Set the last application poll time to now.
 *
 * @remark Only relevant for high-level consumer.
 *
 * @locality any
 * @locks none
 */
static RD_INLINE RD_UNUSED void
rd_kafka_app_polled (rd_kafka_t *rk) {
        if (rk->rk_type == RD_KAFKA_CONSUMER)
                rd_atomic64_set(&rk->rk_ts_last_poll, rd_clock());
}
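
/* Call pattern sketch (hypothetical caller, for illustration):
 *
 *   rd_kafka_app_poll_blocking(rk);    // suspend max.poll.interval.ms check
 *   ...block waiting for a message or event...
 *   rd_kafka_app_polled(rk);           // record the new last-poll timestamp
 */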


/**
 * rdkafka_background.c
 */
int rd_kafka_background_thread_main (void *arg);

#endif /* _RDKAFKA_INT_H_ */