1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2013, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 
30 #include "rd.h"
31 #include "rdkafka_int.h"
32 #include "rdkafka_topic.h"
33 #include "rdkafka_broker.h"
34 #include "rdkafka_request.h"
35 #include "rdkafka_metadata.h"
36 
37 #include <string.h>
/**
 * @{
 *
 * @brief Metadata cache
 *
 * The metadata cache consists of cached topic metadata as
 * retrieved from the cluster using MetadataRequest.
 *
 * The topic cache entries are made up of \c struct rd_kafka_metadata_cache_entry,
 * each containing the topic name, a copy of the topic's metadata
 * and a cache expiry time.
 *
 * On update, any previous entry for the topic is removed and replaced
 * with a new entry.
 *
 * The cache is also populated when the topic metadata is being requested
 * for specific topics. This will not interfere with existing cache entries
 * for topics, but for any topics not currently in the cache a new "hint"
 * entry will be added, for which RD_KAFKA_METADATA_CACHE_VALID(rkmce) is
 * false, indicating that the entry is waiting to be populated by the
 * MetadataResponse.
 * Two special error codes are used for this purpose:
 *   RD_KAFKA_RESP_ERR__NOENT - to indicate that a topic needs to be queried,
 *   RD_KAFKA_RESP_ERR__WAIT_CACHE - to indicate that a topic is being queried
 *                                   and there is no need to re-query it prior
 *                                   to the current query finishing.
 *
 * The cache is locked in its entirety with rd_kafka_wr/rdlock() by the caller
 * and the returned cache entry must only be accessed during the duration
 * of the lock.
 *
 */
