1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef _RDKAFKA_METADATA_H_
30 #define _RDKAFKA_METADATA_H_
31 
32 #include "rdavl.h"
33 
/**
 * @brief Parse a Metadata response (prototype only; defined elsewhere).
 *
 * NOTE(review): the parameter roles below are inferred from their names
 * and types - confirm against the definition.
 *
 * @param rkb     Broker handle.
 * @param request Presumably the request buffer that \p rkbuf answers.
 * @param rkbuf   Presumably the response buffer to be parsed.
 * @param mdp     Out-parameter receiving a pointer to the parsed metadata;
 *                ownership and the value on error are not visible here.
 *
 * @returns an rd_kafka_resp_err_t error code.
 */
34 rd_kafka_resp_err_t
35 rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
36                          rd_kafka_buf_t *request, rd_kafka_buf_t *rkbuf,
37                          struct rd_kafka_metadata **mdp);
38 
/**
 * @brief Copy the metadata object \p md (prototype only).
 *
 * @param md   Source metadata; not modified (const).
 * @param size Presumably the total size in bytes of the memory backing
 *             \p md - TODO confirm against the definition.
 *
 * @returns a pointer to a struct rd_kafka_metadata; presumably a new
 *          allocation owned by the caller - verify.
 */
39 struct rd_kafka_metadata *
40 rd_kafka_metadata_copy (const struct rd_kafka_metadata *md, size_t size);
41 
/**
 * @brief Topic matching / filtering helpers (prototypes only).
 *
 * Both functions share the same signature:
 *
 * @param rk      Client instance.
 * @param tinfos  List argument; presumably receives the resulting topic
 *                info objects - TODO confirm.
 * @param match   Read-only (const) topic+partition list to match against.
 * @param errored Topic+partition list; presumably receives the topics that
 *                could not be matched due to an error - TODO confirm.
 *
 * @returns a size_t; presumably the number of topics added to \p tinfos.
 *
 * NOTE(review): how "match" and "filter" differ is not visible from this
 * header - see the definitions.
 */
42 size_t
43 rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos,
44                                const rd_kafka_topic_partition_list_t *match,
45                                rd_kafka_topic_partition_list_t *errored);
46 size_t
47 rd_kafka_metadata_topic_filter (rd_kafka_t *rk, rd_list_t *tinfos,
48                                 const rd_kafka_topic_partition_list_t *match,
49                                 rd_kafka_topic_partition_list_t *errored);
50 
/**
 * @brief Log the contents of metadata object \p md (prototype only).
 *
 * @param rk  Client instance.
 * @param fac Presumably the log facility string - TODO confirm.
 * @param md  Metadata to log; not modified (const).
 */
51 void rd_kafka_metadata_log (rd_kafka_t *rk, const char *fac,
52                             const struct rd_kafka_metadata *md);
53 
54 
55 
/**
 * @brief Metadata refresh entry points (prototypes only).
 *
 * Every function in this family takes the client instance \p rk, a broker
 * handle \p rkb and a free-form \p reason string, and returns an
 * rd_kafka_resp_err_t.
 *
 * NOTE(review): the per-function notes below are inferred from names and
 * parameter types only; confirm the exact semantics (including whether
 * \p rkb may be NULL and how \p reason is used) against the definitions.
 */

/* Presumably refreshes metadata for the topic names in \p topics.
 * \p force, \p allow_auto_create and \p cgrp_update are boolean options
 * whose precise effect is not visible from this header. */
56 rd_kafka_resp_err_t
57 rd_kafka_metadata_refresh_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
58                                   const rd_list_t *topics, rd_bool_t force,
59                                   rd_bool_t allow_auto_create,
60                                   rd_bool_t cgrp_update,
61                                   const char *reason);
/* Presumably refreshes metadata for all locally known topics. */
62 rd_kafka_resp_err_t
63 rd_kafka_metadata_refresh_known_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
64                                         rd_bool_t force, const char *reason);
/* Presumably refreshes metadata for the topics a consumer is using. */
65 rd_kafka_resp_err_t
66 rd_kafka_metadata_refresh_consumer_topics (rd_kafka_t *rk,
67                                            rd_kafka_broker_t *rkb,
68                                            const char *reason);
/* Presumably refreshes broker information only (no topics). */
69 rd_kafka_resp_err_t
70 rd_kafka_metadata_refresh_brokers (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
71                                    const char *reason);
/* Presumably refreshes metadata for all brokers and topics. */
72 rd_kafka_resp_err_t
73 rd_kafka_metadata_refresh_all (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
74                                const char *reason);
75 
/**
 * @brief Issue a metadata request (prototype only).
 *
 * NOTE(review): parameter semantics are inferred from names and types;
 * confirm against the definition.
 *
 * @param rk                       Client instance.
 * @param rkb                      Broker handle.
 * @param topics                   Read-only list; presumably the topics to
 *                                 request metadata for.
 * @param allow_auto_create_topics Boolean option; presumably controls
 *                                 whether the request may trigger topic
 *                                 auto-creation.
 * @param cgrp_update              Boolean option; effect not visible here.
 * @param reason                   Free-form reason string.
 * @param rko                      Op object; presumably associated with
 *                                 the request/reply - TODO confirm.
 *
 * @returns an rd_kafka_resp_err_t error code.
 */
