1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2019 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef _RDKAFKA_COORD_H_
30 #define _RDKAFKA_COORD_H_
31 
32 
33 typedef TAILQ_HEAD(rd_kafka_coord_cache_head_s, rd_kafka_coord_cache_entry_s)
34         rd_kafka_coord_cache_head_t;
35 
36 /**
37  * @brief Coordinator cache entry
38  */
39 typedef struct rd_kafka_coord_cache_entry_s {
40         TAILQ_ENTRY(rd_kafka_coord_cache_entry_s) cce_link;
41         rd_kafka_coordtype_t cce_coordtype; /**< Coordinator type */
42         char                *cce_coordkey;  /**< Coordinator type key,
43                                              *   e.g the group id */
44         rd_ts_t              cce_ts_used;   /**< Last used timestamp */
45         rd_kafka_broker_t   *cce_rkb;       /**< The cached coordinator */
46 
47 } rd_kafka_coord_cache_entry_t;
48 
49 /**
50  * @brief Coordinator cache
51  */
52 typedef struct rd_kafka_coord_cache_s {
53         rd_kafka_coord_cache_head_t cc_entries;      /**< Cache entries */
54         int                         cc_cnt;          /**< Number of entries */
55         rd_ts_t                     cc_expire_thres; /**< Entries not used in
56                                                       *   this long will be
57                                                       *   expired */
58 } rd_kafka_coord_cache_t;
59 
60 
61 void rd_kafka_coord_cache_expire (rd_kafka_coord_cache_t *cc);
62 void rd_kafka_coord_cache_evict (rd_kafka_coord_cache_t *cc,
63                                  rd_kafka_broker_t *rkb);
64 void rd_kafka_coord_cache_destroy (rd_kafka_coord_cache_t *cc);
65 void rd_kafka_coord_cache_init (rd_kafka_coord_cache_t *cc,
66                                 int expire_thres_ms);
67 
68 
69 
70 
71 /**
72  * @name Coordinator requests
73  */
74 
75 /**
76  * @brief Request to be sent to coordinator.
77  *        Includes looking up, caching, and connecting to, the coordinator.
78  */
79 typedef struct rd_kafka_coord_req_s {
80         TAILQ_ENTRY(rd_kafka_coord_req_s) creq_link; /**< rk_coord_reqs */
81         rd_kafka_coordtype_t creq_coordtype; /**< Coordinator type */
82         char                *creq_coordkey;  /**< Coordinator key */
83 
84         rd_kafka_op_t *creq_rko;             /**< Requester's rko that is
85                                               *   provided to creq_send_req_cb
86                                               *   (optional). */
87         rd_ts_t        creq_ts_timeout;      /**< Absolute timeout.
88                                               *   Will fail with an error
89                                               *   code pertaining to the
90                                               *   current state */
91 
92         rd_kafka_send_req_cb_t *creq_send_req_cb; /**< Sender callback */
93 
94         rd_kafka_replyq_t   creq_replyq;     /**< Reply queue */
95         rd_kafka_resp_cb_t *creq_resp_cb;    /**< Reply queue response
96                                               *   parsing callback for the
97                                               *   request sent by
98                                               *   send_req_cb */
99         void               *creq_reply_opaque; /**< Opaque passed to
100                                                 *   creq_send_req_cb and
101                                                 *   creq_resp_cb. */
102 
103         int                 creq_refcnt;     /**< Internal reply queue for
104                                               *   FindCoordinator requests
105                                               *   which is forwarded to the
106                                               *   rk_ops queue, but allows
107                                               *   destroying the creq even
108                                               *   with outstanding
109                                               *   FindCoordinator requests. */
110         rd_bool_t           creq_done;      /**< True if request was sent */
111 
112 } rd_kafka_coord_req_t;
113 
114 
115 void rd_kafka_coord_req (rd_kafka_t *rk,
116                          rd_kafka_coordtype_t coordtype,
117                          const char *coordkey,
118                          rd_kafka_send_req_cb_t *send_req_cb,
119                          rd_kafka_op_t *rko,
120                          int timeout_ms,
121                          rd_kafka_replyq_t replyq,
122                          rd_kafka_resp_cb_t *resp_cb,
123                          void *reply_opaque);
124 
125 void rd_kafka_coord_rkb_monitor_cb (rd_kafka_broker_t *rkb);
126 
127 void rd_kafka_coord_reqs_term (rd_kafka_t *rk);
128 void rd_kafka_coord_reqs_init (rd_kafka_t *rk);
129 #endif /* _RDKAFKA_COORD_H_ */
130