69 
70 
71 
72 /**
73  * @brief Remove and free cache entry.
74  *
75  * @remark The expiry timer is not updated, for simplicity.
76  * @locks rd_kafka_wrlock()
77  */
78 static RD_INLINE void
rd_kafka_metadata_cache_delete(rd_kafka_t * rk,struct rd_kafka_metadata_cache_entry * rkmce,int unlink_avl)79 rd_kafka_metadata_cache_delete (rd_kafka_t *rk,
80                                 struct rd_kafka_metadata_cache_entry *rkmce,
81                                 int unlink_avl) {
82         if (unlink_avl)
83                 RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl, rkmce);
84         TAILQ_REMOVE(&rk->rk_metadata_cache.rkmc_expiry, rkmce, rkmce_link);
85         rd_kafka_assert(NULL, rk->rk_metadata_cache.rkmc_cnt > 0);
86         rk->rk_metadata_cache.rkmc_cnt--;
87 
88         rd_free(rkmce);
89 }
90 
91 /**
92  * @brief Delete cache entry by topic name
93  * @locks rd_kafka_wrlock()
94  * @returns 1 if entry was found and removed, else 0.
95  */
rd_kafka_metadata_cache_delete_by_name(rd_kafka_t * rk,const char * topic)96 static int rd_kafka_metadata_cache_delete_by_name (rd_kafka_t *rk,
97                                                     const char *topic) {
98         struct rd_kafka_metadata_cache_entry *rkmce;
99 
100         rkmce = rd_kafka_metadata_cache_find(rk, topic, 1);
101         if (rkmce)
102                 rd_kafka_metadata_cache_delete(rk, rkmce, 1);
103         return rkmce ? 1 : 0;
104 }
105 
106 static int rd_kafka_metadata_cache_evict (rd_kafka_t *rk);
107 
108 /**
109  * @brief Cache eviction timer callback.
110  * @locality rdkafka main thread
111  * @locks NOT rd_kafka_*lock()
112  */
rd_kafka_metadata_cache_evict_tmr_cb(rd_kafka_timers_t * rkts,void * arg)113 static void rd_kafka_metadata_cache_evict_tmr_cb (rd_kafka_timers_t *rkts,
114                                                   void *arg) {
115         rd_kafka_t *rk = arg;
116 
117         rd_kafka_wrlock(rk);
118         rd_kafka_metadata_cache_evict(rk);
119         rd_kafka_wrunlock(rk);
120 }
121 
122 
123 /**
124  * @brief Evict timed out entries from cache and rearm timer for
125  *        next expiry.
126  *
127  * @returns the number of entries evicted.
128  *
129  * @locks rd_kafka_wrlock()
130  */
rd_kafka_metadata_cache_evict(rd_kafka_t * rk)131 static int rd_kafka_metadata_cache_evict (rd_kafka_t *rk) {
132         int cnt = 0;
133         rd_ts_t now = rd_clock();
134         struct rd_kafka_metadata_cache_entry *rkmce;
135 
136         while ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry)) &&
137                rkmce->rkmce_ts_expires <= now) {
138                 rd_kafka_metadata_cache_delete(rk, rkmce, 1);
139                 cnt++;
140         }
141 
142         if (rkmce)
143                 rd_kafka_timer_start(&rk->rk_timers,
144                                      &rk->rk_metadata_cache.rkmc_expiry_tmr,
145                                      rkmce->rkmce_ts_expires - now,
146                                      rd_kafka_metadata_cache_evict_tmr_cb,
147                                      rk);
148         else
149                 rd_kafka_timer_stop(&rk->rk_timers,
150                                     &rk->rk_metadata_cache.rkmc_expiry_tmr, 1);
151 
152         rd_kafka_dbg(rk, METADATA, "METADATA",
153                      "Expired %d entries from metadata cache "
154                      "(%d entries remain)",
155                      cnt, rk->rk_metadata_cache.rkmc_cnt);
156 
157         if (cnt)
158                 rd_kafka_metadata_cache_propagate_changes(rk);
159 
160         return cnt;
161 }
162 
163 
164 /**
165  * @brief Find cache entry by topic name
166  *
167  * @param valid: entry must be valid (not hint)
168  *
169  * @locks rd_kafka_*lock()
170  */
171 struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find(rd_kafka_t * rk,const char * topic,int valid)172 rd_kafka_metadata_cache_find (rd_kafka_t *rk, const char *topic, int valid) {
173         struct rd_kafka_metadata_cache_entry skel, *rkmce;
174         skel.rkmce_mtopic.topic = (char *)topic;
175         rkmce = RD_AVL_FIND(&rk->rk_metadata_cache.rkmc_avl, &skel);
176         if (rkmce && (!valid || RD_KAFKA_METADATA_CACHE_VALID(rkmce)))
177                 return rkmce;
178         return NULL;
179 }
180 
181 
182 /**
183  * @brief Partition (id) comparator
184  */
rd_kafka_metadata_partition_id_cmp(const void * _a,const void * _b)185 int rd_kafka_metadata_partition_id_cmp (const void *_a,
186                                         const void *_b) {
187         const rd_kafka_metadata_partition_t *a = _a, *b = _b;
188         return RD_CMP(a->id, b->id);
189 }
190 
191 
192 /**
193  * @brief Add (and replace) cache entry for topic.
194  *
195  * This makes a copy of \p topic
196  *
197  * @locks_required rd_kafka_wrlock()
198  */
199 static struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_insert(rd_kafka_t * rk,const rd_kafka_metadata_topic_t * mtopic,rd_ts_t now,rd_ts_t ts_expires)200 rd_kafka_metadata_cache_insert (rd_kafka_t *rk,
201                                 const rd_kafka_metadata_topic_t *mtopic,
202                                 rd_ts_t now, rd_ts_t ts_expires) {
203         struct rd_kafka_metadata_cache_entry *rkmce, *old;
204         size_t topic_len;
205         rd_tmpabuf_t tbuf;
206         int i;
207 
208         /* Metadata is stored in one contigious buffer where structs and
209          * and pointed-to fields are layed out in a memory aligned fashion.
210          * rd_tmpabuf_t provides the infrastructure to do this.
211          * Because of this we copy all the structs verbatim but
212          * any pointer fields needs to be copied explicitly to update
213          * the pointer address. */
214         topic_len = strlen(mtopic->topic) + 1;
215         rd_tmpabuf_new(&tbuf,
216                        RD_ROUNDUP(sizeof(*rkmce), 8) +
217                        RD_ROUNDUP(topic_len, 8) +
218                        (mtopic->partition_cnt *
219                         RD_ROUNDUP(sizeof(*mtopic->partitions), 8)),
220                        1/*assert on fail*/);
221 
222         rkmce = rd_tmpabuf_alloc(&tbuf, sizeof(*rkmce));
223 
224         rkmce->rkmce_mtopic = *mtopic;
225 
226         /* Copy topic name and update pointer */
227         rkmce->rkmce_mtopic.topic = rd_tmpabuf_write_str(&tbuf, mtopic->topic);
228 
229         /* Copy partition array and update pointer */
230         rkmce->rkmce_mtopic.partitions =
231                 rd_tmpabuf_write(&tbuf, mtopic->partitions,
232                                  mtopic->partition_cnt *
233                                  sizeof(*mtopic->partitions));
234 
235         /* Clear uncached fields. */
236         for (i = 0 ; i < mtopic->partition_cnt ; i++) {
237                 rkmce->rkmce_mtopic.partitions[i].replicas = NULL;
238                 rkmce->rkmce_mtopic.partitions[i].replica_cnt = 0;
239                 rkmce->rkmce_mtopic.partitions[i].isrs = NULL;
240                 rkmce->rkmce_mtopic.partitions[i].isr_cnt = 0;
241         }
242 
243         /* Sort partitions for future bsearch() lookups. */
244         qsort(rkmce->rkmce_mtopic.partitions,
245               rkmce->rkmce_mtopic.partition_cnt,
246               sizeof(*rkmce->rkmce_mtopic.partitions),
247               rd_kafka_metadata_partition_id_cmp);
248 
249         TAILQ_INSERT_TAIL(&rk->rk_metadata_cache.rkmc_expiry,
250                           rkmce, rkmce_link);
251         rk->rk_metadata_cache.rkmc_cnt++;
252         rkmce->rkmce_ts_expires = ts_expires;
253         rkmce->rkmce_ts_insert = now;
254 
255         /* Insert (and replace existing) entry. */
256         old = RD_AVL_INSERT(&rk->rk_metadata_cache.rkmc_avl, rkmce,
257                             rkmce_avlnode);
258         if (old)
259                 rd_kafka_metadata_cache_delete(rk, old, 0);
260 
261         /* Explicitly not freeing the tmpabuf since rkmce points to its
262          * memory. */
263         return rkmce;
264 }
265 
266 
267 /**
268  * @brief Purge the metadata cache
269  *
270  * @locks_required rd_kafka_wrlock()
271  */
rd_kafka_metadata_cache_purge(rd_kafka_t * rk,rd_bool_t purge_observers)272 void rd_kafka_metadata_cache_purge (rd_kafka_t *rk, rd_bool_t purge_observers) {
273         struct rd_kafka_metadata_cache_entry *rkmce;
274         int was_empty = TAILQ_EMPTY(&rk->rk_metadata_cache.rkmc_expiry);
275 
276         while ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry)))
277                 rd_kafka_metadata_cache_delete(rk, rkmce, 1);
278 
279         rd_kafka_timer_stop(&rk->rk_timers,
280                             &rk->rk_metadata_cache.rkmc_expiry_tmr, 1);
281 
282         if (!was_empty)
283                 rd_kafka_metadata_cache_propagate_changes(rk);
284 
285         if (purge_observers)
286                 rd_list_clear(&rk->rk_metadata_cache.rkmc_observers);
287 }
288 
289 
290 /**
291  * @brief Start or update the cache expiry timer.
292  *        Typically done after a series of cache_topic_update()
293  *
294  * @locks rd_kafka_wrlock()
295  */
rd_kafka_metadata_cache_expiry_start(rd_kafka_t * rk)296 void rd_kafka_metadata_cache_expiry_start (rd_kafka_t *rk) {
297         struct rd_kafka_metadata_cache_entry *rkmce;
298 
299         if ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry)))
300                 rd_kafka_timer_start(&rk->rk_timers,
301                                      &rk->rk_metadata_cache.rkmc_expiry_tmr,
302                                      rkmce->rkmce_ts_expires - rd_clock(),
303                                      rd_kafka_metadata_cache_evict_tmr_cb,
304                                      rk);
305 }
306 
307 /**
308  * @brief Update the metadata cache for a single topic
309  *        with the provided metadata.
310  *
311  * If the topic has a temporary error the existing entry is removed
312  * and no new entry is added, which avoids the topic to be
313  * suppressed in upcoming metadata requests because being in the cache.
314  * In other words: we want to re-query errored topics.
315  * If the broker reports ERR_UNKNOWN_TOPIC_OR_PART we add a negative cache
316  * entry with an low expiry time, this is so that client code (cgrp) knows
317  * the topic has been queried but did not exist, otherwise it would wait
318  * forever for the unknown topic to surface.
319  *
320  * For permanent errors (authorization failures), we keep
321  * the entry cached for metadata.max.age.ms.
322  *
323  * @remark The cache expiry timer will not be updated/started,
324  *         call rd_kafka_metadata_cache_expiry_start() instead.
325  *
326  * @locks rd_kafka_wrlock()
327  */
328 void
rd_kafka_metadata_cache_topic_update(rd_kafka_t * rk,const rd_kafka_metadata_topic_t * mdt,rd_bool_t propagate)329 rd_kafka_metadata_cache_topic_update (rd_kafka_t *rk,
330                                       const rd_kafka_metadata_topic_t *mdt,
331                                       rd_bool_t propagate) {
332         rd_ts_t now = rd_clock();
333         rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000);
334         int changed = 1;
335 
336         /* Cache unknown topics for a short while (100ms) to allow the cgrp
337          * logic to find negative cache hits. */
338         if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
339                 ts_expires = RD_MIN(ts_expires, now + (100 * 1000));
340 
341         if (!mdt->err ||
342             mdt->err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED ||
343             mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
344                 rd_kafka_metadata_cache_insert(rk, mdt, now, ts_expires);
345         else
346                 changed = rd_kafka_metadata_cache_delete_by_name(rk,
347                                                                  mdt->topic);
348 
349         if (changed && propagate)
350                 rd_kafka_metadata_cache_propagate_changes(rk);
351 }
352 
353 
354 /**
355  * @brief Update the metadata cache with the provided metadata.
356  *
357  * @param abs_update int: absolute update: purge cache before updating.
358  *
359  * @locks rd_kafka_wrlock()
360  */
rd_kafka_metadata_cache_update(rd_kafka_t * rk,const rd_kafka_metadata_t * md,int abs_update)361 void rd_kafka_metadata_cache_update (rd_kafka_t *rk,
362                                      const rd_kafka_metadata_t *md,
363                                      int abs_update) {
364         struct rd_kafka_metadata_cache_entry *rkmce;
365         rd_ts_t now = rd_clock();
366         rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000);
367         int i;
368 
369         rd_kafka_dbg(rk, METADATA, "METADATA",
370                      "%s of metadata cache with %d topic(s)",
371                      abs_update ? "Absolute update" : "Update",
372                      md->topic_cnt);
373 
374         if (abs_update)
375                 rd_kafka_metadata_cache_purge(rk, rd_false/*not observers*/);
376 
377 
378         for (i = 0 ; i < md->topic_cnt ; i++)
379                 rd_kafka_metadata_cache_insert(rk, &md->topics[i], now,
380                                                ts_expires);
381 
382         /* Update expiry timer */
383         if ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry)))
384                 rd_kafka_timer_start(&rk->rk_timers,
385                                      &rk->rk_metadata_cache.rkmc_expiry_tmr,
386                                      rkmce->rkmce_ts_expires - now,
387                                      rd_kafka_metadata_cache_evict_tmr_cb,
388                                      rk);
389 
390         if (md->topic_cnt > 0 || abs_update)
391                 rd_kafka_metadata_cache_propagate_changes(rk);
392 }
393 
394 
395 /**
396  * @brief Remove cache hints for topics in \p topics
397  *        This is done when the Metadata response has been parsed and
398  *        replaced hints with existing topic information, thus this will
399  *        only remove unmatched topics from the cache.
400  *
401  * @locks rd_kafka_wrlock()
402  */
rd_kafka_metadata_cache_purge_hints(rd_kafka_t * rk,const rd_list_t * topics)403 void rd_kafka_metadata_cache_purge_hints (rd_kafka_t *rk,
404                                           const rd_list_t *topics) {
405         const char *topic;
406         int i;
407         int cnt = 0;
408 
409         RD_LIST_FOREACH(topic, topics, i) {
410                 struct rd_kafka_metadata_cache_entry *rkmce;
411 
412                 if (!(rkmce = rd_kafka_metadata_cache_find(rk, topic,
413                                                            0/*any*/)) ||
414                     RD_KAFKA_METADATA_CACHE_VALID(rkmce))
415                         continue;
416 
417                 rd_kafka_metadata_cache_delete(rk, rkmce, 1/*unlink avl*/);
418                 cnt++;
419         }
420 
421         if (cnt > 0) {
422                 rd_kafka_dbg(rk, METADATA, "METADATA",
423                              "Purged %d/%d cached topic hint(s)",
424                              cnt, rd_list_cnt(topics));
425                 rd_kafka_metadata_cache_propagate_changes(rk);
426         }
427 }
428 
429 
430 /**
431  * @brief Inserts a non-valid entry for topics in \p topics indicating
432  *        that a MetadataRequest is in progress.
433  *        This avoids sending multiple MetadataRequests for the same topics
434  *        if there are already outstanding requests, see
435  *        \c rd_kafka_metadata_refresh_topics().
436  *
437  * @remark These non-valid cache entries' expire time is set to the
438  *         MetadataRequest timeout.
439  *
440  * @param dst rd_list_t(char *topicname): if not NULL: populated with
441  *        topics that were added as hints to cache, e.q., topics to query.
442  * @param dst rd_list_t(char *topicname)
443  * @param err is the error to set on hint cache entries,
444  *            typically ERR__WAIT_CACHE.
445  * @param replace replace existing valid entries
446  *
447  * @returns the number of topic hints inserted.
448  *
449  * @locks_required rd_kafka_wrlock()
450  */
rd_kafka_metadata_cache_hint(rd_kafka_t * rk,const rd_list_t * topics,rd_list_t * dst,rd_kafka_resp_err_t err,rd_bool_t replace)451 int rd_kafka_metadata_cache_hint (rd_kafka_t *rk,
452                                   const rd_list_t *topics, rd_list_t *dst,
453                                   rd_kafka_resp_err_t err,
454                                   rd_bool_t replace) {
455         const char *topic;
456         rd_ts_t now = rd_clock();
457         rd_ts_t ts_expires = now + (rk->rk_conf.socket_timeout_ms * 1000);
458         int i;
459         int cnt = 0;
460 
461         RD_LIST_FOREACH(topic, topics, i) {
462                 rd_kafka_metadata_topic_t mtopic = {
463                         .topic = (char *)topic,
464                         .err = err
465                 };
466                 /*const*/ struct rd_kafka_metadata_cache_entry *rkmce;
467 
468                 /* !replace: Dont overwrite valid entries */
469                 if (!replace &&
470                     (rkmce =
471                      rd_kafka_metadata_cache_find(rk, topic, 0/*any*/))) {
472                         if (RD_KAFKA_METADATA_CACHE_VALID(rkmce) ||
473                             (dst && rkmce->rkmce_mtopic.err !=
474                              RD_KAFKA_RESP_ERR__NOENT))
475                                 continue;
476                         rkmce->rkmce_mtopic.err = err;
477                         /* FALLTHRU */
478                 }
479 
480                 rd_kafka_metadata_cache_insert(rk, &mtopic, now, ts_expires);
481                 cnt++;
482 
483                 if (dst)
484                         rd_list_add(dst, rd_strdup(topic));
485 
486         }
487 
488         if (cnt > 0)
489                 rd_kafka_dbg(rk, METADATA, "METADATA",
490                              "Hinted cache of %d/%d topic(s) being queried",
491                              cnt, rd_list_cnt(topics));
492 
493         return cnt;
494 }
495 
496 
497 /**
498  * @brief Same as rd_kafka_metadata_cache_hint() but takes
499  *        a topic+partition list as input instead.
500  *
501  * @locks_acquired rd_kafka_wrlock()
502  */
rd_kafka_metadata_cache_hint_rktparlist(rd_kafka_t * rk,const rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * dst,int replace)503 int rd_kafka_metadata_cache_hint_rktparlist (
504         rd_kafka_t *rk,
505         const rd_kafka_topic_partition_list_t *rktparlist,
506         rd_list_t *dst,
507         int replace) {
508         rd_list_t topics;
509         int r;
510 
511         rd_list_init(&topics, rktparlist->cnt, rd_free);
512         rd_kafka_topic_partition_list_get_topic_names(rktparlist, &topics,
513                                                       0/*dont include regex*/);
514         rd_kafka_wrlock(rk);
515         r = rd_kafka_metadata_cache_hint(rk, &topics, dst,
516                                          RD_KAFKA_RESP_ERR__WAIT_CACHE,
517                                          replace);
518         rd_kafka_wrunlock(rk);
519 
520         rd_list_destroy(&topics);
521         return r;
522 }
523 
524 
525 /**
526  * @brief Cache entry comparator (on topic name)
527  */
rd_kafka_metadata_cache_entry_cmp(const void * _a,const void * _b)528 static int rd_kafka_metadata_cache_entry_cmp (const void *_a, const void *_b) {
529         const struct rd_kafka_metadata_cache_entry *a = _a, *b = _b;
530         return strcmp(a->rkmce_mtopic.topic, b->rkmce_mtopic.topic);
531 }
532 
533 
534 /**
535  * @brief Initialize the metadata cache
536  *
537  * @locks rd_kafka_wrlock()
538  */
rd_kafka_metadata_cache_init(rd_kafka_t * rk)539 void rd_kafka_metadata_cache_init (rd_kafka_t *rk) {
540         rd_avl_init(&rk->rk_metadata_cache.rkmc_avl,
541                     rd_kafka_metadata_cache_entry_cmp, 0);
542         TAILQ_INIT(&rk->rk_metadata_cache.rkmc_expiry);
543         mtx_init(&rk->rk_metadata_cache.rkmc_full_lock, mtx_plain);
544         mtx_init(&rk->rk_metadata_cache.rkmc_cnd_lock, mtx_plain);
545         cnd_init(&rk->rk_metadata_cache.rkmc_cnd);
546         rd_list_init(&rk->rk_metadata_cache.rkmc_observers, 8,
547                      rd_kafka_enq_once_trigger_destroy);
548 }
549 
550 /**
551  * @brief Purge and destroy metadata cache.
552  *
553  * @locks_required rd_kafka_wrlock()
554  */
rd_kafka_metadata_cache_destroy(rd_kafka_t * rk)555 void rd_kafka_metadata_cache_destroy (rd_kafka_t *rk) {
556         rd_list_destroy(&rk->rk_metadata_cache.rkmc_observers);
557         rd_kafka_timer_stop(&rk->rk_timers,
558                             &rk->rk_metadata_cache.rkmc_query_tmr, 1/*lock*/);
559         rd_kafka_metadata_cache_purge(rk, rd_true/*observers too*/);
560         mtx_destroy(&rk->rk_metadata_cache.rkmc_full_lock);
561         mtx_destroy(&rk->rk_metadata_cache.rkmc_cnd_lock);
562         cnd_destroy(&rk->rk_metadata_cache.rkmc_cnd);
563         rd_avl_destroy(&rk->rk_metadata_cache.rkmc_avl);
564 }
565 
566 
567 
568 /**
569  * @brief Add eonce to list of async cache observers.
570  *
571  * @locks_required rd_kafka_wrlock()
572  */
573 void
rd_kafka_metadata_cache_wait_state_change_async(rd_kafka_t * rk,rd_kafka_enq_once_t * eonce)574 rd_kafka_metadata_cache_wait_state_change_async (rd_kafka_t *rk,
575                                                  rd_kafka_enq_once_t *eonce) {
576         rd_kafka_enq_once_add_source(eonce, "wait metadata cache change");
577         rd_list_add(&rk->rk_metadata_cache.rkmc_observers, eonce);
578 }
579 
580 
581 /**
582  * @brief Wait for cache update, or timeout.
583  *
584  * @returns 1 on cache update or 0 on timeout.
585  * @locks none
586  * @locality any
587  */
rd_kafka_metadata_cache_wait_change(rd_kafka_t * rk,int timeout_ms)588 int rd_kafka_metadata_cache_wait_change (rd_kafka_t *rk, int timeout_ms) {
589         int r;
590 #if ENABLE_DEVEL
591         rd_ts_t ts_start = rd_clock();
592 #endif
593         mtx_lock(&rk->rk_metadata_cache.rkmc_cnd_lock);
594         r = cnd_timedwait_ms(&rk->rk_metadata_cache.rkmc_cnd,
595                              &rk->rk_metadata_cache.rkmc_cnd_lock,
596                              timeout_ms);
597         mtx_unlock(&rk->rk_metadata_cache.rkmc_cnd_lock);
598 
599 #if ENABLE_DEVEL
600         rd_kafka_dbg(rk, METADATA, "CACHEWAIT",
601                      "%s wait took %dms: %s",
602                      __FUNCTION__, (int)((rd_clock() - ts_start)/1000),
603                      r == thrd_success ? "succeeded" : "timed out");
604 #endif
605         return r == thrd_success;
606 }
607 
608 
609 /**
610  * @brief eonce trigger callback for rd_list_apply() call in
611  *        rd_kafka_metadata_cache_propagate_changes()
612  */
613 static int
rd_kafka_metadata_cache_propagate_changes_trigger_eonce(void * elem,void * opaque)614 rd_kafka_metadata_cache_propagate_changes_trigger_eonce (void *elem,
615                                                          void *opaque) {
616         rd_kafka_enq_once_t *eonce = elem;
617         rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR_NO_ERROR,
618                                   "wait metadata cache change");
619         return 0; /* remove eonce from list */
620 }
621 
622 
623 /**
624  * @brief Propagate that the cache changed (but not what changed) to
625  *        any cnd listeners and eonce observers.
626  * @locks_required rd_kafka_wrlock(rk)
627  * @locks_acquired rkmc_cnd_lock
628  * @locality any
629  */
rd_kafka_metadata_cache_propagate_changes(rd_kafka_t * rk)630 void rd_kafka_metadata_cache_propagate_changes (rd_kafka_t *rk) {
631         mtx_lock(&rk->rk_metadata_cache.rkmc_cnd_lock);
632         cnd_broadcast(&rk->rk_metadata_cache.rkmc_cnd);
633         mtx_unlock(&rk->rk_metadata_cache.rkmc_cnd_lock);
634 
635         /* Trigger observers */
636         rd_list_apply(&rk->rk_metadata_cache.rkmc_observers,
637                       rd_kafka_metadata_cache_propagate_changes_trigger_eonce,
638                       NULL);
639 
640 }
641 
642 /**
643  * @returns the shared metadata for a topic, or NULL if not found in
644  *          cache.
645  *
646  * @locks rd_kafka_*lock()
647  */
648 const rd_kafka_metadata_topic_t *
rd_kafka_metadata_cache_topic_get(rd_kafka_t * rk,const char * topic,int valid)649 rd_kafka_metadata_cache_topic_get (rd_kafka_t *rk, const char *topic,
650                                    int valid) {
651         struct rd_kafka_metadata_cache_entry *rkmce;
652 
653         if (!(rkmce = rd_kafka_metadata_cache_find(rk, topic, valid)))
654                 return NULL;
655 
656         return &rkmce->rkmce_mtopic;
657 }
658 
659 
660 
661 
662 /**
663  * @brief Looks up the shared metadata for a partition along with its topic.
664  *
665  * Cache entries with errors (such as auth errors) will not be returned unless
666  * \p valid is set to false.
667  *
668  * @param mtopicp: pointer to topic metadata
669  * @param mpartp: pointer to partition metadata
670  * @param valid: only return valid entries (no hints)
671  *
672  * @returns -1 if topic was not found in cache, 0 if topic was found
673  *          but not the partition, 1 if both topic and partition was found.
674  *
675  * @locks rd_kafka_*lock()
676  */
rd_kafka_metadata_cache_topic_partition_get(rd_kafka_t * rk,const rd_kafka_metadata_topic_t ** mtopicp,const rd_kafka_metadata_partition_t ** mpartp,const char * topic,int32_t partition,int valid)677 int rd_kafka_metadata_cache_topic_partition_get (
678         rd_kafka_t *rk,
679         const rd_kafka_metadata_topic_t **mtopicp,
680         const rd_kafka_metadata_partition_t **mpartp,
681         const char *topic, int32_t partition, int valid) {
682 
683         const rd_kafka_metadata_topic_t *mtopic;
684         const rd_kafka_metadata_partition_t *mpart;
685         rd_kafka_metadata_partition_t skel = { .id = partition };
686 
687         *mtopicp = NULL;
688         *mpartp = NULL;
689 
690         if (!(mtopic = rd_kafka_metadata_cache_topic_get(rk, topic, valid)))
691                 return -1;
692 
693         *mtopicp = mtopic;
694 
695         if (mtopic->err)
696                 return -1;
697 
698         /* Partitions array may be sparse so use bsearch lookup. */
699         mpart = bsearch(&skel, mtopic->partitions,
700                         mtopic->partition_cnt,
701                         sizeof(*mtopic->partitions),
702                         rd_kafka_metadata_partition_id_cmp);
703 
704         if (!mpart)
705                 return 0;
706 
707         *mpartp = mpart;
708 
709         return 1;
710 }
711 
712 
713 /**
714  * @returns the number of topics in \p topics that are in the cache.
715  *
716  * @param topics rd_list(const char *): topic names
717  * @param metadata_agep: age of oldest entry will be returned.
718  *
719  * @locks rd_kafka_*lock()
720  */
rd_kafka_metadata_cache_topics_count_exists(rd_kafka_t * rk,const rd_list_t * topics,int * metadata_agep)721 int rd_kafka_metadata_cache_topics_count_exists (rd_kafka_t *rk,
722                                                  const rd_list_t *topics,
723                                                  int *metadata_agep) {
724         const char *topic;
725         int i;
726         int cnt = 0;
727         int max_age = -1;
728 
729         RD_LIST_FOREACH(topic, topics, i) {
730                 const struct rd_kafka_metadata_cache_entry *rkmce;
731                 int age;
732 
733                 if (!(rkmce = rd_kafka_metadata_cache_find(rk, topic,
734                                                            1/*valid only*/)))
735                         continue;
736 
737                 age = (int)((rd_clock() - rkmce->rkmce_ts_insert)/1000);
738                 if (age > max_age)
739                         max_age = age;
740                 cnt++;
741         }
742 
743         *metadata_agep = max_age;
744 
745         return cnt;
746 
747 }
748 
749 
750 /**
751  * @brief Add all topics in the metadata cache to \p topics, avoid duplicates.
752  *
753  * Element type is (char *topic_name).
754  *
755  * @returns the number of elements added to \p topics
756  *
757  * @locks_required rd_kafka_*lock()
758  */
rd_kafka_metadata_cache_topics_to_list(rd_kafka_t * rk,rd_list_t * topics)759 int rd_kafka_metadata_cache_topics_to_list (rd_kafka_t *rk,
760                                              rd_list_t *topics) {
761         const struct rd_kafka_metadata_cache_entry *rkmce;
762         int precnt = rd_list_cnt(topics);
763 
764         TAILQ_FOREACH(rkmce, &rk->rk_metadata_cache.rkmc_expiry, rkmce_link) {
765                 /* Ignore topics that have up to date metadata info */
766                 if (RD_KAFKA_METADATA_CACHE_VALID(rkmce))
767                         continue;
768 
769                 if (rd_list_find(topics, rkmce->rkmce_mtopic.topic,
770                                  rd_list_cmp_str))
771                         continue;
772 
773                 rd_list_add(topics, rd_strdup(rkmce->rkmce_mtopic.topic));
774         }
775 
776         return rd_list_cnt(topics) - precnt;
777 }
778 
779 
780 /**
781  * @brief Dump cache to \p fp
782  *
783  * @locks rd_kafka_*lock()
784  */
rd_kafka_metadata_cache_dump(FILE * fp,rd_kafka_t * rk)785 void rd_kafka_metadata_cache_dump (FILE *fp, rd_kafka_t *rk) {
786         const struct rd_kafka_metadata_cache *rkmc = &rk->rk_metadata_cache;
787         const struct rd_kafka_metadata_cache_entry *rkmce;
788         rd_ts_t now = rd_clock();
789 
790         fprintf(fp,
791                 "Metadata cache with %d entries:\n",
792                 rkmc->rkmc_cnt);
793         TAILQ_FOREACH(rkmce, &rkmc->rkmc_expiry, rkmce_link) {
794                 fprintf(fp,
795                         "  %s (inserted %dms ago, expires in %dms, "
796                         "%d partition(s), %s)%s%s\n",
797                         rkmce->rkmce_mtopic.topic,
798                         (int)((now - rkmce->rkmce_ts_insert)/1000),
799                         (int)((rkmce->rkmce_ts_expires - now)/1000),
800                         rkmce->rkmce_mtopic.partition_cnt,
801                         RD_KAFKA_METADATA_CACHE_VALID(rkmce) ? "valid":"hint",
802                         rkmce->rkmce_mtopic.err ? " error: " : "",
803                         rkmce->rkmce_mtopic.err ?
804                         rd_kafka_err2str(rkmce->rkmce_mtopic.err) : "");
805         }
806 }
807 
808 /**@}*/
809