76 rd_kafka_resp_err_t
77 rd_kafka_metadata_request (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
78                            const rd_list_t *topics,
79                            rd_bool_t allow_auto_create_topics,
80                            rd_bool_t cgrp_update,
81                            const char *reason, rd_kafka_op_t *rko);
82 
83 
84 
/**
 * @brief Comparator taking two opaque const pointers and returning an int
 *        (prototype only).
 *
 * NOTE(review): judging by the name, \p _a and \p _b presumably point to
 * partition metadata objects that are ordered by partition id, with the
 * usual negative/zero/positive return convention - confirm against the
 * definition.
 */
85 int rd_kafka_metadata_partition_id_cmp (const void *_a,
86                                         const void *_b);
87 
/**
 * @brief Construct mock metadata objects from topic descriptions
 *        (prototypes only).
 *
 * rd_kafka_metadata_new_topic_mock() takes an array \p topics with
 * \p topic_cnt elements; rd_kafka_metadata_new_topic_mockv() takes
 * \p topic_cnt followed by variadic arguments.
 *
 * NOTE(review): the expected layout of the variadic arguments and the
 * ownership of the returned rd_kafka_metadata_t are not visible from this
 * header - see the definitions.
 */
88 rd_kafka_metadata_t *
89 rd_kafka_metadata_new_topic_mock (const rd_kafka_metadata_topic_t *topics,
90                                   size_t topic_cnt);
91 rd_kafka_metadata_t *rd_kafka_metadata_new_topic_mockv (size_t topic_cnt, ...);
92 
93 
94 /**
95  * @{
96  *
97  * @brief Metadata cache
98  */
99 
/**
 * @brief A single metadata cache entry holding cached metadata for one
 *        topic.
 *
 * The entry carries two linkage fields (an AVL node tagged rkmc_avl and a
 * TAILQ link tagged rkmc_expiry), its insert and expiry timestamps and the
 * cached topic metadata.
 *
 * NOTE(review): the trailing comment suggests that partition memory
 * referenced from rkmce_mtopic is laid out directly after this struct in
 * the same allocation - confirm against the allocation code.
 */
100 struct rd_kafka_metadata_cache_entry {
101         rd_avl_node_t rkmce_avlnode;             /* rkmc_avl */
102         TAILQ_ENTRY(rd_kafka_metadata_cache_entry) rkmce_link; /* rkmc_expiry */
103         rd_ts_t rkmce_ts_expires;                /* Expire time */
104         rd_ts_t rkmce_ts_insert;                 /* Insert time */
105         rd_kafka_metadata_topic_t rkmce_mtopic;  /* Cached topic metadata */
106         /* rkmce_partitions memory points here. */
107 };
108 
109 
/**
 * @brief Evaluates to true if \p ERR is one of the error codes treated as
 *        temporary for cache purposes: RD_KAFKA_RESP_ERR__WAIT_CACHE or
 *        RD_KAFKA_RESP_ERR__NOENT.
 *
 * @remark \p ERR appears twice in the expansion and may thus be evaluated
 *         more than once; avoid arguments with side effects.
 */
110 #define RD_KAFKA_METADATA_CACHE_ERR_IS_TEMPORARY(ERR)                   \
111         ((ERR) == RD_KAFKA_RESP_ERR__WAIT_CACHE ||                      \
112          (ERR) == RD_KAFKA_RESP_ERR__NOENT)
113 
/**
 * @brief Evaluates to true if the cache entry \p rkmce is considered valid,
 *        i.e., its rkmce_mtopic.err is not one of the temporary errors
 *        recognized by RD_KAFKA_METADATA_CACHE_ERR_IS_TEMPORARY().
 *
 * @remark \p rkmce must be a pointer to a cache entry; it may be evaluated
 *         more than once.
 */
