1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2013, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30 #include "rd.h"
31 #include "rdkafka_int.h"
32 #include "rdkafka_topic.h"
33 #include "rdkafka_broker.h"
34 #include "rdkafka_request.h"
35 #include "rdkafka_metadata.h"
36
37 #include <string.h>
38 /**
39 * @{
40 *
41 * @brief Metadata cache
42 *
43 * The metadata cache consists of cached topic metadata as
44 * retrieved from the cluster using MetadataRequest.
45 *
 * The topic cache entries are made up of \c struct rd_kafka_metadata_cache_entry,
 * each containing the topic name, a copy of the topic's metadata
 * and a cache expiry time.
49 *
 * On update, any previous entry for the topic is removed and replaced
 * with a new entry.
52 *
 * The cache is also populated when the topic metadata is being requested
 * for specific topics. This will not interfere with existing cache entries
 * for topics, but for any topics not currently in the cache a new hint
 * entry will be added (an entry that RD_KAFKA_METADATA_CACHE_VALID(rkmce)
 * does not consider valid), indicating that the entry is waiting to be
 * populated by the MetadataResponse.
58 * Two special error codes are used for this purpose:
59 * RD_KAFKA_RESP_ERR__NOENT - to indicate that a topic needs to be queried,
60 * RD_KAFKA_RESP_ERR__WAIT_CACHE - to indicate that a topic is being queried
61 * and there is no need to re-query it prior
62 * to the current query finishing.
63 *
64 * The cache is locked in its entirety with rd_kafka_wr/rdlock() by the caller
65 * and the returned cache entry must only be accessed during the duration
66 * of the lock.
67 *
68 */
69
70
71
72 /**
73 * @brief Remove and free cache entry.
74 *
75 * @remark The expiry timer is not updated, for simplicity.
76 * @locks rd_kafka_wrlock()
77 */
78 static RD_INLINE void
rd_kafka_metadata_cache_delete(rd_kafka_t * rk,struct rd_kafka_metadata_cache_entry * rkmce,int unlink_avl)79 rd_kafka_metadata_cache_delete (rd_kafka_t *rk,
80 struct rd_kafka_metadata_cache_entry *rkmce,
81 int unlink_avl) {
82 if (unlink_avl)
83 RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl, rkmce);
84 TAILQ_REMOVE(&rk->rk_metadata_cache.rkmc_expiry, rkmce, rkmce_link);
85 rd_kafka_assert(NULL, rk->rk_metadata_cache.rkmc_cnt > 0);
86 rk->rk_metadata_cache.rkmc_cnt--;
87
88 rd_free(rkmce);
89 }
90
91 /**
92 * @brief Delete cache entry by topic name
93 * @locks rd_kafka_wrlock()
94 * @returns 1 if entry was found and removed, else 0.
95 */
rd_kafka_metadata_cache_delete_by_name(rd_kafka_t * rk,const char * topic)96 static int rd_kafka_metadata_cache_delete_by_name (rd_kafka_t *rk,
97 const char *topic) {
98 struct rd_kafka_metadata_cache_entry *rkmce;
99
100 rkmce = rd_kafka_metadata_cache_find(rk, topic, 1);
101 if (rkmce)
102 rd_kafka_metadata_cache_delete(rk, rkmce, 1);
103 return rkmce ? 1 : 0;
104 }
105
106 static int rd_kafka_metadata_cache_evict (rd_kafka_t *rk);
107
108 /**
109 * @brief Cache eviction timer callback.
110 * @locality rdkafka main thread
111 * @locks NOT rd_kafka_*lock()
112 */
rd_kafka_metadata_cache_evict_tmr_cb(rd_kafka_timers_t * rkts,void * arg)113 static void rd_kafka_metadata_cache_evict_tmr_cb (rd_kafka_timers_t *rkts,
114 void *arg) {
115 rd_kafka_t *rk = arg;
116
117 rd_kafka_wrlock(rk);
118 rd_kafka_metadata_cache_evict(rk);
119 rd_kafka_wrunlock(rk);
120 }
121
122
123 /**
124 * @brief Evict timed out entries from cache and rearm timer for
125 * next expiry.
126 *
127 * @returns the number of entries evicted.
128 *
129 * @locks rd_kafka_wrlock()
130 */
rd_kafka_metadata_cache_evict(rd_kafka_t * rk)131 static int rd_kafka_metadata_cache_evict (rd_kafka_t *rk) {
132 int cnt = 0;
133 rd_ts_t now = rd_clock();
134 struct rd_kafka_metadata_cache_entry *rkmce;
135
136 while ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry)) &&
137 rkmce->rkmce_ts_expires <= now) {
138 rd_kafka_metadata_cache_delete(rk, rkmce, 1);
139 cnt++;
140 }
141
142 if (rkmce)
143 rd_kafka_timer_start(&rk->rk_timers,
144 &rk->rk_metadata_cache.rkmc_expiry_tmr,
145 rkmce->rkmce_ts_expires - now,
146 rd_kafka_metadata_cache_evict_tmr_cb,
147 rk);
148 else
149 rd_kafka_timer_stop(&rk->rk_timers,
150 &rk->rk_metadata_cache.rkmc_expiry_tmr, 1);
151
152 rd_kafka_dbg(rk, METADATA, "METADATA",
153 "Expired %d entries from metadata cache "
154 "(%d entries remain)",
155 cnt, rk->rk_metadata_cache.rkmc_cnt);
156
157 if (cnt)
158 rd_kafka_metadata_cache_propagate_changes(rk);
159
160 return cnt;
161 }
162
163
164 /**
165 * @brief Find cache entry by topic name
166 *
167 * @param valid: entry must be valid (not hint)
168 *
169 * @locks rd_kafka_*lock()
170 */
171 struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find(rd_kafka_t * rk,const char * topic,int valid)172 rd_kafka_metadata_cache_find (rd_kafka_t *rk, const char *topic, int valid) {
173 struct rd_kafka_metadata_cache_entry skel, *rkmce;
174 skel.rkmce_mtopic.topic = (char *)topic;
175 rkmce = RD_AVL_FIND(&rk->rk_metadata_cache.rkmc_avl, &skel);
176 if (rkmce && (!valid || RD_KAFKA_METADATA_CACHE_VALID(rkmce)))
177 return rkmce;
178 return NULL;
179 }
180
181
182 /**
183 * @brief Partition (id) comparator
184 */
rd_kafka_metadata_partition_id_cmp(const void * _a,const void * _b)185 int rd_kafka_metadata_partition_id_cmp (const void *_a,
186 const void *_b) {
187 const rd_kafka_metadata_partition_t *a = _a, *b = _b;
188 return RD_CMP(a->id, b->id);
189 }
190
191
192 /**
193 * @brief Add (and replace) cache entry for topic.
194 *
195 * This makes a copy of \p topic
196 *
197 * @locks_required rd_kafka_wrlock()
198 */
199 static struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_insert(rd_kafka_t * rk,const rd_kafka_metadata_topic_t * mtopic,rd_ts_t now,rd_ts_t ts_expires)200 rd_kafka_metadata_cache_insert (rd_kafka_t *rk,
201 const rd_kafka_metadata_topic_t *mtopic,
202 rd_ts_t now, rd_ts_t ts_expires) {
203 struct rd_kafka_metadata_cache_entry *rkmce, *old;
204 size_t topic_len;
205 rd_tmpabuf_t tbuf;
206 int i;
207
208 /* Metadata is stored in one contigious buffer where structs and
209 * and pointed-to fields are layed out in a memory aligned fashion.
210 * rd_tmpabuf_t provides the infrastructure to do this.
211 * Because of this we copy all the structs verbatim but
212 * any pointer fields needs to be copied explicitly to update
213 * the pointer address. */
214 topic_len = strlen(mtopic->topic) + 1;
215 rd_tmpabuf_new(&tbuf,
216 RD_ROUNDUP(sizeof(*rkmce), 8) +
217 RD_ROUNDUP(topic_len, 8) +
218 (mtopic->partition_cnt *
219 RD_ROUNDUP(sizeof(*mtopic->partitions), 8)),
220 1/*assert on fail*/);
221
222 rkmce = rd_tmpabuf_alloc(&tbuf, sizeof(*rkmce));
223
224 rkmce->rkmce_mtopic = *mtopic;
225
226 /* Copy topic name and update pointer */
227 rkmce->rkmce_mtopic.topic = rd_tmpabuf_write_str(&tbuf, mtopic->topic);
228
229 /* Copy partition array and update pointer */
230 rkmce->rkmce_mtopic.partitions =
231 rd_tmpabuf_write(&tbuf, mtopic->partitions,
232 mtopic->partition_cnt *
233 sizeof(*mtopic->partitions));
234
235 /* Clear uncached fields. */
236 for (i = 0 ; i < mtopic->partition_cnt ; i++) {
237 rkmce->rkmce_mtopic.partitions[i].replicas = NULL;
238 rkmce->rkmce_mtopic.partitions[i].replica_cnt = 0;
239 rkmce->rkmce_mtopic.partitions[i].isrs = NULL;
240 rkmce->rkmce_mtopic.partitions[i].isr_cnt = 0;
241 }
242
243 /* Sort partitions for future bsearch() lookups. */
244 qsort(rkmce->rkmce_mtopic.partitions,
245 rkmce->rkmce_mtopic.partition_cnt,
246 sizeof(*rkmce->rkmce_mtopic.partitions),
247 rd_kafka_metadata_partition_id_cmp);
248
249 TAILQ_INSERT_TAIL(&rk->rk_metadata_cache.rkmc_expiry,
250 rkmce, rkmce_link);
251 rk->rk_metadata_cache.rkmc_cnt++;
252 rkmce->rkmce_ts_expires = ts_expires;
253 rkmce->rkmce_ts_insert = now;
254
255 /* Insert (and replace existing) entry. */
256 old = RD_AVL_INSERT(&rk->rk_metadata_cache.rkmc_avl, rkmce,
257 rkmce_avlnode);
258 if (old)
259 rd_kafka_metadata_cache_delete(rk, old, 0);
260
261 /* Explicitly not freeing the tmpabuf since rkmce points to its
262 * memory. */
263 return rkmce;
264 }
265
266
267 /**
268 * @brief Purge the metadata cache
269 *
270 * @locks_required rd_kafka_wrlock()
271 */
rd_kafka_metadata_cache_purge(rd_kafka_t * rk,rd_bool_t purge_observers)272 void rd_kafka_metadata_cache_purge (rd_kafka_t *rk, rd_bool_t purge_observers) {
273 struct rd_kafka_metadata_cache_entry *rkmce;
274 int was_empty = TAILQ_EMPTY(&rk->rk_metadata_cache.rkmc_expiry);
275
276 while ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry)))
277 rd_kafka_metadata_cache_delete(rk, rkmce, 1);
278
279 rd_kafka_timer_stop(&rk->rk_timers,
280 &rk->rk_metadata_cache.rkmc_expiry_tmr, 1);
281
282 if (!was_empty)
283 rd_kafka_metadata_cache_propagate_changes(rk);
284
285 if (purge_observers)
286 rd_list_clear(&rk->rk_metadata_cache.rkmc_observers);
287 }
288
289
290 /**
291 * @brief Start or update the cache expiry timer.
292 * Typically done after a series of cache_topic_update()
293 *
294 * @locks rd_kafka_wrlock()
295 */
rd_kafka_metadata_cache_expiry_start(rd_kafka_t * rk)296 void rd_kafka_metadata_cache_expiry_start (rd_kafka_t *rk) {
297 struct rd_kafka_metadata_cache_entry *rkmce;
298
299 if ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry)))
300 rd_kafka_timer_start(&rk->rk_timers,
301 &rk->rk_metadata_cache.rkmc_expiry_tmr,
302 rkmce->rkmce_ts_expires - rd_clock(),
303 rd_kafka_metadata_cache_evict_tmr_cb,
304 rk);
305 }
306
307 /**
308 * @brief Update the metadata cache for a single topic
309 * with the provided metadata.
310 *
311 * If the topic has a temporary error the existing entry is removed
312 * and no new entry is added, which avoids the topic to be
313 * suppressed in upcoming metadata requests because being in the cache.
314 * In other words: we want to re-query errored topics.
315 * If the broker reports ERR_UNKNOWN_TOPIC_OR_PART we add a negative cache
316 * entry with an low expiry time, this is so that client code (cgrp) knows
317 * the topic has been queried but did not exist, otherwise it would wait
318 * forever for the unknown topic to surface.
319 *
320 * For permanent errors (authorization failures), we keep
321 * the entry cached for metadata.max.age.ms.
322 *
323 * @remark The cache expiry timer will not be updated/started,
324 * call rd_kafka_metadata_cache_expiry_start() instead.
325 *
326 * @locks rd_kafka_wrlock()
327 */
328 void
rd_kafka_metadata_cache_topic_update(rd_kafka_t * rk,const rd_kafka_metadata_topic_t * mdt,rd_bool_t propagate)329 rd_kafka_metadata_cache_topic_update (rd_kafka_t *rk,
330 const rd_kafka_metadata_topic_t *mdt,
331 rd_bool_t propagate) {
332 rd_ts_t now = rd_clock();
333 rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000);
334 int changed = 1;
335
336 /* Cache unknown topics for a short while (100ms) to allow the cgrp
337 * logic to find negative cache hits. */
338 if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
339 ts_expires = RD_MIN(ts_expires, now + (100 * 1000));
340
341 if (!mdt->err ||
342 mdt->err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED ||
343 mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
344 rd_kafka_metadata_cache_insert(rk, mdt, now, ts_expires);
345 else
346 changed = rd_kafka_metadata_cache_delete_by_name(rk,
347 mdt->topic);
348
349 if (changed && propagate)
350 rd_kafka_metadata_cache_propagate_changes(rk);
351 }
352
353
354 /**
355 * @brief Update the metadata cache with the provided metadata.
356 *
357 * @param abs_update int: absolute update: purge cache before updating.
358 *
359 * @locks rd_kafka_wrlock()
360 */
rd_kafka_metadata_cache_update(rd_kafka_t * rk,const rd_kafka_metadata_t * md,int abs_update)361 void rd_kafka_metadata_cache_update (rd_kafka_t *rk,
362 const rd_kafka_metadata_t *md,
363 int abs_update) {
364 struct rd_kafka_metadata_cache_entry *rkmce;
365 rd_ts_t now = rd_clock();
366 rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000);
367 int i;
368
369 rd_kafka_dbg(rk, METADATA, "METADATA",
370 "%s of metadata cache with %d topic(s)",
371 abs_update ? "Absolute update" : "Update",
372 md->topic_cnt);
373
374 if (abs_update)
375 rd_kafka_metadata_cache_purge(rk, rd_false/*not observers*/);
376
377
378 for (i = 0 ; i < md->topic_cnt ; i++)
379 rd_kafka_metadata_cache_insert(rk, &md->topics[i], now,
380 ts_expires);
381
382 /* Update expiry timer */
383 if ((rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry)))
384 rd_kafka_timer_start(&rk->rk_timers,
385 &rk->rk_metadata_cache.rkmc_expiry_tmr,
386 rkmce->rkmce_ts_expires - now,
387 rd_kafka_metadata_cache_evict_tmr_cb,
388 rk);
389
390 if (md->topic_cnt > 0 || abs_update)
391 rd_kafka_metadata_cache_propagate_changes(rk);
392 }
393
394
395 /**
396 * @brief Remove cache hints for topics in \p topics
397 * This is done when the Metadata response has been parsed and
398 * replaced hints with existing topic information, thus this will
399 * only remove unmatched topics from the cache.
400 *
401 * @locks rd_kafka_wrlock()
402 */
rd_kafka_metadata_cache_purge_hints(rd_kafka_t * rk,const rd_list_t * topics)403 void rd_kafka_metadata_cache_purge_hints (rd_kafka_t *rk,
404 const rd_list_t *topics) {
405 const char *topic;
406 int i;
407 int cnt = 0;
408
409 RD_LIST_FOREACH(topic, topics, i) {
410 struct rd_kafka_metadata_cache_entry *rkmce;
411
412 if (!(rkmce = rd_kafka_metadata_cache_find(rk, topic,
413 0/*any*/)) ||
414 RD_KAFKA_METADATA_CACHE_VALID(rkmce))
415 continue;
416
417 rd_kafka_metadata_cache_delete(rk, rkmce, 1/*unlink avl*/);
418 cnt++;
419 }
420
421 if (cnt > 0) {
422 rd_kafka_dbg(rk, METADATA, "METADATA",
423 "Purged %d/%d cached topic hint(s)",
424 cnt, rd_list_cnt(topics));
425 rd_kafka_metadata_cache_propagate_changes(rk);
426 }
427 }
428
429
430 /**
431 * @brief Inserts a non-valid entry for topics in \p topics indicating
432 * that a MetadataRequest is in progress.
433 * This avoids sending multiple MetadataRequests for the same topics
434 * if there are already outstanding requests, see
435 * \c rd_kafka_metadata_refresh_topics().
436 *
437 * @remark These non-valid cache entries' expire time is set to the
438 * MetadataRequest timeout.
439 *
440 * @param dst rd_list_t(char *topicname): if not NULL: populated with
441 * topics that were added as hints to cache, e.q., topics to query.
442 * @param dst rd_list_t(char *topicname)
443 * @param err is the error to set on hint cache entries,
444 * typically ERR__WAIT_CACHE.
445 * @param replace replace existing valid entries
446 *
447 * @returns the number of topic hints inserted.
448 *
449 * @locks_required rd_kafka_wrlock()
450 */
rd_kafka_metadata_cache_hint(rd_kafka_t * rk,const rd_list_t * topics,rd_list_t * dst,rd_kafka_resp_err_t err,rd_bool_t replace)451 int rd_kafka_metadata_cache_hint (rd_kafka_t *rk,
452 const rd_list_t *topics, rd_list_t *dst,
453 rd_kafka_resp_err_t err,
454 rd_bool_t replace) {
455 const char *topic;
456 rd_ts_t now = rd_clock();
457 rd_ts_t ts_expires = now + (rk->rk_conf.socket_timeout_ms * 1000);
458 int i;
459 int cnt = 0;
460
461 RD_LIST_FOREACH(topic, topics, i) {
462 rd_kafka_metadata_topic_t mtopic = {
463 .topic = (char *)topic,
464 .err = err
465 };
466 /*const*/ struct rd_kafka_metadata_cache_entry *rkmce;
467
468 /* !replace: Dont overwrite valid entries */
469 if (!replace &&
470 (rkmce =
471 rd_kafka_metadata_cache_find(rk, topic, 0/*any*/))) {
472 if (RD_KAFKA_METADATA_CACHE_VALID(rkmce) ||
473 (dst && rkmce->rkmce_mtopic.err !=
474 RD_KAFKA_RESP_ERR__NOENT))
475 continue;
476 rkmce->rkmce_mtopic.err = err;
477 /* FALLTHRU */
478 }
479
480 rd_kafka_metadata_cache_insert(rk, &mtopic, now, ts_expires);
481 cnt++;
482
483 if (dst)
484 rd_list_add(dst, rd_strdup(topic));
485
486 }
487
488 if (cnt > 0)
489 rd_kafka_dbg(rk, METADATA, "METADATA",
490 "Hinted cache of %d/%d topic(s) being queried",
491 cnt, rd_list_cnt(topics));
492
493 return cnt;
494 }
495
496
497 /**
498 * @brief Same as rd_kafka_metadata_cache_hint() but takes
499 * a topic+partition list as input instead.
500 *
501 * @locks_acquired rd_kafka_wrlock()
502 */
rd_kafka_metadata_cache_hint_rktparlist(rd_kafka_t * rk,const rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * dst,int replace)503 int rd_kafka_metadata_cache_hint_rktparlist (
504 rd_kafka_t *rk,
505 const rd_kafka_topic_partition_list_t *rktparlist,
506 rd_list_t *dst,
507 int replace) {
508 rd_list_t topics;
509 int r;
510
511 rd_list_init(&topics, rktparlist->cnt, rd_free);
512 rd_kafka_topic_partition_list_get_topic_names(rktparlist, &topics,
513 0/*dont include regex*/);
514 rd_kafka_wrlock(rk);
515 r = rd_kafka_metadata_cache_hint(rk, &topics, dst,
516 RD_KAFKA_RESP_ERR__WAIT_CACHE,
517 replace);
518 rd_kafka_wrunlock(rk);
519
520 rd_list_destroy(&topics);
521 return r;
522 }
523
524
525 /**
526 * @brief Cache entry comparator (on topic name)
527 */
rd_kafka_metadata_cache_entry_cmp(const void * _a,const void * _b)528 static int rd_kafka_metadata_cache_entry_cmp (const void *_a, const void *_b) {
529 const struct rd_kafka_metadata_cache_entry *a = _a, *b = _b;
530 return strcmp(a->rkmce_mtopic.topic, b->rkmce_mtopic.topic);
531 }
532
533
534 /**
535 * @brief Initialize the metadata cache
536 *
537 * @locks rd_kafka_wrlock()
538 */
rd_kafka_metadata_cache_init(rd_kafka_t * rk)539 void rd_kafka_metadata_cache_init (rd_kafka_t *rk) {
540 rd_avl_init(&rk->rk_metadata_cache.rkmc_avl,
541 rd_kafka_metadata_cache_entry_cmp, 0);
542 TAILQ_INIT(&rk->rk_metadata_cache.rkmc_expiry);
543 mtx_init(&rk->rk_metadata_cache.rkmc_full_lock, mtx_plain);
544 mtx_init(&rk->rk_metadata_cache.rkmc_cnd_lock, mtx_plain);
545 cnd_init(&rk->rk_metadata_cache.rkmc_cnd);
546 rd_list_init(&rk->rk_metadata_cache.rkmc_observers, 8,
547 rd_kafka_enq_once_trigger_destroy);
548 }
549
550 /**
551 * @brief Purge and destroy metadata cache.
552 *
553 * @locks_required rd_kafka_wrlock()
554 */
rd_kafka_metadata_cache_destroy(rd_kafka_t * rk)555 void rd_kafka_metadata_cache_destroy (rd_kafka_t *rk) {
556 rd_list_destroy(&rk->rk_metadata_cache.rkmc_observers);
557 rd_kafka_timer_stop(&rk->rk_timers,
558 &rk->rk_metadata_cache.rkmc_query_tmr, 1/*lock*/);
559 rd_kafka_metadata_cache_purge(rk, rd_true/*observers too*/);
560 mtx_destroy(&rk->rk_metadata_cache.rkmc_full_lock);
561 mtx_destroy(&rk->rk_metadata_cache.rkmc_cnd_lock);
562 cnd_destroy(&rk->rk_metadata_cache.rkmc_cnd);
563 rd_avl_destroy(&rk->rk_metadata_cache.rkmc_avl);
564 }
565
566
567
568 /**
569 * @brief Add eonce to list of async cache observers.
570 *
571 * @locks_required rd_kafka_wrlock()
572 */
573 void
rd_kafka_metadata_cache_wait_state_change_async(rd_kafka_t * rk,rd_kafka_enq_once_t * eonce)574 rd_kafka_metadata_cache_wait_state_change_async (rd_kafka_t *rk,
575 rd_kafka_enq_once_t *eonce) {
576 rd_kafka_enq_once_add_source(eonce, "wait metadata cache change");
577 rd_list_add(&rk->rk_metadata_cache.rkmc_observers, eonce);
578 }
579
580
581 /**
582 * @brief Wait for cache update, or timeout.
583 *
584 * @returns 1 on cache update or 0 on timeout.
585 * @locks none
586 * @locality any
587 */
rd_kafka_metadata_cache_wait_change(rd_kafka_t * rk,int timeout_ms)588 int rd_kafka_metadata_cache_wait_change (rd_kafka_t *rk, int timeout_ms) {
589 int r;
590 #if ENABLE_DEVEL
591 rd_ts_t ts_start = rd_clock();
592 #endif
593 mtx_lock(&rk->rk_metadata_cache.rkmc_cnd_lock);
594 r = cnd_timedwait_ms(&rk->rk_metadata_cache.rkmc_cnd,
595 &rk->rk_metadata_cache.rkmc_cnd_lock,
596 timeout_ms);
597 mtx_unlock(&rk->rk_metadata_cache.rkmc_cnd_lock);
598
599 #if ENABLE_DEVEL
600 rd_kafka_dbg(rk, METADATA, "CACHEWAIT",
601 "%s wait took %dms: %s",
602 __FUNCTION__, (int)((rd_clock() - ts_start)/1000),
603 r == thrd_success ? "succeeded" : "timed out");
604 #endif
605 return r == thrd_success;
606 }
607
608
609 /**
610 * @brief eonce trigger callback for rd_list_apply() call in
611 * rd_kafka_metadata_cache_propagate_changes()
612 */
613 static int
rd_kafka_metadata_cache_propagate_changes_trigger_eonce(void * elem,void * opaque)614 rd_kafka_metadata_cache_propagate_changes_trigger_eonce (void *elem,
615 void *opaque) {
616 rd_kafka_enq_once_t *eonce = elem;
617 rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR_NO_ERROR,
618 "wait metadata cache change");
619 return 0; /* remove eonce from list */
620 }
621
622
623 /**
624 * @brief Propagate that the cache changed (but not what changed) to
625 * any cnd listeners and eonce observers.
626 * @locks_required rd_kafka_wrlock(rk)
627 * @locks_acquired rkmc_cnd_lock
628 * @locality any
629 */
rd_kafka_metadata_cache_propagate_changes(rd_kafka_t * rk)630 void rd_kafka_metadata_cache_propagate_changes (rd_kafka_t *rk) {
631 mtx_lock(&rk->rk_metadata_cache.rkmc_cnd_lock);
632 cnd_broadcast(&rk->rk_metadata_cache.rkmc_cnd);
633 mtx_unlock(&rk->rk_metadata_cache.rkmc_cnd_lock);
634
635 /* Trigger observers */
636 rd_list_apply(&rk->rk_metadata_cache.rkmc_observers,
637 rd_kafka_metadata_cache_propagate_changes_trigger_eonce,
638 NULL);
639
640 }
641
642 /**
643 * @returns the shared metadata for a topic, or NULL if not found in
644 * cache.
645 *
646 * @locks rd_kafka_*lock()
647 */
648 const rd_kafka_metadata_topic_t *
rd_kafka_metadata_cache_topic_get(rd_kafka_t * rk,const char * topic,int valid)649 rd_kafka_metadata_cache_topic_get (rd_kafka_t *rk, const char *topic,
650 int valid) {
651 struct rd_kafka_metadata_cache_entry *rkmce;
652
653 if (!(rkmce = rd_kafka_metadata_cache_find(rk, topic, valid)))
654 return NULL;
655
656 return &rkmce->rkmce_mtopic;
657 }
658
659
660
661
662 /**
663 * @brief Looks up the shared metadata for a partition along with its topic.
664 *
665 * Cache entries with errors (such as auth errors) will not be returned unless
666 * \p valid is set to false.
667 *
668 * @param mtopicp: pointer to topic metadata
669 * @param mpartp: pointer to partition metadata
670 * @param valid: only return valid entries (no hints)
671 *
672 * @returns -1 if topic was not found in cache, 0 if topic was found
673 * but not the partition, 1 if both topic and partition was found.
674 *
675 * @locks rd_kafka_*lock()
676 */
rd_kafka_metadata_cache_topic_partition_get(rd_kafka_t * rk,const rd_kafka_metadata_topic_t ** mtopicp,const rd_kafka_metadata_partition_t ** mpartp,const char * topic,int32_t partition,int valid)677 int rd_kafka_metadata_cache_topic_partition_get (
678 rd_kafka_t *rk,
679 const rd_kafka_metadata_topic_t **mtopicp,
680 const rd_kafka_metadata_partition_t **mpartp,
681 const char *topic, int32_t partition, int valid) {
682
683 const rd_kafka_metadata_topic_t *mtopic;
684 const rd_kafka_metadata_partition_t *mpart;
685 rd_kafka_metadata_partition_t skel = { .id = partition };
686
687 *mtopicp = NULL;
688 *mpartp = NULL;
689
690 if (!(mtopic = rd_kafka_metadata_cache_topic_get(rk, topic, valid)))
691 return -1;
692
693 *mtopicp = mtopic;
694
695 if (mtopic->err)
696 return -1;
697
698 /* Partitions array may be sparse so use bsearch lookup. */
699 mpart = bsearch(&skel, mtopic->partitions,
700 mtopic->partition_cnt,
701 sizeof(*mtopic->partitions),
702 rd_kafka_metadata_partition_id_cmp);
703
704 if (!mpart)
705 return 0;
706
707 *mpartp = mpart;
708
709 return 1;
710 }
711
712
713 /**
714 * @returns the number of topics in \p topics that are in the cache.
715 *
716 * @param topics rd_list(const char *): topic names
717 * @param metadata_agep: age of oldest entry will be returned.
718 *
719 * @locks rd_kafka_*lock()
720 */
rd_kafka_metadata_cache_topics_count_exists(rd_kafka_t * rk,const rd_list_t * topics,int * metadata_agep)721 int rd_kafka_metadata_cache_topics_count_exists (rd_kafka_t *rk,
722 const rd_list_t *topics,
723 int *metadata_agep) {
724 const char *topic;
725 int i;
726 int cnt = 0;
727 int max_age = -1;
728
729 RD_LIST_FOREACH(topic, topics, i) {
730 const struct rd_kafka_metadata_cache_entry *rkmce;
731 int age;
732
733 if (!(rkmce = rd_kafka_metadata_cache_find(rk, topic,
734 1/*valid only*/)))
735 continue;
736
737 age = (int)((rd_clock() - rkmce->rkmce_ts_insert)/1000);
738 if (age > max_age)
739 max_age = age;
740 cnt++;
741 }
742
743 *metadata_agep = max_age;
744
745 return cnt;
746
747 }
748
749
750 /**
751 * @brief Add all topics in the metadata cache to \p topics, avoid duplicates.
752 *
753 * Element type is (char *topic_name).
754 *
755 * @returns the number of elements added to \p topics
756 *
757 * @locks_required rd_kafka_*lock()
758 */
rd_kafka_metadata_cache_topics_to_list(rd_kafka_t * rk,rd_list_t * topics)759 int rd_kafka_metadata_cache_topics_to_list (rd_kafka_t *rk,
760 rd_list_t *topics) {
761 const struct rd_kafka_metadata_cache_entry *rkmce;
762 int precnt = rd_list_cnt(topics);
763
764 TAILQ_FOREACH(rkmce, &rk->rk_metadata_cache.rkmc_expiry, rkmce_link) {
765 /* Ignore topics that have up to date metadata info */
766 if (RD_KAFKA_METADATA_CACHE_VALID(rkmce))
767 continue;
768
769 if (rd_list_find(topics, rkmce->rkmce_mtopic.topic,
770 rd_list_cmp_str))
771 continue;
772
773 rd_list_add(topics, rd_strdup(rkmce->rkmce_mtopic.topic));
774 }
775
776 return rd_list_cnt(topics) - precnt;
777 }
778
779
780 /**
781 * @brief Dump cache to \p fp
782 *
783 * @locks rd_kafka_*lock()
784 */
rd_kafka_metadata_cache_dump(FILE * fp,rd_kafka_t * rk)785 void rd_kafka_metadata_cache_dump (FILE *fp, rd_kafka_t *rk) {
786 const struct rd_kafka_metadata_cache *rkmc = &rk->rk_metadata_cache;
787 const struct rd_kafka_metadata_cache_entry *rkmce;
788 rd_ts_t now = rd_clock();
789
790 fprintf(fp,
791 "Metadata cache with %d entries:\n",
792 rkmc->rkmc_cnt);
793 TAILQ_FOREACH(rkmce, &rkmc->rkmc_expiry, rkmce_link) {
794 fprintf(fp,
795 " %s (inserted %dms ago, expires in %dms, "
796 "%d partition(s), %s)%s%s\n",
797 rkmce->rkmce_mtopic.topic,
798 (int)((now - rkmce->rkmce_ts_insert)/1000),
799 (int)((rkmce->rkmce_ts_expires - now)/1000),
800 rkmce->rkmce_mtopic.partition_cnt,
801 RD_KAFKA_METADATA_CACHE_VALID(rkmce) ? "valid":"hint",
802 rkmce->rkmce_mtopic.err ? " error: " : "",
803 rkmce->rkmce_mtopic.err ?
804 rd_kafka_err2str(rkmce->rkmce_mtopic.err) : "");
805 }
806 }
807
808 /**@}*/
809