/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2019 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * @file
 * @brief Declarations for the coordinator cache and for requests that are
 *        routed to a coordinator broker.
 *
 * @remark This header contains no \#include directives: the TAILQ macros and
 *         the rd_kafka_t, rd_kafka_broker_t, rd_kafka_op_t, rd_kafka_replyq_t,
 *         rd_kafka_coordtype_t, rd_ts_t, rd_bool_t, rd_kafka_send_req_cb_t and
 *         rd_kafka_resp_cb_t types must already be declared by the includer.
 */

#ifndef _RDKAFKA_COORD_H_
#define _RDKAFKA_COORD_H_


/** List head type for a list of rd_kafka_coord_cache_entry_s elements. */
typedef TAILQ_HEAD(rd_kafka_coord_cache_head_s, rd_kafka_coord_cache_entry_s)
        rd_kafka_coord_cache_head_t;

/**
 * @brief Coordinator cache entry
 *
 * Associates a (coordinator type, coordinator key) pair with a broker.
 */
typedef struct rd_kafka_coord_cache_entry_s {
        TAILQ_ENTRY(rd_kafka_coord_cache_entry_s) cce_link; /**< List linkage */
        rd_kafka_coordtype_t cce_coordtype;      /**< Coordinator type */
        char                *cce_coordkey;       /**< Coordinator type key,
                                                  *   e.g., the group id */
        rd_ts_t              cce_ts_used;        /**< Last used timestamp */
        rd_kafka_broker_t   *cce_rkb;            /**< The cached coordinator */

} rd_kafka_coord_cache_entry_t;

/**
 * @brief Coordinator cache
 */
typedef struct rd_kafka_coord_cache_s {
        rd_kafka_coord_cache_head_t cc_entries;  /**< Cache entries */
        int     cc_cnt;                          /**< Number of entries */
        rd_ts_t cc_expire_thres;                 /**< Entries not used in
                                                  *   this long will be
                                                  *   expired.
                                                  *   NOTE(review): unit of
                                                  *   this rd_ts_t value is
                                                  *   not visible here, while
                                                  *   the init function takes
                                                  *   milliseconds - confirm
                                                  *   the conversion. */
} rd_kafka_coord_cache_t;


/*
 * Coordinator cache API.
 *
 * Only the prototypes are visible in this header; the one-line descriptions
 * below are derived from the names, signatures and the struct documentation
 * above and should be confirmed against the implementation.
 */

/** Presumably expires entries of \p cc not used within cc_expire_thres. */
void rd_kafka_coord_cache_expire (rd_kafka_coord_cache_t *cc);

/** Presumably removes the entries of \p cc that refer to broker \p rkb. */
void rd_kafka_coord_cache_evict (rd_kafka_coord_cache_t *cc,
                                 rd_kafka_broker_t *rkb);

/** Presumably releases all entries of \p cc. */
void rd_kafka_coord_cache_destroy (rd_kafka_coord_cache_t *cc);

/** Presumably initializes \p cc with an expiry threshold given in
 *  milliseconds (\p expire_thres_ms). */
void rd_kafka_coord_cache_init (rd_kafka_coord_cache_t *cc,
                                int expire_thres_ms);




/**
 * @name Coordinator requests
 */

/**
 * @brief Request to be sent to coordinator.
 *        Includes looking up, caching, and connecting to, the coordinator.
 */
typedef struct rd_kafka_coord_req_s {
        TAILQ_ENTRY(rd_kafka_coord_req_s) creq_link;  /**< rk_coord_reqs */
        rd_kafka_coordtype_t creq_coordtype;          /**< Coordinator type */
        char                *creq_coordkey;           /**< Coordinator key */

        rd_kafka_op_t *creq_rko;                 /**< Requester's rko that is
                                                  *   provided to
                                                  *   creq_send_req_cb
                                                  *   (optional). */
        rd_ts_t        creq_ts_timeout;          /**< Absolute timeout.
                                                  *   Will fail with an error
                                                  *   code pertaining to the
                                                  *   current state */

        rd_kafka_send_req_cb_t *creq_send_req_cb; /**< Sender callback */

        rd_kafka_replyq_t   creq_replyq;         /**< Reply queue */
        rd_kafka_resp_cb_t *creq_resp_cb;        /**< Reply queue response
                                                  *   parsing callback for the
                                                  *   request sent by
                                                  *   send_req_cb */
        void               *creq_reply_opaque;   /**< Opaque passed to
                                                  *   creq_send_req_cb and
                                                  *   creq_resp_cb. */

        int creq_refcnt;                         /**< Reference count.
                                                  *   NOTE(review): the
                                                  *   earlier comment on this
                                                  *   field described an
                                                  *   "internal reply queue
                                                  *   for FindCoordinator
                                                  *   requests which is
                                                  *   forwarded to the rk_ops
                                                  *   queue, but allows
                                                  *   destroying the creq even
                                                  *   with outstanding
                                                  *   FindCoordinator
                                                  *   requests", which does
                                                  *   not match an int counter.
                                                  *   Presumably the count now
                                                  *   serves that purpose
                                                  *   (keeping the creq valid
                                                  *   while FindCoordinator
                                                  *   requests are
                                                  *   outstanding) - confirm
                                                  *   against the
                                                  *   implementation. */
        rd_bool_t creq_done;                     /**< True if request was
                                                  *   sent */

} rd_kafka_coord_req_t;


/**
 * @brief Issue a request that is to be sent to a coordinator.
 *
 * Parameter descriptions follow the documentation of the like-named
 * rd_kafka_coord_req_t fields; the mapping is presumed from the names
 * (implementation not visible in this header).
 *
 * @param rk            Client instance.
 * @param coordtype     Coordinator type.
 * @param coordkey      Coordinator key, e.g., the group id.
 * @param send_req_cb   Sender callback.
 * @param rko           Requester's rko provided to \p send_req_cb (optional).
 * @param timeout_ms    Timeout in milliseconds (presumably relative, and
 *                      converted to the absolute creq_ts_timeout - confirm).
 * @param replyq        Reply queue.
 * @param resp_cb       Response parsing callback for the request sent by
 *                      \p send_req_cb.
 * @param reply_opaque  Opaque passed to \p send_req_cb and \p resp_cb.
 */
void rd_kafka_coord_req (rd_kafka_t *rk,
                         rd_kafka_coordtype_t coordtype,
                         const char *coordkey,
                         rd_kafka_send_req_cb_t *send_req_cb,
                         rd_kafka_op_t *rko,
                         int timeout_ms,
                         rd_kafka_replyq_t replyq,
                         rd_kafka_resp_cb_t *resp_cb,
                         void *reply_opaque);

/** Broker monitor callback for \p rkb.
 *  NOTE(review): triggering conditions are not visible here - presumably
 *  invoked on broker state changes; confirm against the implementation. */
void rd_kafka_coord_rkb_monitor_cb (rd_kafka_broker_t *rkb);

/** Presumably tears down the coordinator request state of \p rk. */
void rd_kafka_coord_reqs_term (rd_kafka_t *rk);

/** Presumably sets up the coordinator request state of \p rk. */
void rd_kafka_coord_reqs_init (rd_kafka_t *rk);
#endif /* _RDKAFKA_COORD_H_ */