114 #define RD_KAFKA_METADATA_CACHE_VALID(rkmce)                            \
115         !RD_KAFKA_METADATA_CACHE_ERR_IS_TEMPORARY((rkmce)->rkmce_mtopic.err)
116 
117 
118 
/**
 * @brief Metadata cache state: containers for the cache entries, timers,
 *        observers and the bookkeeping for outstanding full metadata
 *        requests.
 *
 * Locking, as stated by the field comments: rkmc_observers is protected by
 * rk_lock, the rkmc_full_*_sent fields by rkmc_full_lock, and rkmc_cnd is
 * paired with rkmc_cnd_lock.
 */
119 struct rd_kafka_metadata_cache {
120         rd_avl_t         rkmc_avl;   /* AVL tree; entries link in through
                                      * their rkmce_avlnode field. */
121         TAILQ_HEAD(, rd_kafka_metadata_cache_entry) rkmc_expiry; /* Entry list;
                                      * entries link in through their
                                      * rkmce_link field. Presumably kept
                                      * in expiry order - TODO confirm. */
122         rd_kafka_timer_t rkmc_expiry_tmr; /* Presumably the timer driving
                                           * entry expiry - TODO confirm. */
123         int              rkmc_cnt;   /* Presumably the number of cached
                                      * entries - TODO confirm. */
124 
125         /* Protected by rk_lock */
126         rd_list_t        rkmc_observers; /**< (rd_kafka_enq_once_t*) */
127 
128         /* Protected by full_lock: */
129         mtx_t            rkmc_full_lock;
130         int              rkmc_full_topics_sent; /* Full MetadataRequest for
131                                                  * all topics has been sent,
132                                                  * awaiting response. */
133         int              rkmc_full_brokers_sent; /* Full MetadataRequest for
134                                                   * all brokers (but not topics)
135                                                   * has been sent,
136                                                   * awaiting response. */
137 
138         rd_kafka_timer_t rkmc_query_tmr; /* Query timer for topics without
139                                           * leaders. */
140         cnd_t            rkmc_cnd;       /* cache_wait_change() cond. */
141         mtx_t            rkmc_cnd_lock;  /* lock for rkmc_cnd */
142 };
143 
144 
145 
/*
 * Metadata cache maintenance and lookup (prototypes only).
 *
 * NOTE(review): the per-function notes below are inferred from names and
 * parameter types; locking requirements and exact semantics are not
 * visible from this header - confirm against the definitions.
 */

/* Presumably arms the cache's expiry timer. */
146 void rd_kafka_metadata_cache_expiry_start (rd_kafka_t *rk);
/* Presumably inserts/updates the cache entry for the single topic \p mdt;
 * \p propagate presumably controls whether observers are notified. */
147 void
148 rd_kafka_metadata_cache_topic_update (rd_kafka_t *rk,
149                                       const rd_kafka_metadata_topic_t *mdt,
150                                       rd_bool_t propagate);
/* Presumably updates the cache from the full metadata \p md; \p abs_update
 * is an int used as a flag whose exact meaning is not visible here. */
151 void rd_kafka_metadata_cache_update (rd_kafka_t *rk,
152                                      const rd_kafka_metadata_t *md,
153                                      int abs_update);
/* Presumably signals waiters/observers that the cache has changed. */
154 void rd_kafka_metadata_cache_propagate_changes (rd_kafka_t *rk);
/* Looks up the cache entry for \p topic. \p valid presumably restricts the
 * lookup to entries satisfying RD_KAFKA_METADATA_CACHE_VALID() - TODO
 * confirm. Presumably returns NULL when nothing is found. */
155 struct rd_kafka_metadata_cache_entry *
156 rd_kafka_metadata_cache_find (rd_kafka_t *rk, const char *topic, int valid);
/* Presumably removes hint entries for the given \p topics. */
157 void rd_kafka_metadata_cache_purge_hints (rd_kafka_t *rk,
158                                           const rd_list_t *topics);
/* Presumably inserts placeholder ("hint") entries carrying error \p err
 * for \p topics, optionally reporting topics through \p dst; the meaning
 * of the int return value is not visible here. */
