1 /* 2 * librdkafka - Apache Kafka C library 3 * 4 * Copyright (c) 2012-2015, Magnus Edenhill 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright notice, 11 * this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright notice, 13 * this list of conditions and the following disclaimer in the documentation 14 * and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #ifndef _RDKAFKA_METADATA_H_ 30 #define _RDKAFKA_METADATA_H_ 31 32 #include "rdavl.h" 33 34 rd_kafka_resp_err_t 35 rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb, 36 rd_kafka_buf_t *request, rd_kafka_buf_t *rkbuf, 37 struct rd_kafka_metadata **mdp); 38 39 struct rd_kafka_metadata * 40 rd_kafka_metadata_copy (const struct rd_kafka_metadata *md, size_t size); 41 42 size_t 43 rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos, 44 const rd_kafka_topic_partition_list_t *match, 45 rd_kafka_topic_partition_list_t *errored); 46 size_t 47 rd_kafka_metadata_topic_filter (rd_kafka_t *rk, rd_list_t *tinfos, 48 const rd_kafka_topic_partition_list_t *match, 49 rd_kafka_topic_partition_list_t *errored); 50 51 void rd_kafka_metadata_log (rd_kafka_t *rk, const char *fac, 52 const struct rd_kafka_metadata *md); 53 54 55 56 rd_kafka_resp_err_t 57 rd_kafka_metadata_refresh_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb, 58 const rd_list_t *topics, rd_bool_t force, 59 rd_bool_t allow_auto_create, 60 rd_bool_t cgrp_update, 61 const char *reason); 62 rd_kafka_resp_err_t 63 rd_kafka_metadata_refresh_known_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb, 64 rd_bool_t force, const char *reason); 65 rd_kafka_resp_err_t 66 rd_kafka_metadata_refresh_consumer_topics (rd_kafka_t *rk, 67 rd_kafka_broker_t *rkb, 68 const char *reason); 69 rd_kafka_resp_err_t 70 rd_kafka_metadata_refresh_brokers (rd_kafka_t *rk, rd_kafka_broker_t *rkb, 71 const char *reason); 72 rd_kafka_resp_err_t 73 rd_kafka_metadata_refresh_all (rd_kafka_t *rk, rd_kafka_broker_t *rkb, 74 const char *reason); 75 76 rd_kafka_resp_err_t 77 rd_kafka_metadata_request (rd_kafka_t *rk, rd_kafka_broker_t *rkb, 78 const rd_list_t *topics, 79 rd_bool_t allow_auto_create_topics, 80 rd_bool_t cgrp_update, 81 const char *reason, rd_kafka_op_t *rko); 82 83 84 85 int rd_kafka_metadata_partition_id_cmp (const void *_a, 86 const void *_b); 87 88 rd_kafka_metadata_t * 89 rd_kafka_metadata_new_topic_mock (const rd_kafka_metadata_topic_t *topics, 90 size_t topic_cnt); 91 rd_kafka_metadata_t *rd_kafka_metadata_new_topic_mockv (size_t topic_cnt, ...); 92 93 94 /** 95 * @{ 96 * 97 * @brief Metadata cache 98 */ 99 100 struct rd_kafka_metadata_cache_entry { 101 rd_avl_node_t rkmce_avlnode; /* rkmc_avl */ 102 TAILQ_ENTRY(rd_kafka_metadata_cache_entry) rkmce_link; /* rkmc_expiry */ 103 rd_ts_t rkmce_ts_expires; /* Expire time */ 104 rd_ts_t rkmce_ts_insert; /* Insert time */ 105 rd_kafka_metadata_topic_t rkmce_mtopic; /* Cached topic metadata */ 106 /* rkmce_partitions memory points here. */ 107 }; 108 109 110 #define RD_KAFKA_METADATA_CACHE_ERR_IS_TEMPORARY(ERR) \ 111 ((ERR) == RD_KAFKA_RESP_ERR__WAIT_CACHE || \ 112 (ERR) == RD_KAFKA_RESP_ERR__NOENT) 113 114 #define RD_KAFKA_METADATA_CACHE_VALID(rkmce) \ 115 !RD_KAFKA_METADATA_CACHE_ERR_IS_TEMPORARY((rkmce)->rkmce_mtopic.err) 116 117 118 119 struct rd_kafka_metadata_cache { 120 rd_avl_t rkmc_avl; 121 TAILQ_HEAD(, rd_kafka_metadata_cache_entry) rkmc_expiry; 122 rd_kafka_timer_t rkmc_expiry_tmr; 123 int rkmc_cnt; 124 125 /* Protected by rk_lock */ 126 rd_list_t rkmc_observers; /**< (rd_kafka_enq_once_t*) */ 127 128 /* Protected by full_lock: */ 129 mtx_t rkmc_full_lock; 130 int rkmc_full_topics_sent; /* Full MetadataRequest for 131 * all topics has been sent, 132 * awaiting response. */ 133 int rkmc_full_brokers_sent; /* Full MetadataRequest for 134 * all brokers (but not topics) 135 * has been sent, 136 * awaiting response. */ 137 138 rd_kafka_timer_t rkmc_query_tmr; /* Query timer for topic's without 139 * leaders. */ 140 cnd_t rkmc_cnd; /* cache_wait_change() cond. */ 141 mtx_t rkmc_cnd_lock; /* lock for rkmc_cnd */ 142 }; 143 144 145 146 void rd_kafka_metadata_cache_expiry_start (rd_kafka_t *rk); 147 void 148 rd_kafka_metadata_cache_topic_update (rd_kafka_t *rk, 149 const rd_kafka_metadata_topic_t *mdt, 150 rd_bool_t propagate); 151 void rd_kafka_metadata_cache_update (rd_kafka_t *rk, 152 const rd_kafka_metadata_t *md, 153 int abs_update); 154 void rd_kafka_metadata_cache_propagate_changes (rd_kafka_t *rk); 155 struct rd_kafka_metadata_cache_entry * 156 rd_kafka_metadata_cache_find (rd_kafka_t *rk, const char *topic, int valid); 157 void rd_kafka_metadata_cache_purge_hints (rd_kafka_t *rk, 158 const rd_list_t *topics); 159 int rd_kafka_metadata_cache_hint (rd_kafka_t *rk, 160 const rd_list_t *topics, rd_list_t *dst, 161 rd_kafka_resp_err_t err, 162 rd_bool_t replace); 163 164 int rd_kafka_metadata_cache_hint_rktparlist ( 165 rd_kafka_t *rk, 166 const rd_kafka_topic_partition_list_t *rktparlist, 167 rd_list_t *dst, 168 int replace); 169 170 const rd_kafka_metadata_topic_t * 171 rd_kafka_metadata_cache_topic_get (rd_kafka_t *rk, const char *topic, 172 int valid); 173 int rd_kafka_metadata_cache_topic_partition_get ( 174 rd_kafka_t *rk, 175 const rd_kafka_metadata_topic_t **mtopicp, 176 const rd_kafka_metadata_partition_t **mpartp, 177 const char *topic, int32_t partition, int valid); 178 179 int rd_kafka_metadata_cache_topics_count_exists (rd_kafka_t *rk, 180 const rd_list_t *topics, 181 int *metadata_agep); 182 183 void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk); 184 185 void rd_kafka_metadata_cache_init (rd_kafka_t *rk); 186 void rd_kafka_metadata_cache_destroy (rd_kafka_t *rk); 187 void rd_kafka_metadata_cache_purge (rd_kafka_t *rk, rd_bool_t purge_observers); 188 int rd_kafka_metadata_cache_wait_change (rd_kafka_t *rk, int timeout_ms); 189 void rd_kafka_metadata_cache_dump (FILE *fp, rd_kafka_t *rk); 190 191 int rd_kafka_metadata_cache_topics_to_list (rd_kafka_t *rk, 192 rd_list_t *topics); 193 194 void 195 rd_kafka_metadata_cache_wait_state_change_async (rd_kafka_t *rk, 196 rd_kafka_enq_once_t *eonce); 197 198 /**@}*/ 199 #endif /* _RDKAFKA_METADATA_H_ */ 200