159 int rd_kafka_metadata_cache_hint (rd_kafka_t *rk,
160                                   const rd_list_t *topics, rd_list_t *dst,
161                                   rd_kafka_resp_err_t err,
162                                   rd_bool_t replace);
163 
/**
 * @brief Variant of rd_kafka_metadata_cache_hint() that takes its topics
 *        from a topic+partition list (prototype only).
 *
 * NOTE(review): semantics are inferred from the name; unlike
 * rd_kafka_metadata_cache_hint() this variant takes no error code and
 * declares \p replace as int rather than rd_bool_t. Confirm the behavior
 * and the meaning of the int return value against the definition.
 */
164 int rd_kafka_metadata_cache_hint_rktparlist (
165         rd_kafka_t *rk,
166         const rd_kafka_topic_partition_list_t *rktparlist,
167         rd_list_t *dst,
168         int replace);
169 
/**
 * @brief Cache lookups by topic and by topic+partition (prototypes only).
 *
 * rd_kafka_metadata_cache_topic_get() returns a read-only pointer to the
 * topic metadata for \p topic.
 *
 * rd_kafka_metadata_cache_topic_partition_get() reports its results through
 * the \p mtopicp and \p mpartp out-parameters for the given \p topic and
 * \p partition, and returns an int.
 *
 * NOTE(review): the effect of \p valid, the not-found behavior (presumably
 * NULL for the first function) and the meaning of the int return value are
 * not visible from this header - confirm against the definitions.
 */
170 const rd_kafka_metadata_topic_t *
171 rd_kafka_metadata_cache_topic_get (rd_kafka_t *rk, const char *topic,
172                                    int valid);
173 int rd_kafka_metadata_cache_topic_partition_get (
174         rd_kafka_t *rk,
175         const rd_kafka_metadata_topic_t **mtopicp,
176         const rd_kafka_metadata_partition_t **mpartp,
177         const char *topic, int32_t partition, int valid);
178 
/**
 * @brief Presumably counts how many of the topic names in \p topics are
 *        present in the cache (prototype only).
 *
 * @param rk            Client instance.
 * @param topics        Read-only list of topics to check.
 * @param metadata_agep Out-parameter; presumably receives the age of the
 *                      cached metadata (unit not visible here).
 *
 * @returns an int; presumably the number of topics found - TODO confirm.
 */
179 int rd_kafka_metadata_cache_topics_count_exists (rd_kafka_t *rk,
180                                                  const rd_list_t *topics,
181                                                  int *metadata_agep);
182 
/* Presumably triggers quick metadata re-queries for topics without leaders
 * (cf. the rkmc_query_tmr field of struct rd_kafka_metadata_cache) - TODO
 * confirm against the definition. */
183 void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk);
184 
/*
 * Metadata cache lifecycle and diagnostics (prototypes only).
 *
 * NOTE(review): the notes below are inferred from names and signatures;
 * confirm against the definitions.
 */

/* Presumably initializes the metadata cache of \p rk. */
185 void rd_kafka_metadata_cache_init (rd_kafka_t *rk);
/* Presumably tears down the metadata cache of \p rk. */
186 void rd_kafka_metadata_cache_destroy (rd_kafka_t *rk);
/* Presumably removes all cache entries; \p purge_observers presumably
 * selects whether registered observers are removed as well. */
187 void rd_kafka_metadata_cache_purge (rd_kafka_t *rk, rd_bool_t purge_observers);
/* Presumably blocks for up to \p timeout_ms waiting for a cache change;
 * the meaning of the int return value is not visible here. */
188 int  rd_kafka_metadata_cache_wait_change (rd_kafka_t *rk, int timeout_ms);
/* Presumably writes a human-readable dump of the cache to \p fp. */
189 void rd_kafka_metadata_cache_dump (FILE *fp, rd_kafka_t *rk);
190 
/**
 * @brief Presumably adds the topics currently in the cache to the list
 *        \p topics (prototype only).
 *
 * @returns an int; presumably the number of topics added - TODO confirm
 *          against the definition.
 */
191 int rd_kafka_metadata_cache_topics_to_list (rd_kafka_t *rk,
192                                              rd_list_t *topics);
193 
/**
 * @brief Presumably registers \p eonce to be triggered asynchronously on
 *        the next cache state change (prototype only).
 *
 * NOTE(review): rkmc_observers in struct rd_kafka_metadata_cache is
 * documented as holding rd_kafka_enq_once_t pointers, so \p eonce is
 * presumably added to that list - confirm against the definition.
 */
194 void
195 rd_kafka_metadata_cache_wait_state_change_async (rd_kafka_t *rk,
196                                                  rd_kafka_enq_once_t *eonce);
197 
198 /**@}*/
199 #endif /* _RDKAFKA_METADATA_H_ */
200