1 /*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2015 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28 #include "rdkafka_int.h"
29 #include "rdkafka_topic.h"
30 #include "rdkafka_broker.h"
31 #include "rdkafka_request.h"
32 #include "rdkafka_offset.h"
33 #include "rdkafka_partition.h"
34 #include "rdregex.h"
35 #include "rdports.h" /* rd_qsort_r() */
36
37 #include "rdunittest.h"
38
/**
 * Human-readable names of the partition fetch states.
 *
 * Indexed by the integer fetch state value; used for debug logging by
 * rd_kafka_toppar_set_fetch_state().
 */
const char *rd_kafka_fetch_states[] = {
        "none",         /* 0 */
        "stopping",     /* 1 */
        "stopped",      /* 2 */
        "offset-query", /* 3 */
        "offset-wait",  /* 4 */
        "active",       /* 5 */
};
47
48
49 static rd_kafka_op_res_t
50 rd_kafka_toppar_op_serve (rd_kafka_t *rk,
51 rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
52 rd_kafka_q_cb_type_t cb_type, void *opaque);
53
54 static void rd_kafka_toppar_offset_retry (rd_kafka_toppar_t *rktp,
55 int backoff_ms,
56 const char *reason);
57
58
59 static RD_INLINE int32_t
rd_kafka_toppar_version_new_barrier0(rd_kafka_toppar_t * rktp,const char * func,int line)60 rd_kafka_toppar_version_new_barrier0 (rd_kafka_toppar_t *rktp,
61 const char *func, int line) {
62 int32_t version = rd_atomic32_add(&rktp->rktp_version, 1);
63 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BARRIER",
64 "%s [%"PRId32"]: %s:%d: new version barrier v%"PRId32,
65 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
66 func, line, version);
67 return version;
68 }
69
/**
 * Convenience wrapper around rd_kafka_toppar_version_new_barrier0() that
 * fills in the caller's function name and line number.
 */
#define rd_kafka_toppar_version_new_barrier(rktp)                       \
        rd_kafka_toppar_version_new_barrier0((rktp),                    \
                                             __FUNCTION__, __LINE__)
72
73
74 /**
75 * Toppar based OffsetResponse handling.
76 * This is used for updating the low water mark for consumer lag.
77 */
rd_kafka_toppar_lag_handle_Offset(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)78 static void rd_kafka_toppar_lag_handle_Offset (rd_kafka_t *rk,
79 rd_kafka_broker_t *rkb,
80 rd_kafka_resp_err_t err,
81 rd_kafka_buf_t *rkbuf,
82 rd_kafka_buf_t *request,
83 void *opaque) {
84 rd_kafka_toppar_t *rktp = opaque;
85 rd_kafka_topic_partition_list_t *offsets;
86 rd_kafka_topic_partition_t *rktpar;
87
88 offsets = rd_kafka_topic_partition_list_new(1);
89
90 /* Parse and return Offset */
91 err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
92 rkbuf, request, offsets);
93
94 if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
95 rd_kafka_topic_partition_list_destroy(offsets);
96 return; /* Retrying */
97 }
98
99 if (!err && !(rktpar = rd_kafka_topic_partition_list_find(
100 offsets,
101 rktp->rktp_rkt->rkt_topic->str,
102 rktp->rktp_partition)))
103 err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
104
105 if (!err && !rktpar->err) {
106 rd_kafka_toppar_lock(rktp);
107 rktp->rktp_lo_offset = rktpar->offset;
108 rd_kafka_toppar_unlock(rktp);
109 }
110
111 rd_kafka_topic_partition_list_destroy(offsets);
112
113 rktp->rktp_wait_consumer_lag_resp = 0;
114
115 rd_kafka_toppar_destroy(rktp); /* from request.opaque */
116 }
117
118
119
120 /**
121 * Request information from broker to keep track of consumer lag.
122 *
123 * @locality toppar handle thread
124 * @locks none
125 */
rd_kafka_toppar_consumer_lag_req(rd_kafka_toppar_t * rktp)126 static void rd_kafka_toppar_consumer_lag_req (rd_kafka_toppar_t *rktp) {
127 rd_kafka_topic_partition_list_t *partitions;
128
129 if (rktp->rktp_wait_consumer_lag_resp)
130 return; /* Previous request not finished yet */
131
132 rd_kafka_toppar_lock(rktp);
133
134 /* Offset requests can only be sent to the leader replica.
135 *
136 * Note: If rktp is delegated to a preferred replica, it is
137 * certain that FETCH >= v5 and so rktp_lo_offset will be
138 * updated via LogStartOffset in the FETCH response.
139 */
140 if (!rktp->rktp_leader || (rktp->rktp_leader != rktp->rktp_broker)) {
141 rd_kafka_toppar_unlock(rktp);
142 return;
143 }
144
145 /* Also don't send a timed log start offset request if leader
146 * broker supports FETCH >= v5, since this will be set when
147 * doing fetch requests.
148 */
149 if (rd_kafka_broker_ApiVersion_supported(rktp->rktp_broker,
150 RD_KAFKAP_Fetch, 0,
151 5, NULL) == 5) {
152 rd_kafka_toppar_unlock(rktp);
153 return;
154 }
155
156 rktp->rktp_wait_consumer_lag_resp = 1;
157
158 partitions = rd_kafka_topic_partition_list_new(1);
159 rd_kafka_topic_partition_list_add(partitions,
160 rktp->rktp_rkt->rkt_topic->str,
161 rktp->rktp_partition)->offset =
162 RD_KAFKA_OFFSET_BEGINNING;
163
164 /* Ask for oldest offset. The newest offset is automatically
165 * propagated in FetchResponse.HighwaterMark. */
166 rd_kafka_OffsetRequest(rktp->rktp_broker, partitions, 0,
167 RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
168 rd_kafka_toppar_lag_handle_Offset,
169 rd_kafka_toppar_keep(rktp));
170
171 rd_kafka_toppar_unlock(rktp);
172
173 rd_kafka_topic_partition_list_destroy(partitions);
174 }
175
176
177
178 /**
179 * Request earliest offset for a partition
180 *
181 * Locality: toppar handler thread
182 */
rd_kafka_toppar_consumer_lag_tmr_cb(rd_kafka_timers_t * rkts,void * arg)183 static void rd_kafka_toppar_consumer_lag_tmr_cb (rd_kafka_timers_t *rkts,
184 void *arg) {
185 rd_kafka_toppar_t *rktp = arg;
186 rd_kafka_toppar_consumer_lag_req(rktp);
187 }
188
189 /**
190 * @brief Update rktp_op_version.
191 * Enqueue an RD_KAFKA_OP_BARRIER type of operation
192 * when the op_version is updated.
193 *
194 * @locks_required rd_kafka_toppar_lock() must be held.
195 * @locality Toppar handler thread
196 */
rd_kafka_toppar_op_version_bump(rd_kafka_toppar_t * rktp,int32_t version)197 void rd_kafka_toppar_op_version_bump (rd_kafka_toppar_t *rktp,
198 int32_t version) {
199 rd_kafka_op_t *rko;
200
201 rktp->rktp_op_version = version;
202 rko = rd_kafka_op_new(RD_KAFKA_OP_BARRIER);
203 rko->rko_version = version;
204 rd_kafka_q_enq(rktp->rktp_fetchq, rko);
205 }
206
207
208 /**
209 * Add new partition to topic.
210 *
211 * Locks: rd_kafka_topic_wrlock() must be held.
212 * Locks: rd_kafka_wrlock() must be held.
213 */
rd_kafka_toppar_new0(rd_kafka_topic_t * rkt,int32_t partition,const char * func,int line)214 rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_topic_t *rkt,
215 int32_t partition,
216 const char *func, int line) {
217 rd_kafka_toppar_t *rktp;
218
219 rktp = rd_calloc(1, sizeof(*rktp));
220
221 rktp->rktp_partition = partition;
222 rktp->rktp_rkt = rkt;
223 rktp->rktp_leader_id = -1;
224 rktp->rktp_broker_id = -1;
225 rd_interval_init(&rktp->rktp_lease_intvl);
226 rd_interval_init(&rktp->rktp_new_lease_intvl);
227 rd_interval_init(&rktp->rktp_new_lease_log_intvl);
228 rd_interval_init(&rktp->rktp_metadata_intvl);
229 /* Mark partition as unknown (does not exist) until we see the
230 * partition in topic metadata. */
231 if (partition != RD_KAFKA_PARTITION_UA)
232 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
233 rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
234 rktp->rktp_fetch_msg_max_bytes
235 = rkt->rkt_rk->rk_conf.fetch_msg_max_bytes;
236 rktp->rktp_offset_fp = NULL;
237 rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
238 rd_kafka_offset_stats_reset(&rktp->rktp_offsets_fin);
239 rktp->rktp_ls_offset = RD_KAFKA_OFFSET_INVALID;
240 rktp->rktp_hi_offset = RD_KAFKA_OFFSET_INVALID;
241 rktp->rktp_lo_offset = RD_KAFKA_OFFSET_INVALID;
242 rktp->rktp_query_offset = RD_KAFKA_OFFSET_INVALID;
243 rktp->rktp_next_offset = RD_KAFKA_OFFSET_INVALID;
244 rktp->rktp_last_next_offset = RD_KAFKA_OFFSET_INVALID;
245 rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
246 rktp->rktp_stored_offset = RD_KAFKA_OFFSET_INVALID;
247 rktp->rktp_committing_offset = RD_KAFKA_OFFSET_INVALID;
248 rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
249 rd_kafka_msgq_init(&rktp->rktp_msgq);
250 rd_kafka_msgq_init(&rktp->rktp_xmit_msgq);
251 mtx_init(&rktp->rktp_lock, mtx_plain);
252
253 rd_refcnt_init(&rktp->rktp_refcnt, 0);
254 rktp->rktp_fetchq = rd_kafka_q_new(rkt->rkt_rk);
255 rktp->rktp_ops = rd_kafka_q_new(rkt->rkt_rk);
256 rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve;
257 rktp->rktp_ops->rkq_opaque = rktp;
258 rd_atomic32_init(&rktp->rktp_version, 1);
259 rktp->rktp_op_version = rd_atomic32_get(&rktp->rktp_version);
260
261 rd_atomic32_init(&rktp->rktp_msgs_inflight, 0);
262 rd_kafka_pid_reset(&rktp->rktp_eos.pid);
263
264 /* Consumer: If statistics is available we query the log start offset
265 * of each partition.
266 * Since the oldest offset only moves on log retention, we cap this
267 * value on the low end to a reasonable value to avoid flooding
268 * the brokers with OffsetRequests when our statistics interval is low.
269 * FIXME: Use a global timer to collect offsets for all partitions
270 * FIXME: This timer is superfulous for FETCH >= v5 because the log
271 * start offset is included in fetch responses.
272 * */
273 if (rktp->rktp_rkt->rkt_rk->rk_conf.stats_interval_ms > 0 &&
274 rkt->rkt_rk->rk_type == RD_KAFKA_CONSUMER &&
275 rktp->rktp_partition != RD_KAFKA_PARTITION_UA) {
276 int intvl = rkt->rkt_rk->rk_conf.stats_interval_ms;
277 if (intvl < 10 * 1000 /* 10s */)
278 intvl = 10 * 1000;
279 rd_kafka_timer_start(&rkt->rkt_rk->rk_timers,
280 &rktp->rktp_consumer_lag_tmr,
281 intvl * 1000ll,
282 rd_kafka_toppar_consumer_lag_tmr_cb,
283 rktp);
284 }
285
286 rktp->rktp_rkt = rd_kafka_topic_keep(rkt);
287
288 rd_kafka_q_fwd_set(rktp->rktp_ops, rkt->rkt_rk->rk_ops);
289 rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARNEW",
290 "NEW %s [%"PRId32"] %p refcnt %p (at %s:%d)",
291 rkt->rkt_topic->str, rktp->rktp_partition, rktp,
292 &rktp->rktp_refcnt,
293 func, line);
294
295 return rd_kafka_toppar_keep(rktp);
296 }
297
298
299
300 /**
301 * Removes a toppar from its duties, global lists, etc.
302 *
303 * Locks: rd_kafka_toppar_lock() MUST be held
304 */
rd_kafka_toppar_remove(rd_kafka_toppar_t * rktp)305 static void rd_kafka_toppar_remove (rd_kafka_toppar_t *rktp) {
306 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARREMOVE",
307 "Removing toppar %s [%"PRId32"] %p",
308 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
309 rktp);
310
311 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
312 &rktp->rktp_offset_query_tmr, 1/*lock*/);
313 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
314 &rktp->rktp_consumer_lag_tmr, 1/*lock*/);
315
316 rd_kafka_q_fwd_set(rktp->rktp_ops, NULL);
317 }
318
319
320 /**
321 * Final destructor for partition.
322 */
rd_kafka_toppar_destroy_final(rd_kafka_toppar_t * rktp)323 void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp) {
324
325 rd_kafka_toppar_remove(rktp);
326
327 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESTROY",
328 "%s [%"PRId32"]: %p DESTROY_FINAL",
329 rktp->rktp_rkt->rkt_topic->str,
330 rktp->rktp_partition, rktp);
331
332 /* Clear queues */
333 rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
334 rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) == 0);
335 rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
336 RD_KAFKA_RESP_ERR__DESTROY);
337 rd_kafka_q_destroy_owner(rktp->rktp_fetchq);
338 rd_kafka_q_destroy_owner(rktp->rktp_ops);
339
340 rd_kafka_replyq_destroy(&rktp->rktp_replyq);
341
342 rd_kafka_topic_destroy0(rktp->rktp_rkt);
343
344 mtx_destroy(&rktp->rktp_lock);
345
346 if (rktp->rktp_leader)
347 rd_kafka_broker_destroy(rktp->rktp_leader);
348
349 rd_refcnt_destroy(&rktp->rktp_refcnt);
350
351 rd_free(rktp);
352 }
353
354
355 /**
356 * Set toppar fetching state.
357 *
358 * Locality: broker thread
359 * Locks: rd_kafka_toppar_lock() MUST be held.
360 */
rd_kafka_toppar_set_fetch_state(rd_kafka_toppar_t * rktp,int fetch_state)361 void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp,
362 int fetch_state) {
363 rd_kafka_assert(NULL,
364 thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));
365
366 if ((int)rktp->rktp_fetch_state == fetch_state)
367 return;
368
369 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "PARTSTATE",
370 "Partition %.*s [%"PRId32"] changed fetch state %s -> %s",
371 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
372 rktp->rktp_partition,
373 rd_kafka_fetch_states[rktp->rktp_fetch_state],
374 rd_kafka_fetch_states[fetch_state]);
375
376 rktp->rktp_fetch_state = fetch_state;
377
378 if (fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE)
379 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
380 CONSUMER|RD_KAFKA_DBG_TOPIC,
381 "FETCH",
382 "Partition %.*s [%"PRId32"] start fetching "
383 "at offset %s",
384 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
385 rktp->rktp_partition,
386 rd_kafka_offset2str(rktp->rktp_next_offset));
387 }
388
389
390 /**
391 * Returns the appropriate toppar for a given rkt and partition.
392 * The returned toppar has increased refcnt and must be unreffed by calling
393 * rd_kafka_toppar_destroy().
394 * May return NULL.
395 *
396 * If 'ua_on_miss' is true the UA (unassigned) toppar is returned if
397 * 'partition' was not known locally, else NULL is returned.
398 *
399 * Locks: Caller must hold rd_kafka_topic_*lock()
400 */
rd_kafka_toppar_get0(const char * func,int line,const rd_kafka_topic_t * rkt,int32_t partition,int ua_on_miss)401 rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line,
402 const rd_kafka_topic_t *rkt,
403 int32_t partition,
404 int ua_on_miss) {
405 rd_kafka_toppar_t *rktp;
406
407 if (partition >= 0 && partition < rkt->rkt_partition_cnt)
408 rktp = rkt->rkt_p[partition];
409 else if (partition == RD_KAFKA_PARTITION_UA || ua_on_miss)
410 rktp = rkt->rkt_ua;
411 else
412 return NULL;
413
414 if (rktp)
415 return rd_kafka_toppar_keep_fl(func, line, rktp);
416
417 return NULL;
418 }
419
420
421 /**
422 * Same as rd_kafka_toppar_get() but no need for locking and
423 * looks up the topic first.
424 *
425 * Locality: any
426 * Locks: none
427 */
rd_kafka_toppar_get2(rd_kafka_t * rk,const char * topic,int32_t partition,int ua_on_miss,int create_on_miss)428 rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk,
429 const char *topic,
430 int32_t partition,
431 int ua_on_miss,
432 int create_on_miss) {
433 rd_kafka_topic_t *rkt;
434 rd_kafka_toppar_t *rktp;
435
436 rd_kafka_wrlock(rk);
437
438 /* Find or create topic */
439 if (unlikely(!(rkt = rd_kafka_topic_find(rk, topic, 0/*no-lock*/)))) {
440 if (!create_on_miss) {
441 rd_kafka_wrunlock(rk);
442 return NULL;
443 }
444 rkt = rd_kafka_topic_new0(rk, topic, NULL,
445 NULL, 0/*no-lock*/);
446 if (!rkt) {
447 rd_kafka_wrunlock(rk);
448 rd_kafka_log(rk, LOG_ERR, "TOPIC",
449 "Failed to create local topic \"%s\": %s",
450 topic, rd_strerror(errno));
451 return NULL;
452 }
453 }
454
455 rd_kafka_wrunlock(rk);
456
457 rd_kafka_topic_wrlock(rkt);
458 rktp = rd_kafka_toppar_desired_add(rkt, partition);
459 rd_kafka_topic_wrunlock(rkt);
460
461 rd_kafka_topic_destroy0(rkt);
462
463 return rktp;
464 }
465
466
467 /**
468 * Returns a toppar if it is available in the cluster.
469 * '*errp' is set to the error-code if lookup fails.
470 *
471 * Locks: topic_*lock() MUST be held
472 */
473 rd_kafka_toppar_t *
rd_kafka_toppar_get_avail(const rd_kafka_topic_t * rkt,int32_t partition,int ua_on_miss,rd_kafka_resp_err_t * errp)474 rd_kafka_toppar_get_avail (const rd_kafka_topic_t *rkt,
475 int32_t partition, int ua_on_miss,
476 rd_kafka_resp_err_t *errp) {
477 rd_kafka_toppar_t *rktp;
478
479 switch (rkt->rkt_state)
480 {
481 case RD_KAFKA_TOPIC_S_UNKNOWN:
482 /* No metadata received from cluster yet.
483 * Put message in UA partition and re-run partitioner when
484 * cluster comes up. */
485 partition = RD_KAFKA_PARTITION_UA;
486 break;
487
488 case RD_KAFKA_TOPIC_S_NOTEXISTS:
489 /* Topic not found in cluster.
490 * Fail message immediately. */
491 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
492 return NULL;
493
494 case RD_KAFKA_TOPIC_S_ERROR:
495 /* Permanent topic error. */
496 *errp = rkt->rkt_err;
497 return NULL;
498
499 case RD_KAFKA_TOPIC_S_EXISTS:
500 /* Topic exists in cluster. */
501
502 /* Topic exists but has no partitions.
503 * This is usually an transient state following the
504 * auto-creation of a topic. */
505 if (unlikely(rkt->rkt_partition_cnt == 0)) {
506 partition = RD_KAFKA_PARTITION_UA;
507 break;
508 }
509
510 /* Check that partition exists. */
511 if (partition >= rkt->rkt_partition_cnt) {
512 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
513 return NULL;
514 }
515 break;
516
517 default:
518 rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
519 break;
520 }
521
522 /* Get new partition */
523 rktp = rd_kafka_toppar_get(rkt, partition, 0);
524
525 if (unlikely(!rktp)) {
526 /* Unknown topic or partition */
527 if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
528 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
529 else
530 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
531
532 return NULL;
533 }
534
535 return rktp;
536 }
537
538
539 /**
540 * Looks for partition 'i' in topic 'rkt's desired list.
541 *
542 * The desired partition list is the list of partitions that are desired
543 * (e.g., by the consumer) but not yet seen on a broker.
544 * As soon as the partition is seen on a broker the toppar is moved from
545 * the desired list and onto the normal rkt_p array.
546 * When the partition on the broker goes away a desired partition is put
547 * back on the desired list.
548 *
549 * Locks: rd_kafka_topic_*lock() must be held.
550 * Note: 'rktp' refcount is increased.
551 */
552
rd_kafka_toppar_desired_get(rd_kafka_topic_t * rkt,int32_t partition)553 rd_kafka_toppar_t *rd_kafka_toppar_desired_get (rd_kafka_topic_t *rkt,
554 int32_t partition) {
555 rd_kafka_toppar_t *rktp;
556 int i;
557
558 RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i) {
559 if (rktp->rktp_partition == partition)
560 return rd_kafka_toppar_keep(rktp);
561 }
562
563 return NULL;
564 }
565
566
567 /**
568 * Link toppar on desired list.
569 *
570 * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
571 */
rd_kafka_toppar_desired_link(rd_kafka_toppar_t * rktp)572 void rd_kafka_toppar_desired_link (rd_kafka_toppar_t *rktp) {
573
574 if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_DESP)
575 return; /* Already linked */
576
577 rd_kafka_toppar_keep(rktp);
578 rd_list_add(&rktp->rktp_rkt->rkt_desp, rktp);
579 rd_interval_reset(&rktp->rktp_rkt->rkt_desp_refresh_intvl);
580 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_ON_DESP;
581 }
582
583 /**
584 * Unlink toppar from desired list.
585 *
586 * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
587 */
rd_kafka_toppar_desired_unlink(rd_kafka_toppar_t * rktp)588 void rd_kafka_toppar_desired_unlink (rd_kafka_toppar_t *rktp) {
589 if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_DESP))
590 return; /* Not linked */
591
592 rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_DESP;
593 rd_list_remove(&rktp->rktp_rkt->rkt_desp, rktp);
594 rd_interval_reset(&rktp->rktp_rkt->rkt_desp_refresh_intvl);
595 rd_kafka_toppar_destroy(rktp);
596 }
597
598
599 /**
600 * @brief If rktp is not already desired:
601 * - mark as DESIRED|~REMOVE
602 * - add to desired list if unknown
603 *
604 * @remark toppar_lock() MUST be held
605 */
rd_kafka_toppar_desired_add0(rd_kafka_toppar_t * rktp)606 void rd_kafka_toppar_desired_add0 (rd_kafka_toppar_t *rktp) {
607 if ((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
608 return;
609
610 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESIRED",
611 "%s [%"PRId32"]: marking as DESIRED",
612 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
613
614 /* If toppar was marked for removal this is no longer
615 * the case since the partition is now desired. */
616 rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_REMOVE;
617
618 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_DESIRED;
619
620 if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN) {
621 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESIRED",
622 "%s [%"PRId32"]: adding to DESIRED list",
623 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
624 rd_kafka_toppar_desired_link(rktp);
625 }
626 }
627
628
629 /**
630 * Adds 'partition' as a desired partition to topic 'rkt', or updates
631 * an existing partition to be desired.
632 *
633 * Locks: rd_kafka_topic_wrlock() must be held.
634 */
rd_kafka_toppar_desired_add(rd_kafka_topic_t * rkt,int32_t partition)635 rd_kafka_toppar_t *rd_kafka_toppar_desired_add (rd_kafka_topic_t *rkt,
636 int32_t partition) {
637 rd_kafka_toppar_t *rktp;
638
639 rktp = rd_kafka_toppar_get(rkt, partition, 0/*no_ua_on_miss*/);
640
641 if (!rktp)
642 rktp = rd_kafka_toppar_desired_get(rkt, partition);
643
644 if (!rktp)
645 rktp = rd_kafka_toppar_new(rkt, partition);
646
647 rd_kafka_toppar_lock(rktp);
648 rd_kafka_toppar_desired_add0(rktp);
649 rd_kafka_toppar_unlock(rktp);
650
651 return rktp; /* Callers refcount */
652 }
653
654
655
656
657 /**
658 * Unmarks an 'rktp' as desired.
659 *
660 * Locks: rd_kafka_topic_wrlock() and rd_kafka_toppar_lock() MUST be held.
661 */
rd_kafka_toppar_desired_del(rd_kafka_toppar_t * rktp)662 void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp) {
663
664 if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
665 return;
666
667 rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_DESIRED;
668 rd_kafka_toppar_desired_unlink(rktp);
669
670 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESP",
671 "Removing (un)desired topic %s [%"PRId32"]",
672 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
673
674 if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN) {
675 /* If this partition does not exist in the cluster
676 * and is no longer desired, remove it. */
677 rd_kafka_toppar_broker_leave_for_remove(rktp);
678 }
679 }
680
681
682
683 /**
684 * Append message at tail of 'rktp' message queue.
685 */
rd_kafka_toppar_enq_msg(rd_kafka_toppar_t * rktp,rd_kafka_msg_t * rkm)686 void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
687 int queue_len;
688 rd_kafka_q_t *wakeup_q = NULL;
689
690 rd_kafka_toppar_lock(rktp);
691
692 if (!rkm->rkm_u.producer.msgid &&
693 rktp->rktp_partition != RD_KAFKA_PARTITION_UA)
694 rkm->rkm_u.producer.msgid = ++rktp->rktp_msgid;
695
696 if (rktp->rktp_partition == RD_KAFKA_PARTITION_UA ||
697 rktp->rktp_rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO) {
698 /* No need for enq_sorted(), this is the oldest message. */
699 queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
700 } else {
701 queue_len = rd_kafka_msgq_enq_sorted(rktp->rktp_rkt,
702 &rktp->rktp_msgq, rkm);
703 }
704
705 if (unlikely(queue_len == 1 &&
706 (wakeup_q = rktp->rktp_msgq_wakeup_q)))
707 rd_kafka_q_keep(wakeup_q);
708
709 rd_kafka_toppar_unlock(rktp);
710
711 if (wakeup_q) {
712 rd_kafka_q_yield(wakeup_q);
713 rd_kafka_q_destroy(wakeup_q);
714 }
715 }
716
717
718 /**
719 * @brief Insert \p srcq before \p insert_before in \p destq.
720 *
721 * If \p srcq and \p destq overlaps only part of the \p srcq will be inserted.
722 *
723 * Upon return \p srcq will contain any remaining messages that require
724 * another insert position in \p destq.
725 */
726 static void
rd_kafka_msgq_insert_msgq_before(rd_kafka_msgq_t * destq,rd_kafka_msg_t * insert_before,rd_kafka_msgq_t * srcq,int (* cmp)(const void * a,const void * b))727 rd_kafka_msgq_insert_msgq_before (rd_kafka_msgq_t *destq,
728 rd_kafka_msg_t *insert_before,
729 rd_kafka_msgq_t *srcq,
730 int (*cmp) (const void *a, const void *b)) {
731 rd_kafka_msg_t *slast;
732 rd_kafka_msgq_t tmpq;
733
734 if (!insert_before) {
735 /* Append all of srcq to destq */
736 rd_kafka_msgq_concat(destq, srcq);
737 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
738 return;
739 }
740
741 slast = rd_kafka_msgq_last(srcq);
742 rd_dassert(slast);
743
744 if (cmp(slast, insert_before) > 0) {
745 rd_kafka_msg_t *new_sfirst;
746 int cnt;
747 int64_t bytes;
748
749 /* destq insert_before resides somewhere between
750 * srcq.first and srcq.last, find the first message in
751 * srcq that is > insert_before and split srcq into
752 * a left part that contains the messages to insert before
753 * insert_before, and a right part that will need another
754 * insert position. */
755
756 new_sfirst = rd_kafka_msgq_find_pos(srcq, NULL,
757 insert_before,
758 cmp, &cnt, &bytes);
759 rd_assert(new_sfirst);
760
761 /* split srcq into two parts using the divider message */
762 rd_kafka_msgq_split(srcq, &tmpq, new_sfirst, cnt, bytes);
763
764 rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
765 rd_kafka_msgq_verify_order(NULL, &tmpq, 0, rd_false);
766 } else {
767 rd_kafka_msgq_init(&tmpq);
768 }
769
770 /* srcq now contains messages up to the first message in destq,
771 * insert srcq at insert_before in destq. */
772 rd_dassert(!TAILQ_EMPTY(&destq->rkmq_msgs));
773 rd_dassert(!TAILQ_EMPTY(&srcq->rkmq_msgs));
774 TAILQ_INSERT_LIST_BEFORE(&destq->rkmq_msgs,
775 insert_before,
776 &srcq->rkmq_msgs,
777 rd_kafka_msgs_head_s,
778 rd_kafka_msg_t *,
779 rkm_link);
780 destq->rkmq_msg_cnt += srcq->rkmq_msg_cnt;
781 destq->rkmq_msg_bytes += srcq->rkmq_msg_bytes;
782 srcq->rkmq_msg_cnt = 0;
783 srcq->rkmq_msg_bytes = 0;
784
785 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
786 rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
787
788 /* tmpq contains the remaining messages in srcq, move it over. */
789 rd_kafka_msgq_move(srcq, &tmpq);
790
791 rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
792 }
793
794
795 /**
796 * @brief Insert all messages from \p srcq into \p destq in their sorted
797 * position (using \p cmp)
798 */
rd_kafka_msgq_insert_msgq(rd_kafka_msgq_t * destq,rd_kafka_msgq_t * srcq,int (* cmp)(const void * a,const void * b))799 void rd_kafka_msgq_insert_msgq (rd_kafka_msgq_t *destq,
800 rd_kafka_msgq_t *srcq,
801 int (*cmp) (const void *a, const void *b)) {
802 rd_kafka_msg_t *sfirst, *dlast, *start_pos = NULL;
803
804 if (unlikely(RD_KAFKA_MSGQ_EMPTY(srcq))) {
805 /* srcq is empty */
806 return;
807 }
808
809 if (unlikely(RD_KAFKA_MSGQ_EMPTY(destq))) {
810 /* destq is empty, simply move the srcq. */
811 rd_kafka_msgq_move(destq, srcq);
812 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
813 return;
814 }
815
816 /* Optimize insertion by bulk-moving messages in place.
817 * We know that:
818 * - destq is sorted but might not be continous (1,2,3,7)
819 * - srcq is sorted but might not be continous (4,5,6,8)
820 * - there migt be (multiple) overlaps between the two, e.g:
821 * destq = (1,2,3,7), srcq = (4,5,6,8)
822 * - there may be millions of messages.
823 */
824
825 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
826 rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
827
828 dlast = rd_kafka_msgq_last(destq);
829 sfirst = rd_kafka_msgq_first(srcq);
830
831 /* Most common case, all of srcq goes after destq */
832 if (likely(cmp(dlast, sfirst) < 0)) {
833 rd_kafka_msgq_concat(destq, srcq);
834
835 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
836
837 rd_assert(RD_KAFKA_MSGQ_EMPTY(srcq));
838 return;
839 }
840
841 /* Insert messages from srcq into destq in non-overlapping
842 * chunks until srcq is exhausted. */
843 while (likely(sfirst != NULL)) {
844 rd_kafka_msg_t *insert_before;
845
846 /* Get insert position in destq of first element in srcq */
847 insert_before = rd_kafka_msgq_find_pos(destq, start_pos,
848 sfirst, cmp,
849 NULL, NULL);
850
851 /* Insert as much of srcq as possible at insert_before */
852 rd_kafka_msgq_insert_msgq_before(destq, insert_before,
853 srcq, cmp);
854
855 /* Remember the current destq position so the next find_pos()
856 * does not have to re-scan destq and what was
857 * added from srcq. */
858 start_pos = insert_before;
859
860 /* For next iteration */
861 sfirst = rd_kafka_msgq_first(srcq);
862
863 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
864 rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
865 }
866
867 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
868
869 rd_assert(RD_KAFKA_MSGQ_EMPTY(srcq));
870 }
871
872
873 /**
874 * @brief Inserts messages from \p srcq according to their sorted position
875 * into \p destq, filtering out messages that can not be retried.
876 *
877 * @param incr_retry Increment retry count for messages.
878 * @param max_retries Maximum retries allowed per message.
879 * @param backoff Absolute retry backoff for retried messages.
880 *
881 * @returns 0 if all messages were retried, or 1 if some messages
882 * could not be retried.
883 */
rd_kafka_retry_msgq(rd_kafka_msgq_t * destq,rd_kafka_msgq_t * srcq,int incr_retry,int max_retries,rd_ts_t backoff,rd_kafka_msg_status_t status,int (* cmp)(const void * a,const void * b))884 int rd_kafka_retry_msgq (rd_kafka_msgq_t *destq,
885 rd_kafka_msgq_t *srcq,
886 int incr_retry, int max_retries, rd_ts_t backoff,
887 rd_kafka_msg_status_t status,
888 int (*cmp) (const void *a, const void *b)) {
889 rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable);
890 rd_kafka_msg_t *rkm, *tmp;
891
892 /* Scan through messages to see which ones are eligible for retry,
893 * move the retryable ones to temporary queue and
894 * set backoff time for first message and optionally
895 * increase retry count for each message.
896 * Sorted insert is not necessary since the original order
897 * srcq order is maintained. */
898 TAILQ_FOREACH_SAFE(rkm, &srcq->rkmq_msgs, rkm_link, tmp) {
899 if (rkm->rkm_u.producer.retries + incr_retry > max_retries)
900 continue;
901
902 rd_kafka_msgq_deq(srcq, rkm, 1);
903 rd_kafka_msgq_enq(&retryable, rkm);
904
905 rkm->rkm_u.producer.ts_backoff = backoff;
906 rkm->rkm_u.producer.retries += incr_retry;
907
908 /* Don't downgrade a message from any form of PERSISTED
909 * to NOT_PERSISTED, since the original cause of indicating
910 * PERSISTED can't be changed.
911 * E.g., a previous ack or in-flight timeout. */
912 if (likely(!(status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED &&
913 rkm->rkm_status !=
914 RD_KAFKA_MSG_STATUS_NOT_PERSISTED)))
915 rkm->rkm_status = status;
916 }
917
918 /* No messages are retryable */
919 if (RD_KAFKA_MSGQ_EMPTY(&retryable))
920 return 0;
921
922 /* Insert retryable list at sorted position */
923 rd_kafka_msgq_insert_msgq(destq, &retryable, cmp);
924
925 return 1;
926 }
927
928 /**
929 * @brief Inserts messages from \p rkmq according to their sorted position
930 * into the partition's message queue.
931 *
932 * @param incr_retry Increment retry count for messages.
933 * @param status Set status on each message.
934 *
935 * @returns 0 if all messages were retried, or 1 if some messages
936 * could not be retried.
937 *
938 * @locality Broker thread (but not necessarily the leader broker thread)
939 */
940
rd_kafka_toppar_retry_msgq(rd_kafka_toppar_t * rktp,rd_kafka_msgq_t * rkmq,int incr_retry,rd_kafka_msg_status_t status)941 int rd_kafka_toppar_retry_msgq (rd_kafka_toppar_t *rktp, rd_kafka_msgq_t *rkmq,
942 int incr_retry, rd_kafka_msg_status_t status) {
943 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
944 rd_ts_t backoff = rd_clock() + (rk->rk_conf.retry_backoff_ms * 1000);
945 int r;
946
947 if (rd_kafka_terminating(rk))
948 return 1;
949
950 rd_kafka_toppar_lock(rktp);
951 r = rd_kafka_retry_msgq(&rktp->rktp_msgq, rkmq,
952 incr_retry, rk->rk_conf.max_retries,
953 backoff, status,
954 rktp->rktp_rkt->rkt_conf.msg_order_cmp);
955 rd_kafka_toppar_unlock(rktp);
956
957 return r;
958 }
959
960 /**
961 * @brief Insert sorted message list \p rkmq at sorted position in \p rktp 's
962 * message queue. The queues must not overlap.
963 * @remark \p rkmq will be cleared.
964 */
rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t * rktp,rd_kafka_msgq_t * rkmq)965 void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
966 rd_kafka_msgq_t *rkmq) {
967 rd_kafka_toppar_lock(rktp);
968 rd_kafka_msgq_insert_msgq(&rktp->rktp_msgq, rkmq,
969 rktp->rktp_rkt->rkt_conf.msg_order_cmp);
970 rd_kafka_toppar_unlock(rktp);
971 }
972
973
974
975 /**
976 * Helper method for purging queues when removing a toppar.
977 * Locks: rd_kafka_toppar_lock() MUST be held
978 */
rd_kafka_toppar_purge_and_disable_queues(rd_kafka_toppar_t * rktp)979 void rd_kafka_toppar_purge_and_disable_queues (rd_kafka_toppar_t *rktp) {
980 rd_kafka_q_disable(rktp->rktp_fetchq);
981 rd_kafka_q_purge(rktp->rktp_fetchq);
982 rd_kafka_q_disable(rktp->rktp_ops);
983 rd_kafka_q_purge(rktp->rktp_ops);
984 }
985
986
987 /**
988 * @brief Migrate rktp from (optional) \p old_rkb to (optional) \p new_rkb,
989 * but at least one is required to be non-NULL.
990 *
991 * This is an async operation.
992 *
993 * @locks rd_kafka_toppar_lock() MUST be held
994 */
rd_kafka_toppar_broker_migrate(rd_kafka_toppar_t * rktp,rd_kafka_broker_t * old_rkb,rd_kafka_broker_t * new_rkb)995 static void rd_kafka_toppar_broker_migrate (rd_kafka_toppar_t *rktp,
996 rd_kafka_broker_t *old_rkb,
997 rd_kafka_broker_t *new_rkb) {
998 rd_kafka_op_t *rko;
999 rd_kafka_broker_t *dest_rkb;
1000 int had_next_broker = rktp->rktp_next_broker ? 1 : 0;
1001
1002 rd_assert(old_rkb || new_rkb);
1003
1004 /* Update next broker */
1005 if (new_rkb)
1006 rd_kafka_broker_keep(new_rkb);
1007 if (rktp->rktp_next_broker)
1008 rd_kafka_broker_destroy(rktp->rktp_next_broker);
1009 rktp->rktp_next_broker = new_rkb;
1010
1011 /* If next_broker is set it means there is already an async
1012 * migration op going on and we should not send a new one
1013 * but simply change the next_broker (which we did above). */
1014 if (had_next_broker)
1015 return;
1016
1017 /* Revert from offset-wait state back to offset-query
1018 * prior to leaving the broker to avoid stalling
1019 * on the new broker waiting for a offset reply from
1020 * this old broker (that might not come and thus need
1021 * to time out..slowly) */
1022 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT)
1023 rd_kafka_toppar_offset_retry(rktp, 500,
1024 "migrating to new broker");
1025
1026 if (old_rkb) {
1027 /* If there is an existing broker for this toppar we let it
1028 * first handle its own leave and then trigger the join for
1029 * the next broker, if any. */
1030 rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
1031 dest_rkb = old_rkb;
1032 } else {
1033 /* No existing broker, send join op directly to new broker. */
1034 rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_JOIN);
1035 dest_rkb = new_rkb;
1036 }
1037
1038 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1039
1040 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
1041 "Migrating topic %.*s [%"PRId32"] %p from %s to %s "
1042 "(sending %s to %s)",
1043 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1044 rktp->rktp_partition, rktp,
1045 old_rkb ? rd_kafka_broker_name(old_rkb) : "(none)",
1046 new_rkb ? rd_kafka_broker_name(new_rkb) : "(none)",
1047 rd_kafka_op2str(rko->rko_type),
1048 rd_kafka_broker_name(dest_rkb));
1049
1050 rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
1051 }
1052
1053
1054 /**
1055 * Async toppar leave from broker.
1056 * Only use this when partitions are to be removed.
1057 *
1058 * Locks: rd_kafka_toppar_lock() MUST be held
1059 */
rd_kafka_toppar_broker_leave_for_remove(rd_kafka_toppar_t * rktp)1060 void rd_kafka_toppar_broker_leave_for_remove (rd_kafka_toppar_t *rktp) {
1061 rd_kafka_op_t *rko;
1062 rd_kafka_broker_t *dest_rkb;
1063
1064 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_REMOVE;
1065
1066 if (rktp->rktp_next_broker)
1067 dest_rkb = rktp->rktp_next_broker;
1068 else if (rktp->rktp_broker)
1069 dest_rkb = rktp->rktp_broker;
1070 else {
1071 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARDEL",
1072 "%.*s [%"PRId32"] %p not handled by any broker: "
1073 "not sending LEAVE for remove",
1074 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1075 rktp->rktp_partition, rktp);
1076 return;
1077 }
1078
1079
1080 /* Revert from offset-wait state back to offset-query
1081 * prior to leaving the broker to avoid stalling
1082 * on the new broker waiting for a offset reply from
1083 * this old broker (that might not come and thus need
1084 * to time out..slowly) */
1085 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT)
1086 rd_kafka_toppar_set_fetch_state(
1087 rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
1088
1089 rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
1090 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1091
1092 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
1093 "%.*s [%"PRId32"] %p sending final LEAVE for removal by %s",
1094 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1095 rktp->rktp_partition, rktp,
1096 rd_kafka_broker_name(dest_rkb));
1097
1098 rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
1099 }
1100
1101
1102 /**
1103 * @brief Delegates toppar 'rktp' to broker 'rkb'. 'rkb' may be NULL to
1104 * undelegate broker.
1105 *
1106 * @locks Caller must have rd_kafka_toppar_lock(rktp) held.
1107 */
rd_kafka_toppar_broker_delegate(rd_kafka_toppar_t * rktp,rd_kafka_broker_t * rkb)1108 void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
1109 rd_kafka_broker_t *rkb) {
1110 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1111 int internal_fallback = 0;
1112
1113 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1114 "%s [%"PRId32"]: delegate to broker %s "
1115 "(rktp %p, term %d, ref %d)",
1116 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
1117 rkb ? rkb->rkb_name : "(none)",
1118 rktp, rd_kafka_terminating(rk),
1119 rd_refcnt_get(&rktp->rktp_refcnt));
1120
1121 /* Undelegated toppars are delgated to the internal
1122 * broker for bookkeeping. */
1123 if (!rkb && !rd_kafka_terminating(rk)) {
1124 rkb = rd_kafka_broker_internal(rk);
1125 internal_fallback = 1;
1126 }
1127
1128 if (rktp->rktp_broker == rkb && !rktp->rktp_next_broker) {
1129 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1130 "%.*s [%"PRId32"]: not updating broker: "
1131 "already on correct broker %s",
1132 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1133 rktp->rktp_partition,
1134 rkb ? rd_kafka_broker_name(rkb) : "(none)");
1135
1136 if (internal_fallback)
1137 rd_kafka_broker_destroy(rkb);
1138 return;
1139 }
1140
1141 if (rktp->rktp_broker)
1142 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1143 "%.*s [%"PRId32"]: no longer delegated to "
1144 "broker %s",
1145 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1146 rktp->rktp_partition,
1147 rd_kafka_broker_name(rktp->rktp_broker));
1148
1149
1150 if (rkb) {
1151 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1152 "%.*s [%"PRId32"]: delegating to broker %s "
1153 "for partition with %i messages "
1154 "(%"PRIu64" bytes) queued",
1155 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1156 rktp->rktp_partition,
1157 rd_kafka_broker_name(rkb),
1158 rktp->rktp_msgq.rkmq_msg_cnt,
1159 rktp->rktp_msgq.rkmq_msg_bytes);
1160
1161
1162 } else {
1163 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1164 "%.*s [%"PRId32"]: no broker delegated",
1165 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1166 rktp->rktp_partition);
1167 }
1168
1169 if (rktp->rktp_broker || rkb)
1170 rd_kafka_toppar_broker_migrate(rktp, rktp->rktp_broker, rkb);
1171
1172 if (internal_fallback)
1173 rd_kafka_broker_destroy(rkb);
1174 }
1175
1176
1177
1178
1179
1180 void
rd_kafka_toppar_offset_commit_result(rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * offsets)1181 rd_kafka_toppar_offset_commit_result (rd_kafka_toppar_t *rktp,
1182 rd_kafka_resp_err_t err,
1183 rd_kafka_topic_partition_list_t *offsets){
1184 if (err)
1185 rd_kafka_consumer_err(rktp->rktp_fetchq,
1186 /* FIXME: propagate broker_id */
1187 RD_KAFKA_NODEID_UA,
1188 err, 0 /* FIXME:VERSION*/,
1189 NULL, rktp, RD_KAFKA_OFFSET_INVALID,
1190 "Offset commit failed: %s",
1191 rd_kafka_err2str(err));
1192
1193 rd_kafka_toppar_lock(rktp);
1194 if (!err)
1195 rktp->rktp_committed_offset = offsets->elems[0].offset;
1196
1197 /* When stopping toppars:
1198 * Final commit is now done (or failed), propagate. */
1199 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING)
1200 rd_kafka_toppar_fetch_stopped(rktp, err);
1201
1202 rd_kafka_toppar_unlock(rktp);
1203 }
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215 /**
1216 * Handle the next offset to consume for a toppar.
1217 * This is used during initial setup when trying to figure out what
1218 * offset to start consuming from.
1219 *
1220 * Locality: toppar handler thread.
1221 * Locks: toppar_lock(rktp) must be held
1222 */
rd_kafka_toppar_next_offset_handle(rd_kafka_toppar_t * rktp,int64_t Offset)1223 void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp,
1224 int64_t Offset) {
1225
1226 if (RD_KAFKA_OFFSET_IS_LOGICAL(Offset)) {
1227 /* Offset storage returned logical offset (e.g. "end"),
1228 * look it up. */
1229
1230 /* Save next offset, even if logical, so that e.g.,
1231 * assign(BEGINNING) survives a pause+resume, etc.
1232 * See issue #2105. */
1233 rktp->rktp_next_offset = Offset;
1234
1235 rd_kafka_offset_reset(rktp, Offset, RD_KAFKA_RESP_ERR_NO_ERROR,
1236 "update");
1237 return;
1238 }
1239
1240 /* Adjust by TAIL count if, if wanted */
1241 if (rktp->rktp_query_offset <=
1242 RD_KAFKA_OFFSET_TAIL_BASE) {
1243 int64_t orig_Offset = Offset;
1244 int64_t tail_cnt =
1245 llabs(rktp->rktp_query_offset -
1246 RD_KAFKA_OFFSET_TAIL_BASE);
1247
1248 if (tail_cnt > Offset)
1249 Offset = 0;
1250 else
1251 Offset -= tail_cnt;
1252
1253 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1254 "OffsetReply for topic %s [%"PRId32"]: "
1255 "offset %"PRId64": adjusting for "
1256 "OFFSET_TAIL(%"PRId64"): "
1257 "effective offset %"PRId64,
1258 rktp->rktp_rkt->rkt_topic->str,
1259 rktp->rktp_partition,
1260 orig_Offset, tail_cnt,
1261 Offset);
1262 }
1263
1264 rktp->rktp_next_offset = Offset;
1265
1266 rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1267
1268 /* Wake-up broker thread which might be idling on IO */
1269 if (rktp->rktp_broker)
1270 rd_kafka_broker_wakeup(rktp->rktp_broker);
1271
1272 }
1273
1274
1275
1276 /**
1277 * Fetch committed offset for a single partition. (simple consumer)
1278 *
1279 * Locality: toppar thread
1280 */
rd_kafka_toppar_offset_fetch(rd_kafka_toppar_t * rktp,rd_kafka_replyq_t replyq)1281 void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
1282 rd_kafka_replyq_t replyq) {
1283 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1284 rd_kafka_topic_partition_list_t *part;
1285 rd_kafka_op_t *rko;
1286
1287 rd_kafka_dbg(rk, TOPIC, "OFFSETREQ",
1288 "Partition %.*s [%"PRId32"]: querying cgrp for "
1289 "committed offset (opv %d)",
1290 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1291 rktp->rktp_partition, replyq.version);
1292
1293 part = rd_kafka_topic_partition_list_new(1);
1294 rd_kafka_topic_partition_list_add0(__FUNCTION__,__LINE__,part,
1295 rktp->rktp_rkt->rkt_topic->str,
1296 rktp->rktp_partition,
1297 rktp);
1298
1299 rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
1300 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1301 rko->rko_replyq = replyq;
1302
1303 rko->rko_u.offset_fetch.partitions = part;
1304 rko->rko_u.offset_fetch.require_stable =
1305 rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED;
1306 rko->rko_u.offset_fetch.do_free = 1;
1307
1308 rd_kafka_q_enq(rktp->rktp_cgrp->rkcg_ops, rko);
1309 }
1310
1311
1312
1313
1314 /**
1315 * Toppar based OffsetResponse handling.
1316 * This is used for finding the next offset to Fetch.
1317 *
1318 * Locality: toppar handler thread
1319 */
rd_kafka_toppar_handle_Offset(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)1320 static void rd_kafka_toppar_handle_Offset (rd_kafka_t *rk,
1321 rd_kafka_broker_t *rkb,
1322 rd_kafka_resp_err_t err,
1323 rd_kafka_buf_t *rkbuf,
1324 rd_kafka_buf_t *request,
1325 void *opaque) {
1326 rd_kafka_toppar_t *rktp = opaque;
1327 rd_kafka_topic_partition_list_t *offsets;
1328 rd_kafka_topic_partition_t *rktpar;
1329 int64_t Offset;
1330
1331 rd_kafka_toppar_lock(rktp);
1332 /* Drop reply from previous partition leader */
1333 if (err != RD_KAFKA_RESP_ERR__DESTROY && rktp->rktp_broker != rkb)
1334 err = RD_KAFKA_RESP_ERR__OUTDATED;
1335 rd_kafka_toppar_unlock(rktp);
1336
1337 offsets = rd_kafka_topic_partition_list_new(1);
1338
1339 rd_rkb_dbg(rkb, TOPIC, "OFFSET",
1340 "Offset reply for "
1341 "topic %.*s [%"PRId32"] (v%d vs v%d)",
1342 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1343 rktp->rktp_partition, request->rkbuf_replyq.version,
1344 rktp->rktp_op_version);
1345
1346 rd_dassert(request->rkbuf_replyq.version > 0);
1347 if (err != RD_KAFKA_RESP_ERR__DESTROY &&
1348 rd_kafka_buf_version_outdated(request, rktp->rktp_op_version)) {
1349 /* Outdated request response, ignore. */
1350 err = RD_KAFKA_RESP_ERR__OUTDATED;
1351 }
1352
1353 if (err != RD_KAFKA_RESP_ERR__OUTDATED) {
1354 /* Parse and return Offset */
1355 err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
1356 rkbuf, request, offsets);
1357 }
1358
1359 if (!err) {
1360 if (!(rktpar = rd_kafka_topic_partition_list_find(
1361 offsets,
1362 rktp->rktp_rkt->rkt_topic->str,
1363 rktp->rktp_partition)))
1364 err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
1365 else if (rktpar->err)
1366 err = rktpar->err;
1367 }
1368
1369 if (err) {
1370 rd_rkb_dbg(rkb, TOPIC, "OFFSET",
1371 "Offset reply error for "
1372 "topic %.*s [%"PRId32"] (v%d): %s",
1373 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1374 rktp->rktp_partition, request->rkbuf_replyq.version,
1375 rd_kafka_err2str(err));
1376
1377 rd_kafka_topic_partition_list_destroy(offsets);
1378
1379 if (err == RD_KAFKA_RESP_ERR__DESTROY ||
1380 err == RD_KAFKA_RESP_ERR__OUTDATED) {
1381 /* Termination or outdated, quick cleanup. */
1382
1383 if (err == RD_KAFKA_RESP_ERR__OUTDATED) {
1384 rd_kafka_toppar_lock(rktp);
1385 rd_kafka_toppar_offset_retry(
1386 rktp, 500, "outdated offset response");
1387 rd_kafka_toppar_unlock(rktp);
1388 }
1389
1390 /* from request.opaque */
1391 rd_kafka_toppar_destroy(rktp);
1392 return;
1393
1394 } else if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
1395 return; /* Retry in progress */
1396
1397
1398 rd_kafka_toppar_lock(rktp);
1399 rd_kafka_offset_reset(rktp, rktp->rktp_query_offset,
1400 err,
1401 "failed to query logical offset");
1402
1403 /* Signal error back to application,
1404 * unless this is an intermittent problem
1405 * (e.g.,connection lost) */
1406 if (!(err == RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION ||
1407 err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE ||
1408 err == RD_KAFKA_RESP_ERR__TRANSPORT ||
1409 err == RD_KAFKA_RESP_ERR__TIMED_OUT)) {
1410 rd_kafka_consumer_err(
1411 rktp->rktp_fetchq, rkb->rkb_nodeid,
1412 err, 0, NULL, rktp,
1413 (rktp->rktp_query_offset <=
1414 RD_KAFKA_OFFSET_TAIL_BASE ?
1415 rktp->rktp_query_offset -
1416 RD_KAFKA_OFFSET_TAIL_BASE :
1417 rktp->rktp_query_offset),
1418 "Failed to query logical offset %s: %s",
1419 rd_kafka_offset2str(rktp->rktp_query_offset),
1420 rd_kafka_err2str(err));
1421 }
1422 rd_kafka_toppar_unlock(rktp);
1423
1424 rd_kafka_toppar_destroy(rktp); /* from request.opaque */
1425 return;
1426 }
1427
1428 Offset = rktpar->offset;
1429 rd_kafka_topic_partition_list_destroy(offsets);
1430
1431 rd_kafka_toppar_lock(rktp);
1432 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1433 "Offset %s request for %.*s [%"PRId32"] "
1434 "returned offset %s (%"PRId64")",
1435 rd_kafka_offset2str(rktp->rktp_query_offset),
1436 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1437 rktp->rktp_partition, rd_kafka_offset2str(Offset), Offset);
1438
1439 rd_kafka_toppar_next_offset_handle(rktp, Offset);
1440 rd_kafka_toppar_unlock(rktp);
1441
1442 rd_kafka_toppar_destroy(rktp); /* from request.opaque */
1443 }
1444
1445
1446 /**
1447 * @brief An Offset fetch failed (for whatever reason) in
1448 * the RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT state:
1449 * set the state back to FETCH_OFFSET_QUERY and start the
1450 * offset_query_tmr to trigger a new request eventually.
1451 *
1452 * @locality toppar handler thread
1453 * @locks toppar_lock() MUST be held
1454 */
rd_kafka_toppar_offset_retry(rd_kafka_toppar_t * rktp,int backoff_ms,const char * reason)1455 static void rd_kafka_toppar_offset_retry (rd_kafka_toppar_t *rktp,
1456 int backoff_ms,
1457 const char *reason) {
1458 rd_ts_t tmr_next;
1459 int restart_tmr;
1460
1461 /* (Re)start timer if not started or the current timeout
1462 * is larger than \p backoff_ms. */
1463 tmr_next = rd_kafka_timer_next(&rktp->rktp_rkt->rkt_rk->rk_timers,
1464 &rktp->rktp_offset_query_tmr, 1);
1465
1466 restart_tmr = (tmr_next == -1 ||
1467 tmr_next > rd_clock() + (backoff_ms * 1000ll));
1468
1469 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1470 "%s [%"PRId32"]: %s: %s for offset %s",
1471 rktp->rktp_rkt->rkt_topic->str,
1472 rktp->rktp_partition,
1473 reason,
1474 restart_tmr ?
1475 "(re)starting offset query timer" :
1476 "offset query timer already scheduled",
1477 rd_kafka_offset2str(rktp->rktp_query_offset));
1478
1479 rd_kafka_toppar_set_fetch_state(rktp,
1480 RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
1481
1482 if (restart_tmr)
1483 rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
1484 &rktp->rktp_offset_query_tmr,
1485 backoff_ms*1000ll,
1486 rd_kafka_offset_query_tmr_cb, rktp);
1487 }
1488
1489
1490
1491 /**
1492 * Send OffsetRequest for toppar.
1493 *
1494 * If \p backoff_ms is non-zero only the query timer is started,
1495 * otherwise a query is triggered directly.
1496 *
1497 * Locality: toppar handler thread
1498 * Locks: toppar_lock() must be held
1499 */
rd_kafka_toppar_offset_request(rd_kafka_toppar_t * rktp,int64_t query_offset,int backoff_ms)1500 void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
1501 int64_t query_offset, int backoff_ms) {
1502 rd_kafka_broker_t *rkb;
1503
1504 rd_kafka_assert(NULL,
1505 thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));
1506
1507 rkb = rktp->rktp_broker;
1508
1509 if (!backoff_ms && (!rkb || rkb->rkb_source == RD_KAFKA_INTERNAL))
1510 backoff_ms = 500;
1511
1512 if (backoff_ms) {
1513 rd_kafka_toppar_offset_retry(rktp, backoff_ms,
1514 !rkb ?
1515 "no current leader for partition":
1516 "backoff");
1517 return;
1518 }
1519
1520
1521 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1522 &rktp->rktp_offset_query_tmr, 1/*lock*/);
1523
1524
1525 if (query_offset == RD_KAFKA_OFFSET_STORED &&
1526 rktp->rktp_rkt->rkt_conf.offset_store_method ==
1527 RD_KAFKA_OFFSET_METHOD_BROKER) {
1528 /*
1529 * Get stored offset from broker based storage:
1530 * ask cgrp manager for offsets
1531 */
1532 rd_kafka_toppar_offset_fetch(
1533 rktp,
1534 RD_KAFKA_REPLYQ(rktp->rktp_ops,
1535 rktp->rktp_op_version));
1536
1537 } else {
1538 rd_kafka_topic_partition_list_t *offsets;
1539
1540 /*
1541 * Look up logical offset (end,beginning,tail,..)
1542 */
1543
1544 rd_rkb_dbg(rkb, TOPIC, "OFFREQ",
1545 "Partition %.*s [%"PRId32"]: querying for logical "
1546 "offset %s (opv %d)",
1547 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1548 rktp->rktp_partition,
1549 rd_kafka_offset2str(query_offset),
1550 rktp->rktp_op_version);
1551
1552 rd_kafka_toppar_keep(rktp); /* refcnt for OffsetRequest opaque*/
1553
1554 if (query_offset <= RD_KAFKA_OFFSET_TAIL_BASE)
1555 query_offset = RD_KAFKA_OFFSET_END;
1556
1557 offsets = rd_kafka_topic_partition_list_new(1);
1558 rd_kafka_topic_partition_list_add(
1559 offsets,
1560 rktp->rktp_rkt->rkt_topic->str,
1561 rktp->rktp_partition)->offset = query_offset;
1562
1563 rd_kafka_OffsetRequest(rkb, offsets, 0,
1564 RD_KAFKA_REPLYQ(rktp->rktp_ops,
1565 rktp->rktp_op_version),
1566 rd_kafka_toppar_handle_Offset,
1567 rktp);
1568
1569 rd_kafka_topic_partition_list_destroy(offsets);
1570 }
1571
1572 rd_kafka_toppar_set_fetch_state(rktp,
1573 RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT);
1574 }
1575
1576
1577 /**
1578 * Start fetching toppar.
1579 *
1580 * Locality: toppar handler thread
1581 * Locks: none
1582 */
rd_kafka_toppar_fetch_start(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_op_t * rko_orig)1583 static void rd_kafka_toppar_fetch_start (rd_kafka_toppar_t *rktp,
1584 int64_t offset,
1585 rd_kafka_op_t *rko_orig) {
1586 rd_kafka_cgrp_t *rkcg = rko_orig->rko_u.fetch_start.rkcg;
1587 rd_kafka_resp_err_t err = 0;
1588 int32_t version = rko_orig->rko_version;
1589
1590 rd_kafka_toppar_lock(rktp);
1591
1592 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1593 "Start fetch for %.*s [%"PRId32"] in "
1594 "state %s at offset %s (v%"PRId32")",
1595 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1596 rktp->rktp_partition,
1597 rd_kafka_fetch_states[rktp->rktp_fetch_state],
1598 rd_kafka_offset2str(offset), version);
1599
1600 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
1601 err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
1602 rd_kafka_toppar_unlock(rktp);
1603 goto err_reply;
1604 }
1605
1606 rd_kafka_toppar_op_version_bump(rktp, version);
1607
1608 if (rkcg) {
1609 rd_kafka_assert(rktp->rktp_rkt->rkt_rk, !rktp->rktp_cgrp);
1610 /* Attach toppar to cgrp */
1611 rktp->rktp_cgrp = rkcg;
1612 rd_kafka_cgrp_op(rkcg, rktp, RD_KAFKA_NO_REPLYQ,
1613 RD_KAFKA_OP_PARTITION_JOIN, 0);
1614 }
1615
1616
1617 if (offset == RD_KAFKA_OFFSET_BEGINNING ||
1618 offset == RD_KAFKA_OFFSET_END ||
1619 offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
1620 rd_kafka_toppar_next_offset_handle(rktp, offset);
1621
1622 } else if (offset == RD_KAFKA_OFFSET_STORED) {
1623 rd_kafka_offset_store_init(rktp);
1624
1625 } else if (offset == RD_KAFKA_OFFSET_INVALID) {
1626 rd_kafka_offset_reset(rktp, offset,
1627 RD_KAFKA_RESP_ERR__NO_OFFSET,
1628 "no previously committed offset "
1629 "available");
1630
1631 } else {
1632 rktp->rktp_next_offset = offset;
1633 rd_kafka_toppar_set_fetch_state(rktp,
1634 RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1635
1636 /* Wake-up broker thread which might be idling on IO */
1637 if (rktp->rktp_broker)
1638 rd_kafka_broker_wakeup(rktp->rktp_broker);
1639
1640 }
1641
1642 rktp->rktp_offsets_fin.eof_offset = RD_KAFKA_OFFSET_INVALID;
1643
1644 rd_kafka_toppar_unlock(rktp);
1645
1646 /* Signal back to caller thread that start has commenced, or err */
1647 err_reply:
1648 if (rko_orig->rko_replyq.q) {
1649 rd_kafka_op_t *rko;
1650
1651 rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_START);
1652
1653 rko->rko_err = err;
1654 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1655
1656 rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
1657 }
1658 }
1659
1660
1661
1662
1663 /**
1664 * Mark toppar's fetch state as stopped (all decommissioning is done,
1665 * offsets are stored, etc).
1666 *
1667 * Locality: toppar handler thread
1668 * Locks: toppar_lock(rktp) MUST be held
1669 */
rd_kafka_toppar_fetch_stopped(rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err)1670 void rd_kafka_toppar_fetch_stopped (rd_kafka_toppar_t *rktp,
1671 rd_kafka_resp_err_t err) {
1672
1673
1674 rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPED);
1675
1676 rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
1677
1678 if (rktp->rktp_cgrp) {
1679 /* Detach toppar from cgrp */
1680 rd_kafka_cgrp_op(rktp->rktp_cgrp, rktp, RD_KAFKA_NO_REPLYQ,
1681 RD_KAFKA_OP_PARTITION_LEAVE, 0);
1682 rktp->rktp_cgrp = NULL;
1683 }
1684
1685 /* Signal back to application thread that stop is done. */
1686 if (rktp->rktp_replyq.q) {
1687 rd_kafka_op_t *rko;
1688 rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_STOP|RD_KAFKA_OP_REPLY);
1689 rko->rko_err = err;
1690 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1691
1692 rd_kafka_replyq_enq(&rktp->rktp_replyq, rko, 0);
1693 }
1694 }
1695
1696
1697 /**
1698 * Stop toppar fetcher.
1699 * This is usually an async operation.
1700 *
1701 * Locality: toppar handler thread
1702 */
rd_kafka_toppar_fetch_stop(rd_kafka_toppar_t * rktp,rd_kafka_op_t * rko_orig)1703 void rd_kafka_toppar_fetch_stop (rd_kafka_toppar_t *rktp,
1704 rd_kafka_op_t *rko_orig) {
1705 int32_t version = rko_orig->rko_version;
1706
1707 rd_kafka_toppar_lock(rktp);
1708
1709 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1710 "Stopping fetch for %.*s [%"PRId32"] in state %s (v%d)",
1711 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1712 rktp->rktp_partition,
1713 rd_kafka_fetch_states[rktp->rktp_fetch_state], version);
1714
1715 rd_kafka_toppar_op_version_bump(rktp, version);
1716
1717 /* Abort pending offset lookups. */
1718 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
1719 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1720 &rktp->rktp_offset_query_tmr,
1721 1/*lock*/);
1722
1723 /* Clear out the forwarding queue. */
1724 rd_kafka_q_fwd_set(rktp->rktp_fetchq, NULL);
1725
1726 /* Assign the future replyq to propagate stop results. */
1727 rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_replyq.q == NULL);
1728 rktp->rktp_replyq = rko_orig->rko_replyq;
1729 rd_kafka_replyq_clear(&rko_orig->rko_replyq);
1730
1731 rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPING);
1732
1733 /* Stop offset store (possibly async).
1734 * NOTE: will call .._stopped() if store finishes immediately,
1735 * so no more operations after this call! */
1736 rd_kafka_offset_store_stop(rktp);
1737
1738 rd_kafka_toppar_unlock(rktp);
1739 }
1740
1741
1742 /**
1743 * Update a toppars offset.
1744 * The toppar must have been previously FETCH_START:ed
1745 *
1746 * Locality: toppar handler thread
1747 */
rd_kafka_toppar_seek(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_op_t * rko_orig)1748 void rd_kafka_toppar_seek (rd_kafka_toppar_t *rktp,
1749 int64_t offset, rd_kafka_op_t *rko_orig) {
1750 rd_kafka_resp_err_t err = 0;
1751 int32_t version = rko_orig->rko_version;
1752
1753 rd_kafka_toppar_lock(rktp);
1754
1755 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1756 "Seek %.*s [%"PRId32"] to offset %s "
1757 "in state %s (v%"PRId32")",
1758 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1759 rktp->rktp_partition,
1760 rd_kafka_offset2str(offset),
1761 rd_kafka_fetch_states[rktp->rktp_fetch_state], version);
1762
1763
1764 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
1765 err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
1766 goto err_reply;
1767 } else if (!RD_KAFKA_TOPPAR_FETCH_IS_STARTED(rktp->rktp_fetch_state)) {
1768 err = RD_KAFKA_RESP_ERR__STATE;
1769 goto err_reply;
1770 } else if (offset == RD_KAFKA_OFFSET_STORED) {
1771 err = RD_KAFKA_RESP_ERR__INVALID_ARG;
1772 goto err_reply;
1773 }
1774
1775 rd_kafka_toppar_op_version_bump(rktp, version);
1776
1777 /* Abort pending offset lookups. */
1778 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
1779 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1780 &rktp->rktp_offset_query_tmr,
1781 1/*lock*/);
1782
1783 if (RD_KAFKA_OFFSET_IS_LOGICAL(offset))
1784 rd_kafka_toppar_next_offset_handle(rktp, offset);
1785 else {
1786 rktp->rktp_next_offset = offset;
1787 rd_kafka_toppar_set_fetch_state(rktp,
1788 RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1789
1790 /* Wake-up broker thread which might be idling on IO */
1791 if (rktp->rktp_broker)
1792 rd_kafka_broker_wakeup(rktp->rktp_broker);
1793 }
1794
1795 /* Signal back to caller thread that seek has commenced, or err */
1796 err_reply:
1797 rd_kafka_toppar_unlock(rktp);
1798
1799 if (rko_orig->rko_replyq.q) {
1800 rd_kafka_op_t *rko;
1801
1802 rko = rd_kafka_op_new(RD_KAFKA_OP_SEEK|RD_KAFKA_OP_REPLY);
1803
1804 rko->rko_err = err;
1805 rko->rko_u.fetch_start.offset =
1806 rko_orig->rko_u.fetch_start.offset;
1807 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1808
1809 rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
1810 }
1811 }
1812
1813
1814 /**
1815 * @brief Pause/resume toppar.
1816 *
1817 * This is the internal handler of the pause/resume op.
1818 *
1819 * @locality toppar's handler thread
1820 */
rd_kafka_toppar_pause_resume(rd_kafka_toppar_t * rktp,rd_kafka_op_t * rko_orig)1821 static void rd_kafka_toppar_pause_resume (rd_kafka_toppar_t *rktp,
1822 rd_kafka_op_t *rko_orig) {
1823 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1824 int pause = rko_orig->rko_u.pause.pause;
1825 int flag = rko_orig->rko_u.pause.flag;
1826 int32_t version = rko_orig->rko_version;
1827
1828 rd_kafka_toppar_lock(rktp);
1829
1830 rd_kafka_toppar_op_version_bump(rktp, version);
1831
1832 if (!pause && (rktp->rktp_flags & flag) != flag) {
1833 rd_kafka_dbg(rk, TOPIC, "RESUME",
1834 "Not resuming %s [%"PRId32"]: "
1835 "partition is not paused by %s",
1836 rktp->rktp_rkt->rkt_topic->str,
1837 rktp->rktp_partition,
1838 (flag & RD_KAFKA_TOPPAR_F_APP_PAUSE ?
1839 "application" : "library"));
1840 rd_kafka_toppar_unlock(rktp);
1841 return;
1842 }
1843
1844 if (pause) {
1845 /* Pause partition by setting either
1846 * RD_KAFKA_TOPPAR_F_APP_PAUSE or
1847 * RD_KAFKA_TOPPAR_F_LIB_PAUSE */
1848 rktp->rktp_flags |= flag;
1849
1850 if (rk->rk_type == RD_KAFKA_CONSUMER) {
1851 /* Save offset of last consumed message+1 as the
1852 * next message to fetch on resume. */
1853 if (rktp->rktp_app_offset != RD_KAFKA_OFFSET_INVALID) {
1854 rktp->rktp_next_offset = rktp->rktp_app_offset;
1855 }
1856
1857 rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
1858 "%s %s [%"PRId32"]: at offset %s "
1859 "(state %s, v%d)",
1860 pause ? "Pause":"Resume",
1861 rktp->rktp_rkt->rkt_topic->str,
1862 rktp->rktp_partition,
1863 rd_kafka_offset2str(
1864 rktp->rktp_next_offset),
1865 rd_kafka_fetch_states[rktp->
1866 rktp_fetch_state],
1867 version);
1868 } else {
1869 rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
1870 "%s %s [%"PRId32"] (state %s, v%d)",
1871 pause ? "Pause":"Resume",
1872 rktp->rktp_rkt->rkt_topic->str,
1873 rktp->rktp_partition,
1874 rd_kafka_fetch_states[rktp->
1875 rktp_fetch_state],
1876 version);
1877 }
1878
1879 } else {
1880 /* Unset the RD_KAFKA_TOPPAR_F_APP_PAUSE or
1881 * RD_KAFKA_TOPPAR_F_LIB_PAUSE flag */
1882 rktp->rktp_flags &= ~flag;
1883
1884 if (rk->rk_type == RD_KAFKA_CONSUMER) {
1885 rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
1886 "%s %s [%"PRId32"]: at offset %s "
1887 "(state %s, v%d)",
1888 rktp->rktp_fetch_state ==
1889 RD_KAFKA_TOPPAR_FETCH_ACTIVE ?
1890 "Resuming" : "Not resuming stopped",
1891 rktp->rktp_rkt->rkt_topic->str,
1892 rktp->rktp_partition,
1893 rd_kafka_offset2str(
1894 rktp->rktp_next_offset),
1895 rd_kafka_fetch_states[rktp->
1896 rktp_fetch_state],
1897 version);
1898
1899 /* If the resuming offset is logical we
1900 * need to trigger a seek (that performs the
1901 * logical->absolute lookup logic) to get
1902 * things going.
1903 * Typical case is when a partition is paused
1904 * before anything has been consumed by app
1905 * yet thus having rktp_app_offset=INVALID. */
1906 if (!RD_KAFKA_TOPPAR_IS_PAUSED(rktp) &&
1907 (rktp->rktp_fetch_state ==
1908 RD_KAFKA_TOPPAR_FETCH_ACTIVE ||
1909 rktp->rktp_fetch_state ==
1910 RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) &&
1911 rktp->rktp_next_offset == RD_KAFKA_OFFSET_INVALID)
1912 rd_kafka_toppar_next_offset_handle(
1913 rktp, rktp->rktp_next_offset);
1914
1915 } else
1916 rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
1917 "%s %s [%"PRId32"] (state %s, v%d)",
1918 pause ? "Pause":"Resume",
1919 rktp->rktp_rkt->rkt_topic->str,
1920 rktp->rktp_partition,
1921 rd_kafka_fetch_states[rktp->
1922 rktp_fetch_state],
1923 version);
1924 }
1925 rd_kafka_toppar_unlock(rktp);
1926
1927 if (pause && rk->rk_type == RD_KAFKA_CONSUMER) {
1928 /* Flush partition's fetch queue */
1929 rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
1930 rko_orig->rko_version);
1931 }
1932 }
1933
1934
1935
1936
1937 /**
1938 * @brief Decide whether this toppar should be on the fetch list or not.
1939 *
1940 * Also:
1941 * - update toppar's op version (for broker thread's copy)
1942 * - finalize statistics (move rktp_offsets to rktp_offsets_fin)
1943 *
1944 * @returns the partition's Fetch backoff timestamp, or 0 if no backoff.
1945 *
1946 * @locality broker thread
1947 * @locks none
1948 */
rd_kafka_toppar_fetch_decide(rd_kafka_toppar_t * rktp,rd_kafka_broker_t * rkb,int force_remove)1949 rd_ts_t rd_kafka_toppar_fetch_decide (rd_kafka_toppar_t *rktp,
1950 rd_kafka_broker_t *rkb,
1951 int force_remove) {
1952 int should_fetch = 1;
1953 const char *reason = "";
1954 int32_t version;
1955 rd_ts_t ts_backoff = 0;
1956 rd_bool_t lease_expired = rd_false;
1957
1958 rd_kafka_toppar_lock(rktp);
1959
1960 /* Check for preferred replica lease expiry */
1961 lease_expired =
1962 rktp->rktp_leader_id != rktp->rktp_broker_id &&
1963 rd_interval(&rktp->rktp_lease_intvl,
1964 5*60*1000*1000/*5 minutes*/, 0) > 0;
1965 if (lease_expired) {
1966 /* delete_to_leader() requires no locks to be held */
1967 rd_kafka_toppar_unlock(rktp);
1968 rd_kafka_toppar_delegate_to_leader(rktp);
1969 rd_kafka_toppar_lock(rktp);
1970
1971 reason = "preferred replica lease expired";
1972 should_fetch = 0;
1973 goto done;
1974 }
1975
1976 /* Forced removal from fetch list */
1977 if (unlikely(force_remove)) {
1978 reason = "forced removal";
1979 should_fetch = 0;
1980 goto done;
1981 }
1982
1983 if (unlikely((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) != 0)) {
1984 reason = "partition removed";
1985 should_fetch = 0;
1986 goto done;
1987 }
1988
1989 /* Skip toppars not in active fetch state */
1990 if (rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_ACTIVE) {
1991 reason = "not in active fetch state";
1992 should_fetch = 0;
1993 goto done;
1994 }
1995
1996 /* Update broker thread's fetch op version */
1997 version = rktp->rktp_op_version;
1998 if (version > rktp->rktp_fetch_version ||
1999 rktp->rktp_next_offset != rktp->rktp_last_next_offset ||
2000 rktp->rktp_offsets.fetch_offset == RD_KAFKA_OFFSET_INVALID) {
2001 /* New version barrier, something was modified from the
2002 * control plane. Reset and start over.
2003 * Alternatively only the next_offset changed but not the
2004 * barrier, which is the case when automatically triggering
2005 * offset.reset (such as on PARTITION_EOF or
2006 * OFFSET_OUT_OF_RANGE). */
2007
2008 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCHDEC",
2009 "Topic %s [%"PRId32"]: fetch decide: "
2010 "updating to version %d (was %d) at "
2011 "offset %"PRId64" (was %"PRId64")",
2012 rktp->rktp_rkt->rkt_topic->str,
2013 rktp->rktp_partition,
2014 version, rktp->rktp_fetch_version,
2015 rktp->rktp_next_offset,
2016 rktp->rktp_offsets.fetch_offset);
2017
2018 rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
2019
2020 /* New start offset */
2021 rktp->rktp_offsets.fetch_offset = rktp->rktp_next_offset;
2022 rktp->rktp_last_next_offset = rktp->rktp_next_offset;
2023
2024 rktp->rktp_fetch_version = version;
2025
2026 /* Clear last error to propagate new fetch
2027 * errors if encountered. */
2028 rktp->rktp_last_error = RD_KAFKA_RESP_ERR_NO_ERROR;
2029
2030 rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
2031 version);
2032 }
2033
2034
2035 if (RD_KAFKA_TOPPAR_IS_PAUSED(rktp)) {
2036 should_fetch = 0;
2037 reason = "paused";
2038
2039 } else if (RD_KAFKA_OFFSET_IS_LOGICAL(rktp->rktp_next_offset)) {
2040 should_fetch = 0;
2041 reason = "no concrete offset";
2042
2043 } else if (rd_kafka_q_len(rktp->rktp_fetchq) >=
2044 rkb->rkb_rk->rk_conf.queued_min_msgs) {
2045 /* Skip toppars who's local message queue is already above
2046 * the lower threshold. */
2047 reason = "queued.min.messages exceeded";
2048 should_fetch = 0;
2049
2050 } else if ((int64_t)rd_kafka_q_size(rktp->rktp_fetchq) >=
2051 rkb->rkb_rk->rk_conf.queued_max_msg_bytes) {
2052 reason = "queued.max.messages.kbytes exceeded";
2053 should_fetch = 0;
2054
2055 } else if (rktp->rktp_ts_fetch_backoff > rd_clock()) {
2056 reason = "fetch backed off";
2057 ts_backoff = rktp->rktp_ts_fetch_backoff;
2058 should_fetch = 0;
2059 }
2060
2061 done:
2062 /* Copy offset stats to finalized place holder. */
2063 rktp->rktp_offsets_fin = rktp->rktp_offsets;
2064
2065 if (rktp->rktp_fetch != should_fetch) {
2066 rd_rkb_dbg(rkb, FETCH, "FETCH",
2067 "Topic %s [%"PRId32"] in state %s at offset %s "
2068 "(%d/%d msgs, %"PRId64"/%d kb queued, "
2069 "opv %"PRId32") is %s%s",
2070 rktp->rktp_rkt->rkt_topic->str,
2071 rktp->rktp_partition,
2072 rd_kafka_fetch_states[rktp->rktp_fetch_state],
2073 rd_kafka_offset2str(rktp->rktp_next_offset),
2074 rd_kafka_q_len(rktp->rktp_fetchq),
2075 rkb->rkb_rk->rk_conf.queued_min_msgs,
2076 rd_kafka_q_size(rktp->rktp_fetchq) / 1024,
2077 rkb->rkb_rk->rk_conf.queued_max_msg_kbytes,
2078 rktp->rktp_fetch_version,
2079 should_fetch ? "fetchable" : "not fetchable: ",
2080 reason);
2081
2082 if (should_fetch) {
2083 rd_dassert(rktp->rktp_fetch_version > 0);
2084 rd_kafka_broker_active_toppar_add(rkb, rktp,
2085 *reason ? reason :
2086 "fetchable");
2087 } else {
2088 rd_kafka_broker_active_toppar_del(rkb, rktp, reason);
2089 }
2090 }
2091
2092 rd_kafka_toppar_unlock(rktp);
2093
2094 /* Non-fetching partitions will have an
2095 * indefinate backoff, unless explicitly specified. */
2096 if (!should_fetch && !ts_backoff)
2097 ts_backoff = RD_TS_MAX;
2098
2099 return ts_backoff;
2100 }
2101
2102
2103 /**
2104 * @brief Serve a toppar in a consumer broker thread.
2105 * This is considered the fast path and should be minimal,
2106 * mostly focusing on fetch related mechanisms.
2107 *
2108 * @returns the partition's Fetch backoff timestamp, or 0 if no backoff.
2109 *
2110 * @locality broker thread
2111 * @locks none
2112 */
rd_kafka_broker_consumer_toppar_serve(rd_kafka_broker_t * rkb,rd_kafka_toppar_t * rktp)2113 rd_ts_t rd_kafka_broker_consumer_toppar_serve (rd_kafka_broker_t *rkb,
2114 rd_kafka_toppar_t *rktp) {
2115 return rd_kafka_toppar_fetch_decide(rktp, rkb, 0);
2116 }
2117
2118
2119
2120 /**
2121 * @brief Serve a toppar op
2122 *
2123 * @param rktp may be NULL for certain ops (OP_RECV_BUF)
2124 *
2125 * Will send an empty reply op if the request rko has a replyq set,
2126 * providing synchronous operation.
2127 *
2128 * @locality toppar handler thread
2129 */
2130 static rd_kafka_op_res_t
rd_kafka_toppar_op_serve(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko,rd_kafka_q_cb_type_t cb_type,void * opaque)2131 rd_kafka_toppar_op_serve (rd_kafka_t *rk,
2132 rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
2133 rd_kafka_q_cb_type_t cb_type, void *opaque) {
2134 rd_kafka_toppar_t *rktp = NULL;
2135 int outdated = 0;
2136
2137 if (rko->rko_rktp)
2138 rktp = rko->rko_rktp;
2139
2140 if (rktp) {
2141 outdated = rd_kafka_op_version_outdated(rko,
2142 rktp->rktp_op_version);
2143
2144 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OP",
2145 "%.*s [%"PRId32"] received %sop %s "
2146 "(v%"PRId32") in fetch-state %s (opv%d)",
2147 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2148 rktp->rktp_partition,
2149 outdated ? "outdated ": "",
2150 rd_kafka_op2str(rko->rko_type),
2151 rko->rko_version,
2152 rd_kafka_fetch_states[rktp->rktp_fetch_state],
2153 rktp->rktp_op_version);
2154
2155 if (outdated) {
2156 #if ENABLE_DEVEL
2157 rd_kafka_op_print(stdout, "PART_OUTDATED", rko);
2158 #endif
2159 rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__OUTDATED);
2160 return RD_KAFKA_OP_RES_HANDLED;
2161 }
2162 }
2163
2164 switch ((int)rko->rko_type)
2165 {
2166 case RD_KAFKA_OP_FETCH_START:
2167 rd_kafka_toppar_fetch_start(rktp,
2168 rko->rko_u.fetch_start.offset, rko);
2169 break;
2170
2171 case RD_KAFKA_OP_FETCH_STOP:
2172 rd_kafka_toppar_fetch_stop(rktp, rko);
2173 break;
2174
2175 case RD_KAFKA_OP_SEEK:
2176 rd_kafka_toppar_seek(rktp, rko->rko_u.fetch_start.offset, rko);
2177 break;
2178
2179 case RD_KAFKA_OP_PAUSE:
2180 rd_kafka_toppar_pause_resume(rktp, rko);
2181 break;
2182
2183 case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
2184 rd_kafka_assert(NULL, rko->rko_u.offset_commit.cb);
2185 rko->rko_u.offset_commit.cb(
2186 rk, rko->rko_err,
2187 rko->rko_u.offset_commit.partitions,
2188 rko->rko_u.offset_commit.opaque);
2189 break;
2190
2191 case RD_KAFKA_OP_OFFSET_FETCH | RD_KAFKA_OP_REPLY:
2192 {
2193 /* OffsetFetch reply */
2194 rd_kafka_topic_partition_list_t *offsets =
2195 rko->rko_u.offset_fetch.partitions;
2196 int64_t offset = RD_KAFKA_OFFSET_INVALID;
2197
2198 rktp = offsets->elems[0]._private;
2199 if (!rko->rko_err) {
2200 /* Request succeeded but per-partition might have failed */
2201 rko->rko_err = offsets->elems[0].err;
2202 offset = offsets->elems[0].offset;
2203 }
2204 offsets->elems[0]._private = NULL;
2205 rd_kafka_topic_partition_list_destroy(offsets);
2206 rko->rko_u.offset_fetch.partitions = NULL;
2207
2208 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
2209 &rktp->rktp_offset_query_tmr,
2210 1/*lock*/);
2211
2212 rd_kafka_toppar_lock(rktp);
2213
2214 if (rko->rko_err) {
2215 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
2216 TOPIC, "OFFSET",
2217 "Failed to fetch offset for "
2218 "%.*s [%"PRId32"]: %s",
2219 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2220 rktp->rktp_partition,
2221 rd_kafka_err2str(rko->rko_err));
2222
2223 /* Keep on querying until we succeed. */
2224 rd_kafka_toppar_offset_retry(rktp, 500,
2225 "failed to fetch offsets");
2226 rd_kafka_toppar_unlock(rktp);
2227
2228
2229 /* Propagate error to application */
2230 if (rko->rko_err != RD_KAFKA_RESP_ERR__WAIT_COORD &&
2231 rko->rko_err !=
2232 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT)
2233 rd_kafka_consumer_err(
2234 rktp->rktp_fetchq,
2235 RD_KAFKA_NODEID_UA,
2236 rko->rko_err, 0,
2237 NULL, rktp,
2238 RD_KAFKA_OFFSET_INVALID,
2239 "Failed to fetch "
2240 "offsets from brokers: %s",
2241 rd_kafka_err2str(rko->rko_err));
2242
2243 rd_kafka_toppar_destroy(rktp);
2244
2245 break;
2246 }
2247
2248 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
2249 TOPIC, "OFFSET",
2250 "%.*s [%"PRId32"]: OffsetFetch returned "
2251 "offset %s (%"PRId64")",
2252 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2253 rktp->rktp_partition,
2254 rd_kafka_offset2str(offset), offset);
2255
2256 if (offset > 0)
2257 rktp->rktp_committed_offset = offset;
2258
2259 if (offset >= 0)
2260 rd_kafka_toppar_next_offset_handle(rktp, offset);
2261 else
2262 rd_kafka_offset_reset(rktp, offset,
2263 RD_KAFKA_RESP_ERR__NO_OFFSET,
2264 "no previously committed offset "
2265 "available");
2266 rd_kafka_toppar_unlock(rktp);
2267
2268 rd_kafka_toppar_destroy(rktp);
2269 }
2270 break;
2271
2272 default:
2273 rd_kafka_assert(NULL, !*"unknown type");
2274 break;
2275 }
2276
2277 rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
2278
2279 return RD_KAFKA_OP_RES_HANDLED;
2280 }
2281
2282
2283
2284
2285
2286 /**
2287 * Send command op to toppar (handled by toppar's thread).
2288 *
2289 * Locality: any thread
2290 */
rd_kafka_toppar_op0(rd_kafka_toppar_t * rktp,rd_kafka_op_t * rko,rd_kafka_replyq_t replyq)2291 static void rd_kafka_toppar_op0 (rd_kafka_toppar_t *rktp, rd_kafka_op_t *rko,
2292 rd_kafka_replyq_t replyq) {
2293 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
2294 rko->rko_replyq = replyq;
2295
2296 rd_kafka_q_enq(rktp->rktp_ops, rko);
2297 }
2298
2299
2300 /**
2301 * Send command op to toppar (handled by toppar's thread).
2302 *
2303 * Locality: any thread
2304 */
rd_kafka_toppar_op(rd_kafka_toppar_t * rktp,rd_kafka_op_type_t type,int32_t version,int64_t offset,rd_kafka_cgrp_t * rkcg,rd_kafka_replyq_t replyq)2305 static void rd_kafka_toppar_op (rd_kafka_toppar_t *rktp,
2306 rd_kafka_op_type_t type, int32_t version,
2307 int64_t offset, rd_kafka_cgrp_t *rkcg,
2308 rd_kafka_replyq_t replyq) {
2309 rd_kafka_op_t *rko;
2310
2311 rko = rd_kafka_op_new(type);
2312 rko->rko_version = version;
2313 if (type == RD_KAFKA_OP_FETCH_START ||
2314 type == RD_KAFKA_OP_SEEK) {
2315 if (rkcg)
2316 rko->rko_u.fetch_start.rkcg = rkcg;
2317 rko->rko_u.fetch_start.offset = offset;
2318 }
2319
2320 rd_kafka_toppar_op0(rktp, rko, replyq);
2321 }
2322
2323
2324
2325 /**
2326 * Start consuming partition (async operation).
2327 * 'offset' is the initial offset
2328 * 'fwdq' is an optional queue to forward messages to, if this is NULL
2329 * then messages will be enqueued on rktp_fetchq.
2330 * 'replyq' is an optional queue for handling the consume_start ack.
2331 *
2332 * This is the thread-safe interface that can be called from any thread.
2333 */
rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_q_t * fwdq,rd_kafka_replyq_t replyq)2334 rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start (rd_kafka_toppar_t *rktp,
2335 int64_t offset,
2336 rd_kafka_q_t *fwdq,
2337 rd_kafka_replyq_t replyq) {
2338 int32_t version;
2339
2340 rd_kafka_q_lock(rktp->rktp_fetchq);
2341 if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP))
2342 rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq,
2343 0, /* no do_lock */
2344 0 /* no fwd_app */);
2345 rd_kafka_q_unlock(rktp->rktp_fetchq);
2346
2347 /* Bump version barrier. */
2348 version = rd_kafka_toppar_version_new_barrier(rktp);
2349
2350 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2351 "Start consuming %.*s [%"PRId32"] at "
2352 "offset %s (v%"PRId32")",
2353 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2354 rktp->rktp_partition, rd_kafka_offset2str(offset),
2355 version);
2356
2357 rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_START, version,
2358 offset, rktp->rktp_rkt->rkt_rk->rk_cgrp, replyq);
2359
2360 return RD_KAFKA_RESP_ERR_NO_ERROR;
2361 }
2362
2363
2364 /**
2365 * Stop consuming partition (async operatoin)
2366 * This is thread-safe interface that can be called from any thread.
2367 *
2368 * Locality: any thread
2369 */
rd_kafka_toppar_op_fetch_stop(rd_kafka_toppar_t * rktp,rd_kafka_replyq_t replyq)2370 rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_stop (rd_kafka_toppar_t *rktp,
2371 rd_kafka_replyq_t replyq) {
2372 int32_t version;
2373
2374 /* Bump version barrier. */
2375 version = rd_kafka_toppar_version_new_barrier(rktp);
2376
2377 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2378 "Stop consuming %.*s [%"PRId32"] (v%"PRId32")",
2379 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2380 rktp->rktp_partition, version);
2381
2382 rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_STOP, version,
2383 0, NULL, replyq);
2384
2385 return RD_KAFKA_RESP_ERR_NO_ERROR;
2386 }
2387
2388
2389 /**
2390 * Set/Seek offset of a consumed partition (async operation).
2391 * 'offset' is the target offset
2392 * 'replyq' is an optional queue for handling the ack.
2393 *
2394 * This is the thread-safe interface that can be called from any thread.
2395 */
rd_kafka_toppar_op_seek(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_replyq_t replyq)2396 rd_kafka_resp_err_t rd_kafka_toppar_op_seek (rd_kafka_toppar_t *rktp,
2397 int64_t offset,
2398 rd_kafka_replyq_t replyq) {
2399 int32_t version;
2400
2401 /* Bump version barrier. */
2402 version = rd_kafka_toppar_version_new_barrier(rktp);
2403
2404 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2405 "Seek %.*s [%"PRId32"] to "
2406 "offset %s (v%"PRId32")",
2407 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2408 rktp->rktp_partition, rd_kafka_offset2str(offset),
2409 version);
2410
2411 rd_kafka_toppar_op(rktp, RD_KAFKA_OP_SEEK, version,
2412 offset, NULL, replyq);
2413
2414 return RD_KAFKA_RESP_ERR_NO_ERROR;
2415 }
2416
2417
2418 /**
2419 * @brief Pause/resume partition (async operation).
2420 *
2421 * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2422 * depending on if the app paused or librdkafka.
2423 * @param pause is 1 for pausing or 0 for resuming.
2424 *
2425 * @locality any
2426 */
2427 rd_kafka_resp_err_t
rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t * rktp,int pause,int flag,rd_kafka_replyq_t replyq)2428 rd_kafka_toppar_op_pause_resume (rd_kafka_toppar_t *rktp, int pause, int flag,
2429 rd_kafka_replyq_t replyq) {
2430 int32_t version;
2431 rd_kafka_op_t *rko;
2432
2433 /* Bump version barrier. */
2434 version = rd_kafka_toppar_version_new_barrier(rktp);
2435
2436 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, pause ? "PAUSE":"RESUME",
2437 "%s %.*s [%"PRId32"] (v%"PRId32")",
2438 pause ? "Pause" : "Resume",
2439 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2440 rktp->rktp_partition, version);
2441
2442 rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE);
2443 rko->rko_version = version;
2444 rko->rko_u.pause.pause = pause;
2445 rko->rko_u.pause.flag = flag;
2446
2447 rd_kafka_toppar_op0(rktp, rko, replyq);
2448
2449 return RD_KAFKA_RESP_ERR_NO_ERROR;
2450 }
2451
2452
2453 /**
2454 * @brief Pause a toppar (asynchronous).
2455 *
2456 * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2457 * depending on if the app paused or librdkafka.
2458 *
2459 * @locality any
2460 * @locks none needed
2461 */
rd_kafka_toppar_pause(rd_kafka_toppar_t * rktp,int flag)2462 void rd_kafka_toppar_pause (rd_kafka_toppar_t *rktp, int flag) {
2463 rd_kafka_toppar_op_pause_resume(rktp, 1/*pause*/, flag,
2464 RD_KAFKA_NO_REPLYQ);
2465 }
2466
2467 /**
2468 * @brief Resume a toppar (asynchronous).
2469 *
2470 * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2471 * depending on if the app paused or librdkafka.
2472 *
2473 * @locality any
2474 * @locks none needed
2475 */
rd_kafka_toppar_resume(rd_kafka_toppar_t * rktp,int flag)2476 void rd_kafka_toppar_resume (rd_kafka_toppar_t *rktp, int flag) {
2477 rd_kafka_toppar_op_pause_resume(rktp, 1/*pause*/, flag,
2478 RD_KAFKA_NO_REPLYQ);
2479 }
2480
2481
2482
2483 /**
2484 * @brief Pause or resume a list of partitions.
2485 *
2486 * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2487 * depending on if the app paused or librdkafka.
2488 * @param pause true for pausing, false for resuming.
2489 * @param async RD_SYNC to wait for background thread to handle op,
2490 * RD_ASYNC for asynchronous operation.
2491 *
2492 * @locality any
2493 *
2494 * @remark This is an asynchronous call, the actual pause/resume is performed
2495 * by toppar_pause() in the toppar's handler thread.
2496 */
2497 rd_kafka_resp_err_t
rd_kafka_toppars_pause_resume(rd_kafka_t * rk,rd_bool_t pause,rd_async_t async,int flag,rd_kafka_topic_partition_list_t * partitions)2498 rd_kafka_toppars_pause_resume (rd_kafka_t *rk,
2499 rd_bool_t pause, rd_async_t async, int flag,
2500 rd_kafka_topic_partition_list_t *partitions) {
2501 int i;
2502 int waitcnt = 0;
2503 rd_kafka_q_t *tmpq = NULL;
2504
2505 if (!async)
2506 tmpq = rd_kafka_q_new(rk);
2507
2508 rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
2509 "%s %s %d partition(s)",
2510 flag & RD_KAFKA_TOPPAR_F_APP_PAUSE ? "Application" : "Library",
2511 pause ? "pausing" : "resuming", partitions->cnt);
2512
2513 for (i = 0 ; i < partitions->cnt ; i++) {
2514 rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
2515 rd_kafka_toppar_t *rktp;
2516
2517 rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar,
2518 rd_false);
2519 if (!rktp) {
2520 rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
2521 "%s %s [%"PRId32"]: skipped: "
2522 "unknown partition",
2523 pause ? "Pause":"Resume",
2524 rktpar->topic, rktpar->partition);
2525
2526 rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
2527 continue;
2528 }
2529
2530 rd_kafka_toppar_op_pause_resume(rktp, pause, flag,
2531 RD_KAFKA_REPLYQ(tmpq, 0));
2532
2533 if (!async)
2534 waitcnt++;
2535
2536 rd_kafka_toppar_destroy(rktp);
2537
2538 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
2539 }
2540
2541 if (!async) {
2542 while (waitcnt-- > 0)
2543 rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
2544
2545 rd_kafka_q_destroy_owner(tmpq);
2546 }
2547
2548 return RD_KAFKA_RESP_ERR_NO_ERROR;
2549 }
2550
2551
2552
2553
2554
2555 /**
2556 * Propagate error for toppar
2557 */
rd_kafka_toppar_enq_error(rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err,const char * reason)2558 void rd_kafka_toppar_enq_error (rd_kafka_toppar_t *rktp,
2559 rd_kafka_resp_err_t err,
2560 const char *reason) {
2561 rd_kafka_op_t *rko;
2562 char buf[512];
2563
2564 rko = rd_kafka_op_new(RD_KAFKA_OP_ERR);
2565 rko->rko_err = err;
2566 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
2567
2568 rd_snprintf(buf, sizeof(buf), "%.*s [%"PRId32"]: %s (%s)",
2569 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2570 rktp->rktp_partition, reason,
2571 rd_kafka_err2str(err));
2572
2573 rko->rko_u.err.errstr = rd_strdup(buf);
2574
2575 rd_kafka_q_enq(rktp->rktp_fetchq, rko);
2576 }
2577
2578
2579
2580
2581
2582 /**
2583 * Returns the currently delegated broker for this toppar.
2584 * If \p proper_broker is set NULL will be returned if current handler
2585 * is not a proper broker (INTERNAL broker).
2586 *
2587 * The returned broker has an increased refcount.
2588 *
2589 * Locks: none
2590 */
rd_kafka_toppar_broker(rd_kafka_toppar_t * rktp,int proper_broker)2591 rd_kafka_broker_t *rd_kafka_toppar_broker (rd_kafka_toppar_t *rktp,
2592 int proper_broker) {
2593 rd_kafka_broker_t *rkb;
2594 rd_kafka_toppar_lock(rktp);
2595 rkb = rktp->rktp_broker;
2596 if (rkb) {
2597 if (proper_broker && rkb->rkb_source == RD_KAFKA_INTERNAL)
2598 rkb = NULL;
2599 else
2600 rd_kafka_broker_keep(rkb);
2601 }
2602 rd_kafka_toppar_unlock(rktp);
2603
2604 return rkb;
2605 }
2606
2607
2608 /**
2609 * @brief Take action when partition broker becomes unavailable.
2610 * This should be called when requests fail with
2611 * NOT_LEADER_FOR.. or similar error codes, e.g. ProduceRequest.
2612 *
2613 * @locks none
2614 * @locality any
2615 */
rd_kafka_toppar_leader_unavailable(rd_kafka_toppar_t * rktp,const char * reason,rd_kafka_resp_err_t err)2616 void rd_kafka_toppar_leader_unavailable (rd_kafka_toppar_t *rktp,
2617 const char *reason,
2618 rd_kafka_resp_err_t err) {
2619 rd_kafka_topic_t *rkt = rktp->rktp_rkt;
2620
2621 rd_kafka_dbg(rkt->rkt_rk, TOPIC, "BROKERUA",
2622 "%s [%"PRId32"]: broker unavailable: %s: %s",
2623 rkt->rkt_topic->str, rktp->rktp_partition, reason,
2624 rd_kafka_err2str(err));
2625
2626 rd_kafka_topic_wrlock(rkt);
2627 rkt->rkt_flags |= RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;
2628 rd_kafka_topic_wrunlock(rkt);
2629
2630 rd_kafka_topic_fast_leader_query(rkt->rkt_rk);
2631 }
2632
2633
2634 const char *
rd_kafka_topic_partition_topic(const rd_kafka_topic_partition_t * rktpar)2635 rd_kafka_topic_partition_topic (const rd_kafka_topic_partition_t *rktpar) {
2636 const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2637 return rktp->rktp_rkt->rkt_topic->str;
2638 }
2639
2640 int32_t
rd_kafka_topic_partition_partition(const rd_kafka_topic_partition_t * rktpar)2641 rd_kafka_topic_partition_partition (const rd_kafka_topic_partition_t *rktpar) {
2642 const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2643 return rktp->rktp_partition;
2644 }
2645
rd_kafka_topic_partition_get(const rd_kafka_topic_partition_t * rktpar,const char ** name,int32_t * partition)2646 void rd_kafka_topic_partition_get (const rd_kafka_topic_partition_t *rktpar,
2647 const char **name, int32_t *partition) {
2648 const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2649 *name = rktp->rktp_rkt->rkt_topic->str;
2650 *partition = rktp->rktp_partition;
2651 }
2652
2653
2654
2655
/**
 *
 * rd_kafka_topic_partition_t lists
 * Lists of partitions for propagation to application.
 * The backing array is grown on demand when elements are added.
 *
 */
2662
2663
2664 static void
rd_kafka_topic_partition_list_grow(rd_kafka_topic_partition_list_t * rktparlist,int add_size)2665 rd_kafka_topic_partition_list_grow (rd_kafka_topic_partition_list_t *rktparlist,
2666 int add_size) {
2667 if (add_size < rktparlist->size)
2668 add_size = RD_MAX(rktparlist->size, 32);
2669
2670 rktparlist->size += add_size;
2671 rktparlist->elems = rd_realloc(rktparlist->elems,
2672 sizeof(*rktparlist->elems) *
2673 rktparlist->size);
2674
2675 }
2676
2677
2678 /**
2679 * @brief Initialize a list for fitting \p size partitions.
2680 */
rd_kafka_topic_partition_list_init(rd_kafka_topic_partition_list_t * rktparlist,int size)2681 void rd_kafka_topic_partition_list_init (
2682 rd_kafka_topic_partition_list_t *rktparlist, int size) {
2683 memset(rktparlist, 0, sizeof(*rktparlist));
2684
2685 if (size > 0)
2686 rd_kafka_topic_partition_list_grow(rktparlist, size);
2687 }
2688
2689
2690 /**
2691 * Create a list for fitting 'size' topic_partitions (rktp).
2692 */
rd_kafka_topic_partition_list_new(int size)2693 rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size) {
2694 rd_kafka_topic_partition_list_t *rktparlist;
2695
2696 rktparlist = rd_calloc(1, sizeof(*rktparlist));
2697
2698 if (size > 0)
2699 rd_kafka_topic_partition_list_grow(rktparlist, size);
2700
2701 return rktparlist;
2702 }
2703
2704
2705
rd_kafka_topic_partition_new(const char * topic,int32_t partition)2706 rd_kafka_topic_partition_t *rd_kafka_topic_partition_new (const char *topic,
2707 int32_t partition) {
2708 rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));
2709
2710 rktpar->topic = rd_strdup(topic);
2711 rktpar->partition = partition;
2712
2713 return rktpar;
2714 }
2715
2716
2717 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_copy(const rd_kafka_topic_partition_t * src)2718 rd_kafka_topic_partition_copy (const rd_kafka_topic_partition_t *src) {
2719 return rd_kafka_topic_partition_new(src->topic, src->partition);
2720 }
2721
2722
2723 /** Same as above but with generic void* signature */
rd_kafka_topic_partition_copy_void(const void * src)2724 void *rd_kafka_topic_partition_copy_void (const void *src) {
2725 return rd_kafka_topic_partition_copy(src);
2726 }
2727
2728
2729 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_new_from_rktp(rd_kafka_toppar_t * rktp)2730 rd_kafka_topic_partition_new_from_rktp (rd_kafka_toppar_t *rktp) {
2731 rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));
2732
2733 rktpar->topic = RD_KAFKAP_STR_DUP(rktp->rktp_rkt->rkt_topic);
2734 rktpar->partition = rktp->rktp_partition;
2735
2736 return rktpar;
2737 }
2738
2739
2740
2741 static void
rd_kafka_topic_partition_destroy0(rd_kafka_topic_partition_t * rktpar,int do_free)2742 rd_kafka_topic_partition_destroy0 (rd_kafka_topic_partition_t *rktpar, int do_free) {
2743 if (rktpar->topic)
2744 rd_free(rktpar->topic);
2745 if (rktpar->metadata)
2746 rd_free(rktpar->metadata);
2747 if (rktpar->_private)
2748 rd_kafka_toppar_destroy((rd_kafka_toppar_t *)rktpar->_private);
2749
2750 if (do_free)
2751 rd_free(rktpar);
2752 }
2753
2754
2755 /**
2756 * @brief Destroy all partitions in list.
2757 *
2758 * @remark The allocated size of the list will not shrink.
2759 */
rd_kafka_topic_partition_list_clear(rd_kafka_topic_partition_list_t * rktparlist)2760 void rd_kafka_topic_partition_list_clear (
2761 rd_kafka_topic_partition_list_t *rktparlist) {
2762 int i;
2763
2764 for (i = 0 ; i < rktparlist->cnt ; i++)
2765 rd_kafka_topic_partition_destroy0(&rktparlist->elems[i], 0);
2766
2767 rktparlist->cnt = 0;
2768 }
2769
2770
rd_kafka_topic_partition_destroy_free(void * ptr)2771 void rd_kafka_topic_partition_destroy_free (void *ptr) {
2772 rd_kafka_topic_partition_destroy0(ptr, rd_true/*do_free*/);
2773 }
2774
/**
 * @brief Destroy a topic_partition: release its contents and free the
 *        object itself.
 */
void rd_kafka_topic_partition_destroy (rd_kafka_topic_partition_t *rktpar) {
        const int do_free = 1;

        rd_kafka_topic_partition_destroy0(rktpar, do_free);
}
2778
2779
2780 /**
2781 * Destroys a list previously created with .._list_new() and drops
2782 * any references to contained toppars.
2783 */
2784 void
rd_kafka_topic_partition_list_destroy(rd_kafka_topic_partition_list_t * rktparlist)2785 rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist) {
2786 int i;
2787
2788 for (i = 0 ; i < rktparlist->cnt ; i++)
2789 rd_kafka_topic_partition_destroy0(&rktparlist->elems[i], 0);
2790
2791 if (rktparlist->elems)
2792 rd_free(rktparlist->elems);
2793
2794 rd_free(rktparlist);
2795 }
2796
2797
2798 /**
2799 * @brief Wrapper for rd_kafka_topic_partition_list_destroy() that
2800 * matches the standard free(void *) signature, for callback use.
2801 */
rd_kafka_topic_partition_list_destroy_free(void * ptr)2802 void rd_kafka_topic_partition_list_destroy_free (void *ptr) {
2803 rd_kafka_topic_partition_list_destroy(
2804 (rd_kafka_topic_partition_list_t *)ptr);
2805 }
2806
2807
2808 /**
2809 * Add a partition to an rktpar list.
2810 * The list must have enough room to fit it.
2811 *
2812 * '_private' must be NULL or a valid 'rd_kafka_toppar_t *'.
2813 *
2814 * Returns a pointer to the added element.
2815 */
2816 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add0(const char * func,int line,rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition,rd_kafka_toppar_t * _private)2817 rd_kafka_topic_partition_list_add0 (const char *func, int line,
2818 rd_kafka_topic_partition_list_t *rktparlist,
2819 const char *topic, int32_t partition,
2820 rd_kafka_toppar_t *_private) {
2821 rd_kafka_topic_partition_t *rktpar;
2822 if (rktparlist->cnt == rktparlist->size)
2823 rd_kafka_topic_partition_list_grow(rktparlist, 1);
2824 rd_kafka_assert(NULL, rktparlist->cnt < rktparlist->size);
2825
2826 rktpar = &rktparlist->elems[rktparlist->cnt++];
2827 memset(rktpar, 0, sizeof(*rktpar));
2828 rktpar->topic = rd_strdup(topic);
2829 rktpar->partition = partition;
2830 rktpar->offset = RD_KAFKA_OFFSET_INVALID;
2831 rktpar->_private = _private;
2832 if (_private)
2833 rd_kafka_toppar_keep_fl(func, line, _private);
2834
2835 return rktpar;
2836 }
2837
2838
2839 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)2840 rd_kafka_topic_partition_list_add (rd_kafka_topic_partition_list_t *rktparlist,
2841 const char *topic, int32_t partition) {
2842 return rd_kafka_topic_partition_list_add0(__FUNCTION__,__LINE__,
2843 rktparlist,
2844 topic, partition, NULL);
2845 }
2846
2847
2848 /**
2849 * Adds a consecutive list of partitions to a list
2850 */
2851 void
rd_kafka_topic_partition_list_add_range(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t start,int32_t stop)2852 rd_kafka_topic_partition_list_add_range (rd_kafka_topic_partition_list_t
2853 *rktparlist,
2854 const char *topic,
2855 int32_t start, int32_t stop) {
2856
2857 for (; start <= stop ; start++)
2858 rd_kafka_topic_partition_list_add(rktparlist, topic, start);
2859 }
2860
2861
2862 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_upsert(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)2863 rd_kafka_topic_partition_list_upsert (
2864 rd_kafka_topic_partition_list_t *rktparlist,
2865 const char *topic, int32_t partition) {
2866 rd_kafka_topic_partition_t *rktpar;
2867
2868 if ((rktpar = rd_kafka_topic_partition_list_find(rktparlist,
2869 topic, partition)))
2870 return rktpar;
2871
2872 return rd_kafka_topic_partition_list_add(rktparlist, topic, partition);
2873 }
2874
2875
2876 /**
2877 * @brief Update \p dst with info from \p src.
2878 */
rd_kafka_topic_partition_update(rd_kafka_topic_partition_t * dst,const rd_kafka_topic_partition_t * src)2879 void rd_kafka_topic_partition_update (rd_kafka_topic_partition_t *dst,
2880 const rd_kafka_topic_partition_t *src) {
2881 rd_dassert(!strcmp(dst->topic, src->topic));
2882 rd_dassert(dst->partition == src->partition);
2883 rd_dassert(dst != src);
2884
2885 dst->offset = src->offset;
2886 dst->opaque = src->opaque;
2887 dst->err = src->err;
2888
2889 if (src->metadata_size > 0) {
2890 dst->metadata = rd_malloc(src->metadata_size);
2891 dst->metadata_size = src->metadata_size;;
2892 memcpy(dst->metadata, src->metadata, dst->metadata_size);
2893 }
2894 }
2895
2896 /**
2897 * @brief Creates a copy of \p rktpar and adds it to \p rktparlist
2898 */
rd_kafka_topic_partition_list_add_copy(rd_kafka_topic_partition_list_t * rktparlist,const rd_kafka_topic_partition_t * rktpar)2899 void rd_kafka_topic_partition_list_add_copy (
2900 rd_kafka_topic_partition_list_t *rktparlist,
2901 const rd_kafka_topic_partition_t *rktpar) {
2902 rd_kafka_topic_partition_t *dst;
2903
2904 dst = rd_kafka_topic_partition_list_add0(
2905 __FUNCTION__,__LINE__,
2906 rktparlist,
2907 rktpar->topic,
2908 rktpar->partition,
2909 rktpar->_private);
2910
2911 rd_kafka_topic_partition_update(dst, rktpar);
2912 }
2913
2914
2915
2916 /**
2917 * Create and return a copy of list 'src'
2918 */
2919 rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_copy(const rd_kafka_topic_partition_list_t * src)2920 rd_kafka_topic_partition_list_copy (const rd_kafka_topic_partition_list_t *src){
2921 rd_kafka_topic_partition_list_t *dst;
2922 int i;
2923
2924 dst = rd_kafka_topic_partition_list_new(src->size);
2925
2926 for (i = 0 ; i < src->cnt ; i++)
2927 rd_kafka_topic_partition_list_add_copy(dst, &src->elems[i]);
2928 return dst;
2929 }
2930
2931 /**
2932 * @brief Same as rd_kafka_topic_partition_list_copy() but suitable for
2933 * rd_list_copy(). The \p opaque is ignored.
2934 */
2935 void *
rd_kafka_topic_partition_list_copy_opaque(const void * src,void * opaque)2936 rd_kafka_topic_partition_list_copy_opaque (const void *src, void *opaque) {
2937 return rd_kafka_topic_partition_list_copy(src);
2938 }
2939
2940 /**
2941 * @brief Append copies of all elements in \p src to \p dst.
2942 * No duplicate-checks are performed.
2943 */
rd_kafka_topic_partition_list_add_list(rd_kafka_topic_partition_list_t * dst,const rd_kafka_topic_partition_list_t * src)2944 void rd_kafka_topic_partition_list_add_list (
2945 rd_kafka_topic_partition_list_t *dst,
2946 const rd_kafka_topic_partition_list_t *src) {
2947 int i;
2948
2949 if (src->cnt == 0)
2950 return;
2951
2952 if (dst->size < dst->cnt + src->cnt)
2953 rd_kafka_topic_partition_list_grow(dst, src->cnt);
2954
2955 for (i = 0 ; i < src->cnt ; i++)
2956 rd_kafka_topic_partition_list_add_copy(dst, &src->elems[i]);
2957 }
2958
2959
2960 /**
2961 * @brief Compare two partition lists using partition comparator \p cmp.
2962 *
2963 * @warning This is an O(Na*Nb) operation.
2964 */
2965 int
rd_kafka_topic_partition_list_cmp(const void * _a,const void * _b,int (* cmp)(const void *,const void *))2966 rd_kafka_topic_partition_list_cmp (const void *_a, const void *_b,
2967 int (*cmp) (const void *, const void *)) {
2968 const rd_kafka_topic_partition_list_t *a = _a, *b = _b;
2969 int r;
2970 int i;
2971
2972 r = a->cnt - b->cnt;
2973 if (r || a->cnt == 0)
2974 return r;
2975
2976 /* Since the lists may not be sorted we need to scan all of B
2977 * for each element in A.
2978 * FIXME: If the list sizes are larger than X we could create a
2979 * temporary hash map instead. */
2980 for (i = 0 ; i < a->cnt ; i++) {
2981 int j;
2982
2983 for (j = 0 ; j < b->cnt ; j++) {
2984 r = cmp(&a->elems[i], &b->elems[j]);
2985 if (!r)
2986 break;
2987 }
2988
2989 if (j == b->cnt)
2990 return 1;
2991 }
2992
2993 return 0;
2994 }
2995
2996
2997 /**
2998 * @brief Ensures the \p rktpar has a toppar set in _private.
2999 *
3000 * @returns the toppar object (or possibly NULL if \p create_on_miss is true)
3001 * WITHOUT refcnt increased.
3002 */
3003 rd_kafka_toppar_t *
rd_kafka_topic_partition_ensure_toppar(rd_kafka_t * rk,rd_kafka_topic_partition_t * rktpar,rd_bool_t create_on_miss)3004 rd_kafka_topic_partition_ensure_toppar (rd_kafka_t *rk,
3005 rd_kafka_topic_partition_t *rktpar,
3006 rd_bool_t create_on_miss) {
3007 if (!rktpar->_private)
3008 rktpar->_private =
3009 rd_kafka_toppar_get2(rk,
3010 rktpar->topic,
3011 rktpar->partition, 0,
3012 create_on_miss);
3013 return rktpar->_private;
3014 }
3015
3016
3017 /**
3018 * @returns (and sets if necessary) the \p rktpar's _private / toppar.
3019 * @remark a new reference is returned.
3020 */
3021 rd_kafka_toppar_t *
rd_kafka_topic_partition_get_toppar(rd_kafka_t * rk,rd_kafka_topic_partition_t * rktpar,rd_bool_t create_on_miss)3022 rd_kafka_topic_partition_get_toppar (rd_kafka_t *rk,
3023 rd_kafka_topic_partition_t *rktpar,
3024 rd_bool_t create_on_miss) {
3025 rd_kafka_toppar_t *rktp;
3026
3027 rktp = rd_kafka_topic_partition_ensure_toppar(rk, rktpar,
3028 create_on_miss);
3029
3030 if (rktp)
3031 rd_kafka_toppar_keep(rktp);
3032
3033 return rktp;
3034 }
3035
3036
rd_kafka_topic_partition_cmp(const void * _a,const void * _b)3037 int rd_kafka_topic_partition_cmp (const void *_a, const void *_b) {
3038 const rd_kafka_topic_partition_t *a = _a;
3039 const rd_kafka_topic_partition_t *b = _b;
3040 int r = strcmp(a->topic, b->topic);
3041 if (r)
3042 return r;
3043 else
3044 return RD_CMP(a->partition, b->partition);
3045 }
3046
3047 /** @brief Compare only the topic */
rd_kafka_topic_partition_cmp_topic(const void * _a,const void * _b)3048 int rd_kafka_topic_partition_cmp_topic (const void *_a, const void *_b) {
3049 const rd_kafka_topic_partition_t *a = _a;
3050 const rd_kafka_topic_partition_t *b = _b;
3051 return strcmp(a->topic, b->topic);
3052 }
3053
/**
 * @brief Three-argument variant of rd_kafka_topic_partition_cmp(),
 *        the \p opaque is ignored.
 */
static int rd_kafka_topic_partition_cmp_opaque (const void *_a, const void *_b,
                                                void *opaque) {
        int diff = rd_kafka_topic_partition_cmp(_a, _b);

        return diff;
}
3058
3059 /** @returns a hash of the topic and partition */
rd_kafka_topic_partition_hash(const void * _a)3060 unsigned int rd_kafka_topic_partition_hash (const void *_a) {
3061 const rd_kafka_topic_partition_t *a = _a;
3062 int r = 31 * 17 + a->partition;
3063 return 31 * r + rd_string_hash(a->topic, -1);
3064 }
3065
3066
3067
3068 /**
3069 * @brief Search 'rktparlist' for 'topic' and 'partition'.
3070 * @returns the elems[] index or -1 on miss.
3071 */
3072 static int
rd_kafka_topic_partition_list_find0(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition,int (* cmp)(const void *,const void *))3073 rd_kafka_topic_partition_list_find0 (
3074 const rd_kafka_topic_partition_list_t *rktparlist,
3075 const char *topic, int32_t partition,
3076 int (*cmp) (const void *, const void *)) {
3077 rd_kafka_topic_partition_t skel;
3078 int i;
3079
3080 skel.topic = (char *)topic;
3081 skel.partition = partition;
3082
3083 for (i = 0 ; i < rktparlist->cnt ; i++) {
3084 if (!cmp(&skel, &rktparlist->elems[i]))
3085 return i;
3086 }
3087
3088 return -1;
3089 }
3090
3091 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_find(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)3092 rd_kafka_topic_partition_list_find (
3093 const rd_kafka_topic_partition_list_t *rktparlist,
3094 const char *topic, int32_t partition) {
3095 int i = rd_kafka_topic_partition_list_find0(
3096 rktparlist, topic, partition, rd_kafka_topic_partition_cmp);
3097 if (i == -1)
3098 return NULL;
3099 else
3100 return &rktparlist->elems[i];
3101 }
3102
3103
3104 int
rd_kafka_topic_partition_list_find_idx(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)3105 rd_kafka_topic_partition_list_find_idx (
3106 const rd_kafka_topic_partition_list_t *rktparlist,
3107 const char *topic, int32_t partition) {
3108 return rd_kafka_topic_partition_list_find0(
3109 rktparlist, topic, partition, rd_kafka_topic_partition_cmp);
3110 }
3111
3112
3113 /**
3114 * @returns the first element that matches \p topic, regardless of partition.
3115 */
3116 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_find_topic(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic)3117 rd_kafka_topic_partition_list_find_topic (
3118 const rd_kafka_topic_partition_list_t *rktparlist, const char *topic) {
3119 int i = rd_kafka_topic_partition_list_find0(
3120 rktparlist, topic, RD_KAFKA_PARTITION_UA,
3121 rd_kafka_topic_partition_cmp_topic);
3122 if (i == -1)
3123 return NULL;
3124 else
3125 return &rktparlist->elems[i];
3126 }
3127
3128
3129 int
rd_kafka_topic_partition_list_del_by_idx(rd_kafka_topic_partition_list_t * rktparlist,int idx)3130 rd_kafka_topic_partition_list_del_by_idx (rd_kafka_topic_partition_list_t *rktparlist,
3131 int idx) {
3132 if (unlikely(idx < 0 || idx >= rktparlist->cnt))
3133 return 0;
3134
3135 rd_kafka_topic_partition_destroy0(&rktparlist->elems[idx], 0);
3136 memmove(&rktparlist->elems[idx], &rktparlist->elems[idx+1],
3137 (rktparlist->cnt - idx - 1) * sizeof(rktparlist->elems[idx]));
3138 rktparlist->cnt--;
3139
3140 return 1;
3141 }
3142
3143
3144 int
rd_kafka_topic_partition_list_del(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)3145 rd_kafka_topic_partition_list_del (rd_kafka_topic_partition_list_t *rktparlist,
3146 const char *topic, int32_t partition) {
3147 int i = rd_kafka_topic_partition_list_find0(
3148 rktparlist, topic, partition, rd_kafka_topic_partition_cmp);
3149 if (i == -1)
3150 return 0;
3151
3152 return rd_kafka_topic_partition_list_del_by_idx(rktparlist, i);
3153 }
3154
3155
3156
3157 /**
3158 * Returns true if 'topic' matches the 'rktpar', else false.
3159 * On match, if rktpar is a regex pattern then 'matched_by_regex' is set to 1.
3160 */
rd_kafka_topic_partition_match(rd_kafka_t * rk,const rd_kafka_group_member_t * rkgm,const rd_kafka_topic_partition_t * rktpar,const char * topic,int * matched_by_regex)3161 int rd_kafka_topic_partition_match (rd_kafka_t *rk,
3162 const rd_kafka_group_member_t *rkgm,
3163 const rd_kafka_topic_partition_t *rktpar,
3164 const char *topic, int *matched_by_regex) {
3165 int ret = 0;
3166
3167 if (*rktpar->topic == '^') {
3168 char errstr[128];
3169
3170 ret = rd_regex_match(rktpar->topic, topic,
3171 errstr, sizeof(errstr));
3172 if (ret == -1) {
3173 rd_kafka_dbg(rk, CGRP,
3174 "SUBMATCH",
3175 "Invalid regex for member "
3176 "\"%.*s\" subscription \"%s\": %s",
3177 RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
3178 rktpar->topic, errstr);
3179 return 0;
3180 }
3181
3182 if (ret && matched_by_regex)
3183 *matched_by_regex = 1;
3184
3185 } else if (!strcmp(rktpar->topic, topic)) {
3186
3187 if (matched_by_regex)
3188 *matched_by_regex = 0;
3189
3190 ret = 1;
3191 }
3192
3193 return ret;
3194 }
3195
3196
3197
/**
 * @brief Sort the elements of \p rktparlist in place.
 *
 * @param cmp    Comparator, or NULL to use the default comparator
 *               (topic, then partition).
 * @param opaque Passed as-is to the comparator.
 */
void rd_kafka_topic_partition_list_sort (
        rd_kafka_topic_partition_list_t *rktparlist,
        int (*cmp) (const void *, const void *, void *),
        void *opaque) {
        int (*effective_cmp) (const void *, const void *, void *) =
                cmp ? cmp : rd_kafka_topic_partition_cmp_opaque;

        rd_qsort_r(rktparlist->elems, rktparlist->cnt,
                   sizeof(rktparlist->elems[0]),
                   effective_cmp, opaque);
}
3210
3211
/**
 * @brief Sort \p rktparlist in place using
 *        rd_kafka_topic_partition_cmp_opaque(), i.e., by topic name
 *        and then by partition.
 */
void rd_kafka_topic_partition_list_sort_by_topic (
        rd_kafka_topic_partition_list_t *rktparlist) {
        void *no_opaque = NULL;

        rd_kafka_topic_partition_list_sort(
                rktparlist, rd_kafka_topic_partition_cmp_opaque, no_opaque);
}
3218
rd_kafka_topic_partition_list_set_offset(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition,int64_t offset)3219 rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset (
3220 rd_kafka_topic_partition_list_t *rktparlist,
3221 const char *topic, int32_t partition, int64_t offset) {
3222 rd_kafka_topic_partition_t *rktpar;
3223
3224 if (!(rktpar = rd_kafka_topic_partition_list_find(rktparlist,
3225 topic, partition)))
3226 return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3227
3228 rktpar->offset = offset;
3229
3230 return RD_KAFKA_RESP_ERR_NO_ERROR;
3231 }
3232
3233
3234 /**
3235 * @brief Reset all offsets to the provided value.
3236 */
3237 void
rd_kafka_topic_partition_list_reset_offsets(rd_kafka_topic_partition_list_t * rktparlist,int64_t offset)3238 rd_kafka_topic_partition_list_reset_offsets (rd_kafka_topic_partition_list_t *rktparlist,
3239 int64_t offset) {
3240
3241 int i;
3242 for (i = 0 ; i < rktparlist->cnt ; i++)
3243 rktparlist->elems[i].offset = offset;
3244 }
3245
3246
3247 /**
3248 * Set offset values in partition list based on toppar's last stored offset.
3249 *
3250 * from_rktp - true: set rktp's last stored offset, false: set def_value
3251 * unless a concrete offset is set.
3252 * is_commit: indicates that set offset is to be committed (for debug log)
3253 *
3254 * Returns the number of valid non-logical offsets (>=0).
3255 */
rd_kafka_topic_partition_list_set_offsets(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,int from_rktp,int64_t def_value,int is_commit)3256 int rd_kafka_topic_partition_list_set_offsets (
3257 rd_kafka_t *rk,
3258 rd_kafka_topic_partition_list_t *rktparlist,
3259 int from_rktp, int64_t def_value, int is_commit) {
3260 int i;
3261 int valid_cnt = 0;
3262
3263 for (i = 0 ; i < rktparlist->cnt ; i++) {
3264 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3265 const char *verb = "setting";
3266 char preamble[80];
3267
3268 *preamble = '\0'; /* Avoid warning */
3269
3270 if (from_rktp) {
3271 rd_kafka_toppar_t *rktp = rktpar->_private;
3272 rd_kafka_toppar_lock(rktp);
3273
3274 if (rk->rk_conf.debug & (RD_KAFKA_DBG_CGRP |
3275 RD_KAFKA_DBG_TOPIC))
3276 rd_snprintf(preamble, sizeof(preamble),
3277 "stored offset %"PRId64
3278 ", committed offset %"PRId64": ",
3279 rktp->rktp_stored_offset,
3280 rktp->rktp_committed_offset);
3281
3282 if (rktp->rktp_stored_offset >
3283 rktp->rktp_committed_offset) {
3284 verb = "setting stored";
3285 rktpar->offset = rktp->rktp_stored_offset;
3286 } else {
3287 rktpar->offset = RD_KAFKA_OFFSET_INVALID;
3288 }
3289 rd_kafka_toppar_unlock(rktp);
3290 } else {
3291 if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset)) {
3292 verb = "setting default";
3293 rktpar->offset = def_value;
3294 } else
3295 verb = "keeping";
3296 }
3297
3298 if (is_commit && rktpar->offset == RD_KAFKA_OFFSET_INVALID)
3299 rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
3300 "Topic %s [%"PRId32"]: "
3301 "%snot including in commit",
3302 rktpar->topic, rktpar->partition,
3303 preamble);
3304 else
3305 rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
3306 "Topic %s [%"PRId32"]: "
3307 "%s%s offset %s%s",
3308 rktpar->topic, rktpar->partition,
3309 preamble,
3310 verb,
3311 rd_kafka_offset2str(rktpar->offset),
3312 is_commit ? " for commit" : "");
3313
3314 if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset))
3315 valid_cnt++;
3316 }
3317
3318 return valid_cnt;
3319 }
3320
3321
3322 /**
3323 * @returns the number of partitions with absolute (non-logical) offsets set.
3324 */
rd_kafka_topic_partition_list_count_abs_offsets(const rd_kafka_topic_partition_list_t * rktparlist)3325 int rd_kafka_topic_partition_list_count_abs_offsets (
3326 const rd_kafka_topic_partition_list_t *rktparlist) {
3327 int i;
3328 int valid_cnt = 0;
3329
3330 for (i = 0 ; i < rktparlist->cnt ; i++)
3331 if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktparlist->elems[i].offset))
3332 valid_cnt++;
3333
3334 return valid_cnt;
3335 }
3336
3337
3338 /**
3339 * @brief Update _private (toppar) field to point to valid rktp
3340 * for each parition.
3341 *
3342 * @param create_on_miss Create partition (and topic_t object) if necessary.
3343 */
3344 void
rd_kafka_topic_partition_list_update_toppars(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_bool_t create_on_miss)3345 rd_kafka_topic_partition_list_update_toppars (rd_kafka_t *rk,
3346 rd_kafka_topic_partition_list_t
3347 *rktparlist,
3348 rd_bool_t create_on_miss) {
3349 int i;
3350 for (i = 0 ; i < rktparlist->cnt ; i++) {
3351 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3352
3353 if (!rktpar->_private)
3354 rktpar->_private =
3355 rd_kafka_toppar_get2(rk,
3356 rktpar->topic,
3357 rktpar->partition,
3358 0/*not ua-on-miss*/,
3359 create_on_miss);
3360
3361 }
3362 }
3363
3364
/**
 * @brief Populate \p leaders with the leaders+partitions for the partitions in
 *        \p rktparlist. Duplicates are suppressed.
 *
 *        Each partition is looked up in the metadata cache and its \c .err
 *        field is set according to the outcome:
 *         - the cached topic error, if the topic has a non-temporary error
 *           other than LEADER_NOT_AVAILABLE.
 *         - RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if the topic is cached
 *           with partitions but this partition is not one of them.
 *         - the cached topic error, or else
 *           RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE, if the partition has
 *           no leader or the leader broker is not found.
 *         - RD_KAFKA_RESP_ERR__IN_PROGRESS if the topic needs to be
 *           queried, in which case the topic name is also added to
 *           \p query_topics (if provided and not already there).
 *         - RD_KAFKA_RESP_ERR_NO_ERROR if the leader is known, in which
 *           case the partition is added to (or updated in) that leader's
 *           partition list in \p leaders.
 *
 * @param rktparlist The partitions to look up leaders for, the .err field
 *                   will be set according to outcome (see above).
 * @param leaders rd_list_t of allocated (struct rd_kafka_partition_leader *)
 * @param query_topics (optional) rd_list of strdupped (char *)
 * @param query_unknown Add unknown topics to \p query_topics.
 *                      NOTE(review): this parameter is not referenced by
 *                      the function body.
 * @param eonce (optional) For triggering asynchronously on cache change
 *              in case not all leaders are known now.
 *
 * @remark This is based on the current topic_t and partition state
 *         which may lag behind the last metadata update due to internal
 *         threading and also the fact that no topic_t may have been created.
 *
 * @returns true if a leader was found for every partition in
 *          \p rktparlist, else false.
 *
 * @sa rd_kafka_topic_partition_list_get_leaders_by_metadata
 *
 * @locks rd_kafka_*lock() MUST NOT be held
 */
static rd_bool_t
rd_kafka_topic_partition_list_get_leaders (
        rd_kafka_t *rk,
        rd_kafka_topic_partition_list_t *rktparlist,
        rd_list_t *leaders,
        rd_list_t *query_topics,
        rd_bool_t query_unknown,
        rd_kafka_enq_once_t *eonce) {
        rd_bool_t complete;
        int cnt = 0;
        int i;

        /* The write lock is taken when an eonce is provided, since the
         * eonce may be registered as a cache observer further down,
         * otherwise the read lock is sufficient. */
        if (eonce)
                rd_kafka_wrlock(rk);
        else
                rd_kafka_rdlock(rk);

        for (i = 0 ; i < rktparlist->cnt ; i++) {
                rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
                rd_kafka_topic_partition_t *rktpar2;
                rd_kafka_broker_t *rkb = NULL;
                struct rd_kafka_partition_leader leader_skel;
                struct rd_kafka_partition_leader *leader;
                const rd_kafka_metadata_topic_t *mtopic;
                const rd_kafka_metadata_partition_t *mpart;
                rd_bool_t topic_wait_cache;

                rd_kafka_metadata_cache_topic_partition_get(
                        rk, &mtopic, &mpart,
                        rktpar->topic, rktpar->partition,
                        0/*negative entries too*/);

                /* The topic is either not in the cache at all,
                 * or its cache entry has a temporary error. */
                topic_wait_cache =
                        !mtopic ||
                        RD_KAFKA_METADATA_CACHE_ERR_IS_TEMPORARY(mtopic->err);

                if (!topic_wait_cache &&
                    mtopic &&
                    mtopic->err != RD_KAFKA_RESP_ERR_NO_ERROR &&
                    mtopic->err != RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE) {
                        /* Topic permanently errored */
                        rktpar->err = mtopic->err;
                        continue;
                }

                if (mtopic && !mpart && mtopic->partition_cnt > 0) {
                        /* Topic exists but partition doesn't.
                         * This is a permanent error. */
                        rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
                        continue;
                }

                if (mpart &&
                    (mpart->leader == -1 ||
                     !(rkb = rd_kafka_broker_find_by_nodeid0(
                               rk, mpart->leader, -1/*any state*/,
                               rd_false)))) {
                        /* Partition has no (valid) leader.
                         * This is a permanent error. */
                        rktpar->err =
                                mtopic->err ? mtopic->err :
                                RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE;
                        continue;
                }

                if (topic_wait_cache || !rkb) {
                        /* Topic unknown or no current leader for partition,
                         * add topic to query list.
                         *
                         * NOTE(review): if topic_wait_cache is true while
                         * a broker was found above, this path skips the
                         * rd_kafka_broker_destroy(rkb) at the end of the
                         * loop body - confirm that combination cannot
                         * occur. */
                        rktpar->err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
                        if (query_topics &&
                            !rd_list_find(query_topics, rktpar->topic,
                                          (void *)strcmp))
                                rd_list_add(query_topics,
                                            rd_strdup(rktpar->topic));
                        continue;
                }

                /* Leader exists, add to leader list. */

                rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;

                /* Find the leader entry for this broker,
                 * or create it if this is its first partition. */
                memset(&leader_skel, 0, sizeof(leader_skel));
                leader_skel.rkb = rkb;

                leader = rd_list_find(leaders, &leader_skel,
                                      rd_kafka_partition_leader_cmp);

                if (!leader) {
                        leader = rd_kafka_partition_leader_new(rkb);
                        rd_list_add(leaders, leader);
                }

                rktpar2 = rd_kafka_topic_partition_list_find(leader->partitions,
                                                             rktpar->topic,
                                                             rktpar->partition);
                if (rktpar2) {
                        /* Already exists in partitions list, just update. */
                        rd_kafka_topic_partition_update(rktpar2, rktpar);
                } else {
                        /* Make a copy of rktpar and add to partitions list */
                        rd_kafka_topic_partition_list_add_copy(
                                leader->partitions, rktpar);
                }

                rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;

                rd_kafka_broker_destroy(rkb); /* lose refcount (presumably
                                               * acquired by
                                               * find_by_nodeid0() above) */
                cnt++;
        }

        /* Complete only if every partition in the list got a leader. */
        complete = cnt == rktparlist->cnt;

        if (!complete && eonce)
                /* Add eonce to cache observers */
                rd_kafka_metadata_cache_wait_state_change_async(rk, eonce);

        if (eonce)
                rd_kafka_wrunlock(rk);
        else
                rd_kafka_rdunlock(rk);

        return complete;
}
3519
3520
3521 /**
3522 * @brief Timer timeout callback for query_leaders_async rko's eonce object.
3523 */
3524 static void
rd_kafka_partition_leader_query_eonce_timeout_cb(rd_kafka_timers_t * rkts,void * arg)3525 rd_kafka_partition_leader_query_eonce_timeout_cb (rd_kafka_timers_t *rkts,
3526 void *arg) {
3527 rd_kafka_enq_once_t *eonce = arg;
3528 rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__TIMED_OUT,
3529 "timeout timer");
3530 }
3531
3532
3533 /**
3534 * @brief Query timer callback for query_leaders_async rko's eonce object.
3535 */
3536 static void
rd_kafka_partition_leader_query_eonce_timer_cb(rd_kafka_timers_t * rkts,void * arg)3537 rd_kafka_partition_leader_query_eonce_timer_cb (rd_kafka_timers_t *rkts,
3538 void *arg) {
3539 rd_kafka_enq_once_t *eonce = arg;
3540 rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR_NO_ERROR,
3541 "query timer");
3542 }
3543
3544
/**
 * @brief Query metadata cache for partition leaders, or trigger metadata
 *        refresh if leaders not known.
 *
 *        This function is called once directly from
 *        rd_kafka_topic_partition_list_query_leaders_async() and then again
 *        (through the rko's op callback) each time the rko's eonce
 *        is triggered.
 *
 * @param rko The RD_KAFKA_OP_LEADERS worker op carrying the query state
 *            in rko_u.leaders. A set rko_err (e.g., from the timeout timer)
 *            makes the worker reply right away.
 *
 * @returns RD_KAFKA_OP_RES_KEEP if the worker is still waiting for leaders
 *          (the rko remains in use), or RD_KAFKA_OP_RES_HANDLED when the
 *          worker is done and the reply (if there is a reply queue)
 *          has been enqueued.
 *
 * @locks_required none
 * @locality any
 */
static rd_kafka_op_res_t
rd_kafka_topic_partition_list_query_leaders_async_worker (rd_kafka_op_t *rko) {
        rd_kafka_t *rk = rko->rko_rk;
        rd_list_t query_topics, *leaders = NULL;
        rd_kafka_op_t *reply;

        RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_LEADERS);

        /* Note: query_topics is not yet initialized at this point and
         *       leaders is NULL, the reply path handles both. */
        if (rko->rko_err)
                goto reply; /* Timeout or ERR__DESTROY */

        /* Since we're iterating over get_leaders() until all partition leaders
         * are known we need to re-enable the eonce to be triggered again (which
         * is not necessary the first time we get here, but there
         * is no harm doing it then either). */
        rd_kafka_enq_once_reenable(rko->rko_u.leaders.eonce,
                                   rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));

        /* Look up the leaders in the metadata cache, if not all leaders
         * are known the eonce is registered for metadata cache changes
         * which will cause our function to be called
         * again on (any) metadata cache change.
         *
         * When we are called again we perform the cache lookup again and
         * hopefully get all leaders, otherwise defer a new async wait.
         * Repeat until success or timeout. */

        rd_list_init(&query_topics, 4 + rko->rko_u.leaders.partitions->cnt/2,
                     rd_free);

        leaders = rd_list_new(1 + rko->rko_u.leaders.partitions->cnt / 2,
                              rd_kafka_partition_leader_destroy_free);

        if (rd_kafka_topic_partition_list_get_leaders(
                    rk, rko->rko_u.leaders.partitions,
                    leaders,
                    &query_topics,
                    /* Add unknown topics to query_topics only on the
                     * first query, after that we consider them permanently
                     * non-existent */
                    rko->rko_u.leaders.query_cnt == 0,
                    rko->rko_u.leaders.eonce)) {
                /* All leaders now known (or failed), reply to caller */
                rd_list_destroy(&query_topics);
                goto reply;
        }

        if (rd_list_empty(&query_topics)) {
                /* Not all leaders known but no topics left to query,
                 * reply to caller. */
                rd_list_destroy(&query_topics);
                goto reply;
        }

        /* Need to refresh topic metadata, but at most every interval. */
        if (!rd_kafka_timer_is_started(&rk->rk_timers,
                                       &rko->rko_u.leaders.query_tmr)) {

                rko->rko_u.leaders.query_cnt++;

                /* Add query interval timer. */
                rd_kafka_enq_once_add_source(rko->rko_u.leaders.eonce,
                                             "query timer");
                rd_kafka_timer_start_oneshot(
                        &rk->rk_timers,
                        &rko->rko_u.leaders.query_tmr,
                        rd_true,
                        3*1000*1000 /* 3s */,
                        rd_kafka_partition_leader_query_eonce_timer_cb,
                        rko->rko_u.leaders.eonce);

                /* Request metadata refresh */
                rd_kafka_metadata_refresh_topics(
                        rk, NULL, &query_topics,
                        rd_true/*force*/,
                        rd_false/*!allow_auto_create*/,
                        rd_false/*!cgrp_update*/,
                        "query partition leaders");

        }

        /* The (partial) leader list is discarded here and
         * rebuilt from scratch on the next invocation. */
        rd_list_destroy(leaders);
        rd_list_destroy(&query_topics);

        /* Wait for next eonce trigger */
        return RD_KAFKA_OP_RES_KEEP; /* rko is still used */

 reply:
        /* Decommission worker state and reply to caller */

        /* For each timer that was stopped here (timer_stop() returned
         * non-zero) the timer's eonce source is removed. */
        if (rd_kafka_timer_stop(&rk->rk_timers,
                                &rko->rko_u.leaders.query_tmr,
                                RD_DO_LOCK))
                rd_kafka_enq_once_del_source(rko->rko_u.leaders.eonce,
                                             "query timer");
        if (rd_kafka_timer_stop(&rk->rk_timers,
                                &rko->rko_u.leaders.timeout_tmr,
                                RD_DO_LOCK))
                rd_kafka_enq_once_del_source(rko->rko_u.leaders.eonce,
                                             "timeout timer");

        if (rko->rko_u.leaders.eonce) {
                rd_kafka_enq_once_disable(rko->rko_u.leaders.eonce);
                rko->rko_u.leaders.eonce = NULL;
        }

        /* No leaders found, set a request-level error
         * (unless an error is already set). */
        if (leaders && rd_list_cnt(leaders) == 0) {
                if (!rko->rko_err)
                        rko->rko_err = RD_KAFKA_RESP_ERR__NOENT;
                rd_list_destroy(leaders);
                leaders = NULL;
        }

        /* Create and enqueue reply rko */
        if (rko->rko_u.leaders.replyq.q) {
                reply = rd_kafka_op_new_cb(rk, RD_KAFKA_OP_LEADERS,
                                           rko->rko_u.leaders.cb);
                rd_kafka_op_get_reply_version(reply, rko);
                reply->rko_err = rko->rko_err;
                reply->rko_u.leaders.partitions =
                        rko->rko_u.leaders.partitions; /* Transfer ownership for
                                                        * partition list that
                                                        * now contains
                                                        * per-partition errors*/
                rko->rko_u.leaders.partitions = NULL;
                reply->rko_u.leaders.leaders = leaders; /* Possibly NULL */
                reply->rko_u.leaders.opaque = rko->rko_u.leaders.opaque;

                rd_kafka_replyq_enq(&rko->rko_u.leaders.replyq, reply, 0);
        }

        return RD_KAFKA_OP_RES_HANDLED;
}
3686
3687
3688 static rd_kafka_op_res_t
rd_kafka_topic_partition_list_query_leaders_async_worker_op_cb(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)3689 rd_kafka_topic_partition_list_query_leaders_async_worker_op_cb (
3690 rd_kafka_t *rk,
3691 rd_kafka_q_t *rkq,
3692 rd_kafka_op_t *rko) {
3693 return rd_kafka_topic_partition_list_query_leaders_async_worker(rko);
3694 }
3695
3696 /**
3697 * @brief Async variant of rd_kafka_topic_partition_list_query_leaders().
3698 *
3699 * The reply rko op will contain:
3700 * - .leaders which is a list of leaders and their partitions, this may be
3701 * NULL for overall errors (such as no leaders are found), or a
3702 * partial or complete list of leaders.
3703 * - .partitions which is a copy of the input list of partitions with the
3704 * .err field set to the outcome of the leader query, typically ERR_NO_ERROR
3705 * or ERR_UNKNOWN_TOPIC_OR_PART.
3706 *
3707 * @locks_acquired rd_kafka_*lock()
3708 *
3709 * @remark rd_kafka_*lock() MUST NOT be held
3710 */
3711 void
rd_kafka_topic_partition_list_query_leaders_async(rd_kafka_t * rk,const rd_kafka_topic_partition_list_t * rktparlist,int timeout_ms,rd_kafka_replyq_t replyq,rd_kafka_op_cb_t * cb,void * opaque)3712 rd_kafka_topic_partition_list_query_leaders_async (
3713 rd_kafka_t *rk,
3714 const rd_kafka_topic_partition_list_t *rktparlist,
3715 int timeout_ms,
3716 rd_kafka_replyq_t replyq,
3717 rd_kafka_op_cb_t *cb,
3718 void *opaque) {
3719 rd_kafka_op_t *rko;
3720
3721 rd_assert(rktparlist && rktparlist->cnt > 0);
3722 rd_assert(replyq.q);
3723
3724 rko = rd_kafka_op_new_cb(
3725 rk,
3726 RD_KAFKA_OP_LEADERS,
3727 rd_kafka_topic_partition_list_query_leaders_async_worker_op_cb);
3728 rko->rko_u.leaders.replyq = replyq;
3729 rko->rko_u.leaders.partitions =
3730 rd_kafka_topic_partition_list_copy(rktparlist);
3731 rko->rko_u.leaders.ts_timeout = rd_timeout_init(timeout_ms);
3732 rko->rko_u.leaders.cb = cb;
3733 rko->rko_u.leaders.opaque = opaque;
3734
3735 /* Create an eonce to be triggered either by metadata cache update
3736 * (from refresh_topics()), query interval, or timeout. */
3737 rko->rko_u.leaders.eonce = rd_kafka_enq_once_new(
3738 rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
3739
3740 rd_kafka_enq_once_add_source(rko->rko_u.leaders.eonce, "timeout timer");
3741 rd_kafka_timer_start_oneshot(
3742 &rk->rk_timers,
3743 &rko->rko_u.leaders.timeout_tmr,
3744 rd_true,
3745 rd_timeout_remains_us(rko->rko_u.leaders.ts_timeout),
3746 rd_kafka_partition_leader_query_eonce_timeout_cb,
3747 rko->rko_u.leaders.eonce);
3748
3749 if (rd_kafka_topic_partition_list_query_leaders_async_worker(rko) ==
3750 RD_KAFKA_OP_RES_HANDLED)
3751 rd_kafka_op_destroy(rko); /* Reply queue already disabled */
3752 }
3753
3754
/**
 * @brief Get leaders for all partitions in \p rktparlist, querying metadata
 *        if needed.
 *
 *        This call blocks, alternating between metadata refresh requests
 *        and waiting for metadata cache changes, until there are no more
 *        topics to query or \p timeout_ms has passed.
 *
 * @param rktparlist The partitions to look up leaders for, the .err field
 *                   of each element is set according to the outcome.
 * @param leaders is a pre-initialized (empty) list which will be populated
 *                with the leader brokers and their partitions
 *                (struct rd_kafka_partition_leader *)
 * @param timeout_ms Maximum time to wait for the leaders.
 *
 * @remark Will not trigger topic auto creation (unless configured).
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if there are no more topics to query
 *          and at least one leader was found (check the per-partition
 *          .err fields for the individual outcomes),
 *          RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if there are no more topics
 *          to query and no leader was found,
 *          RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN if the query timed out and
 *          no brokers are up, else RD_KAFKA_RESP_ERR__TIMED_OUT.
 *
 * @locks rd_kafka_*lock() MUST NOT be held
 */
rd_kafka_resp_err_t
rd_kafka_topic_partition_list_query_leaders (
        rd_kafka_t *rk,
        rd_kafka_topic_partition_list_t *rktparlist,
        rd_list_t *leaders, int timeout_ms) {
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_ts_t ts_query = 0;
        rd_ts_t now;
        int query_cnt = 0;
        int i = 0;

        /* Get all the partition leaders, try multiple times:
         * if there are no leaders after the first run fire off a leader
         * query and wait for broker state update before trying again,
         * keep trying and re-querying at increasing intervals until
         * success or timeout. */
        do {
                rd_list_t query_topics;
                int query_intvl;

                rd_list_init(&query_topics, rktparlist->cnt, rd_free);

                /* The return value is not used here: completion is
                 * determined by query_topics being empty. */
                rd_kafka_topic_partition_list_get_leaders(
                        rk, rktparlist, leaders, &query_topics,
                        /* Add unknown topics to query_topics only on the
                         * first query, after that we consider them
                         * permanently non-existent */
                        query_cnt == 0,
                        NULL);

                if (rd_list_empty(&query_topics)) {
                        /* No remaining topics to query: leader-list complete.*/
                        rd_list_destroy(&query_topics);

                        /* No leader(s) for partitions means all partitions
                         * are unknown. */
                        if (rd_list_empty(leaders))
                                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                }

                now = rd_clock();

                /*
                 * Missing leader for some partitions
                 */
                query_intvl = (i+1) * 100; /* add 100ms per iteration */
                if (query_intvl > 2*1000)
                        query_intvl = 2*1000; /* Cap to 2s */

                /* ts_query and now are in microseconds,
                 * query_intvl is in milliseconds. */
                if (now >= ts_query + (query_intvl*1000)) {
                        /* Query metadata for missing leaders
                         * (allow_auto_create is passed as false). */
                        rd_kafka_metadata_refresh_topics(
                                rk, NULL, &query_topics,
                                rd_true/*force*/,
                                rd_false/*!allow_auto_create*/,
                                rd_false/*!cgrp_update*/,
                                "query partition leaders");
                        ts_query = now;
                        query_cnt++;

                } else {
                        /* Wait for broker ids to be updated from
                         * metadata refresh above. */
                        int wait_ms = rd_timeout_remains_limit(ts_end,
                                                               query_intvl);
                        rd_kafka_metadata_cache_wait_change(rk, wait_ms);
                }

                rd_list_destroy(&query_topics);

                i++;
        } while (ts_end == RD_POLL_INFINITE ||
                 now < ts_end); /* now is deliberately outdated here
                                 * since wait_change() will block.
                                 * This gives us one more chance to spin thru*/

        /* Timed out: report a more specific error if no brokers are up. */
        if (rd_atomic32_get(&rk->rk_broker_up_cnt) == 0)
                return RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN;

        return RD_KAFKA_RESP_ERR__TIMED_OUT;
}
3853
3854
3855 /**
3856 * @brief Populate \p rkts with the rd_kafka_topic_t objects for the
3857 * partitions in. Duplicates are suppressed.
3858 *
3859 * @returns the number of topics added.
3860 */
3861 int
rd_kafka_topic_partition_list_get_topics(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * rkts)3862 rd_kafka_topic_partition_list_get_topics (
3863 rd_kafka_t *rk,
3864 rd_kafka_topic_partition_list_t *rktparlist,
3865 rd_list_t *rkts) {
3866 int cnt = 0;
3867
3868 int i;
3869 for (i = 0 ; i < rktparlist->cnt ; i++) {
3870 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3871 rd_kafka_toppar_t *rktp;
3872
3873 rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar,
3874 rd_false);
3875 if (!rktp) {
3876 rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3877 continue;
3878 }
3879
3880 if (!rd_list_find(rkts, rktp->rktp_rkt,
3881 rd_kafka_topic_cmp_rkt)) {
3882 rd_list_add(rkts, rd_kafka_topic_keep(rktp->rktp_rkt));
3883 cnt++;
3884 }
3885
3886 rd_kafka_toppar_destroy(rktp);
3887 }
3888
3889 return cnt;
3890 }
3891
3892
3893 /**
3894 * @brief Populate \p topics with the strdupped topic names in \p rktparlist.
3895 * Duplicates are suppressed.
3896 *
3897 * @param include_regex: include regex topics
3898 *
3899 * @returns the number of topics added.
3900 */
3901 int
rd_kafka_topic_partition_list_get_topic_names(const rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * topics,int include_regex)3902 rd_kafka_topic_partition_list_get_topic_names (
3903 const rd_kafka_topic_partition_list_t *rktparlist,
3904 rd_list_t *topics, int include_regex) {
3905 int cnt = 0;
3906 int i;
3907
3908 for (i = 0 ; i < rktparlist->cnt ; i++) {
3909 const rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3910
3911 if (!include_regex && *rktpar->topic == '^')
3912 continue;
3913
3914 if (!rd_list_find(topics, rktpar->topic, (void *)strcmp)) {
3915 rd_list_add(topics, rd_strdup(rktpar->topic));
3916 cnt++;
3917 }
3918 }
3919
3920 return cnt;
3921 }
3922
3923
3924 /**
3925 * @brief Create a copy of \p rktparlist only containing the partitions
3926 * matched by \p match function.
3927 *
3928 * \p match shall return 1 for match, else 0.
3929 *
3930 * @returns a new list
3931 */
rd_kafka_topic_partition_list_match(const rd_kafka_topic_partition_list_t * rktparlist,int (* match)(const void * elem,const void * opaque),void * opaque)3932 rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_match (
3933 const rd_kafka_topic_partition_list_t *rktparlist,
3934 int (*match) (const void *elem, const void *opaque),
3935 void *opaque) {
3936 rd_kafka_topic_partition_list_t *newlist;
3937 int i;
3938
3939 newlist = rd_kafka_topic_partition_list_new(0);
3940
3941 for (i = 0 ; i < rktparlist->cnt ; i++) {
3942 const rd_kafka_topic_partition_t *rktpar =
3943 &rktparlist->elems[i];
3944
3945 if (!match(rktpar, opaque))
3946 continue;
3947
3948 rd_kafka_topic_partition_list_add_copy(newlist, rktpar);
3949 }
3950
3951 return newlist;
3952 }
3953
3954 void
rd_kafka_topic_partition_list_log(rd_kafka_t * rk,const char * fac,int dbg,const rd_kafka_topic_partition_list_t * rktparlist)3955 rd_kafka_topic_partition_list_log (rd_kafka_t *rk, const char *fac, int dbg,
3956 const rd_kafka_topic_partition_list_t *rktparlist) {
3957 int i;
3958
3959 rd_kafka_dbg(rk, NONE|dbg, fac, "List with %d partition(s):",
3960 rktparlist->cnt);
3961 for (i = 0 ; i < rktparlist->cnt ; i++) {
3962 const rd_kafka_topic_partition_t *rktpar =
3963 &rktparlist->elems[i];
3964 rd_kafka_dbg(rk, NONE|dbg, fac, " %s [%"PRId32"] offset %s%s%s",
3965 rktpar->topic, rktpar->partition,
3966 rd_kafka_offset2str(rktpar->offset),
3967 rktpar->err ? ": error: " : "",
3968 rktpar->err ? rd_kafka_err2str(rktpar->err) : "");
3969 }
3970 }
3971
3972 /**
3973 * @returns a comma-separated list of partitions.
3974 */
3975 const char *
rd_kafka_topic_partition_list_str(const rd_kafka_topic_partition_list_t * rktparlist,char * dest,size_t dest_size,int fmt_flags)3976 rd_kafka_topic_partition_list_str (const rd_kafka_topic_partition_list_t *rktparlist,
3977 char *dest, size_t dest_size,
3978 int fmt_flags) {
3979 int i;
3980 size_t of = 0;
3981
3982 for (i = 0 ; i < rktparlist->cnt ; i++) {
3983 const rd_kafka_topic_partition_t *rktpar =
3984 &rktparlist->elems[i];
3985 char errstr[128];
3986 char offsetstr[32];
3987 int r;
3988
3989 if (!rktpar->err && (fmt_flags & RD_KAFKA_FMT_F_ONLY_ERR))
3990 continue;
3991
3992 if (rktpar->err && !(fmt_flags & RD_KAFKA_FMT_F_NO_ERR))
3993 rd_snprintf(errstr, sizeof(errstr),
3994 "(%s)", rd_kafka_err2str(rktpar->err));
3995 else
3996 errstr[0] = '\0';
3997
3998 if (rktpar->offset != RD_KAFKA_OFFSET_INVALID)
3999 rd_snprintf(offsetstr, sizeof(offsetstr),
4000 "@%"PRId64, rktpar->offset);
4001 else
4002 offsetstr[0] = '\0';
4003
4004 r = rd_snprintf(&dest[of], dest_size-of,
4005 "%s"
4006 "%s[%"PRId32"]"
4007 "%s"
4008 "%s",
4009 of == 0 ? "" : ", ",
4010 rktpar->topic, rktpar->partition,
4011 offsetstr,
4012 errstr);
4013
4014 if ((size_t)r >= dest_size-of) {
4015 rd_snprintf(&dest[dest_size-4], 4, "...");
4016 break;
4017 }
4018
4019 of += r;
4020 }
4021
4022 return dest;
4023 }
4024
4025
4026
4027 /**
4028 * @brief Update \p dst with info from \p src.
4029 *
4030 * Fields updated:
4031 * - metadata
4032 * - metadata_size
4033 * - offset
4034 * - err
4035 *
4036 * Will only update partitions that are in both dst and src, other partitions will
4037 * remain unchanged.
4038 */
4039 void
rd_kafka_topic_partition_list_update(rd_kafka_topic_partition_list_t * dst,const rd_kafka_topic_partition_list_t * src)4040 rd_kafka_topic_partition_list_update (rd_kafka_topic_partition_list_t *dst,
4041 const rd_kafka_topic_partition_list_t *src){
4042 int i;
4043
4044 for (i = 0 ; i < dst->cnt ; i++) {
4045 rd_kafka_topic_partition_t *d = &dst->elems[i];
4046 rd_kafka_topic_partition_t *s;
4047
4048 if (!(s = rd_kafka_topic_partition_list_find(
4049 (rd_kafka_topic_partition_list_t *)src,
4050 d->topic, d->partition)))
4051 continue;
4052
4053 d->offset = s->offset;
4054 d->err = s->err;
4055 if (d->metadata) {
4056 rd_free(d->metadata);
4057 d->metadata = NULL;
4058 d->metadata_size = 0;
4059 }
4060 if (s->metadata_size > 0) {
4061 d->metadata =
4062 rd_malloc(s->metadata_size);
4063 d->metadata_size = s->metadata_size;
4064 memcpy((void *)d->metadata, s->metadata,
4065 s->metadata_size);
4066 }
4067 }
4068 }
4069
4070
4071 /**
4072 * @returns the sum of \p cb called for each element.
4073 */
4074 size_t
rd_kafka_topic_partition_list_sum(const rd_kafka_topic_partition_list_t * rktparlist,size_t (* cb)(const rd_kafka_topic_partition_t * rktpar,void * opaque),void * opaque)4075 rd_kafka_topic_partition_list_sum (
4076 const rd_kafka_topic_partition_list_t *rktparlist,
4077 size_t (*cb) (const rd_kafka_topic_partition_t *rktpar, void *opaque),
4078 void *opaque) {
4079 int i;
4080 size_t sum = 0;
4081
4082 for (i = 0 ; i < rktparlist->cnt ; i++) {
4083 const rd_kafka_topic_partition_t *rktpar =
4084 &rktparlist->elems[i];
4085 sum += cb(rktpar, opaque);
4086 }
4087
4088 return sum;
4089 }
4090
4091
4092 /**
4093 * @returns rd_true if there are duplicate topic/partitions in the list,
4094 * rd_false if not.
4095 *
4096 * @remarks sorts the elements of the list.
4097 */
4098 rd_bool_t
rd_kafka_topic_partition_list_has_duplicates(rd_kafka_topic_partition_list_t * rktparlist,rd_bool_t ignore_partition)4099 rd_kafka_topic_partition_list_has_duplicates (
4100 rd_kafka_topic_partition_list_t *rktparlist,
4101 rd_bool_t ignore_partition) {
4102
4103 int i;
4104
4105 if (rktparlist->cnt <= 1)
4106 return rd_false;
4107
4108 rd_kafka_topic_partition_list_sort_by_topic(rktparlist);
4109
4110 for (i=1; i<rktparlist->cnt; i++) {
4111 const rd_kafka_topic_partition_t *p1 = &rktparlist->elems[i-1];
4112 const rd_kafka_topic_partition_t *p2 = &rktparlist->elems[i];
4113
4114 if (((p1->partition == p2->partition) || ignore_partition) &&
4115 !strcmp(p1->topic, p2->topic)) {
4116 return rd_true;
4117 }
4118 }
4119
4120 return rd_false;
4121 }
4122
4123
4124 /**
4125 * @brief Set \c .err field \p err on all partitions in list.
4126 */
rd_kafka_topic_partition_list_set_err(rd_kafka_topic_partition_list_t * rktparlist,rd_kafka_resp_err_t err)4127 void rd_kafka_topic_partition_list_set_err (
4128 rd_kafka_topic_partition_list_t *rktparlist,
4129 rd_kafka_resp_err_t err) {
4130 int i;
4131
4132 for (i = 0 ; i < rktparlist->cnt ; i++)
4133 rktparlist->elems[i].err = err;
4134 }
4135
4136 /**
4137 * @brief Get the first set error in the partition list.
4138 */
rd_kafka_topic_partition_list_get_err(const rd_kafka_topic_partition_list_t * rktparlist)4139 rd_kafka_resp_err_t rd_kafka_topic_partition_list_get_err (
4140 const rd_kafka_topic_partition_list_t *rktparlist) {
4141 int i;
4142
4143 for (i = 0 ; i < rktparlist->cnt ; i++)
4144 if (rktparlist->elems[i].err)
4145 return rktparlist->elems[i].err;
4146
4147 return RD_KAFKA_RESP_ERR_NO_ERROR;
4148 }
4149
4150
4151 /**
4152 * @returns the number of wildcard/regex topics
4153 */
rd_kafka_topic_partition_list_regex_cnt(const rd_kafka_topic_partition_list_t * rktparlist)4154 int rd_kafka_topic_partition_list_regex_cnt (
4155 const rd_kafka_topic_partition_list_t *rktparlist) {
4156 int i;
4157 int cnt = 0;
4158
4159 for (i = 0 ; i < rktparlist->cnt ; i++) {
4160 const rd_kafka_topic_partition_t *rktpar =
4161 &rktparlist->elems[i];
4162 cnt += *rktpar->topic == '^';
4163 }
4164 return cnt;
4165 }
4166
4167
4168 /**
4169 * @brief Reset base sequence for this toppar.
4170 *
4171 * See rd_kafka_toppar_pid_change() below.
4172 *
4173 * @warning Toppar must be completely drained.
4174 *
4175 * @locality toppar handler thread
4176 * @locks toppar_lock MUST be held.
4177 */
rd_kafka_toppar_reset_base_msgid(rd_kafka_toppar_t * rktp,uint64_t new_base_msgid)4178 static void rd_kafka_toppar_reset_base_msgid (rd_kafka_toppar_t *rktp,
4179 uint64_t new_base_msgid) {
4180 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
4181 TOPIC|RD_KAFKA_DBG_EOS, "RESETSEQ",
4182 "%.*s [%"PRId32"] "
4183 "resetting epoch base seq from %"PRIu64" to %"PRIu64,
4184 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4185 rktp->rktp_partition,
4186 rktp->rktp_eos.epoch_base_msgid, new_base_msgid);
4187
4188 rktp->rktp_eos.next_ack_seq = 0;
4189 rktp->rktp_eos.next_err_seq = 0;
4190 rktp->rktp_eos.epoch_base_msgid = new_base_msgid;
4191 }
4192
4193
4194 /**
4195 * @brief Update/change the Producer ID for this toppar.
4196 *
4197 * Must only be called when pid is different from the current toppar pid.
4198 *
4199 * The epoch base sequence will be set to \p base_msgid, which must be the
4200 * first message in the partition
4201 * queue. However, if there are outstanding messages in-flight to the broker
4202 * we will need to wait for these ProduceRequests to finish (most likely
4203 * with failure) and have their messages re-enqueued to maintain original order.
4204 * In this case the pid will not be updated and this function should be
4205 * called again when there are no outstanding messages.
4206 *
4207 * @remark This function must only be called when rktp_xmitq is non-empty.
4208 *
4209 * @returns 1 if a new pid was set, else 0.
4210 *
4211 * @locality toppar handler thread
4212 * @locks none
4213 */
rd_kafka_toppar_pid_change(rd_kafka_toppar_t * rktp,rd_kafka_pid_t pid,uint64_t base_msgid)4214 int rd_kafka_toppar_pid_change (rd_kafka_toppar_t *rktp, rd_kafka_pid_t pid,
4215 uint64_t base_msgid) {
4216 int inflight = rd_atomic32_get(&rktp->rktp_msgs_inflight);
4217
4218 if (unlikely(inflight > 0)) {
4219 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
4220 TOPIC|RD_KAFKA_DBG_EOS, "NEWPID",
4221 "%.*s [%"PRId32"] will not change %s -> %s yet: "
4222 "%d message(s) still in-flight from current "
4223 "epoch",
4224 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4225 rktp->rktp_partition,
4226 rd_kafka_pid2str(rktp->rktp_eos.pid),
4227 rd_kafka_pid2str(pid),
4228 inflight);
4229 return 0;
4230 }
4231
4232 rd_assert(base_msgid != 0 &&
4233 *"BUG: pid_change() must only be called with "
4234 "non-empty xmitq");
4235
4236 rd_kafka_toppar_lock(rktp);
4237 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
4238 TOPIC|RD_KAFKA_DBG_EOS, "NEWPID",
4239 "%.*s [%"PRId32"] changed %s -> %s "
4240 "with base MsgId %"PRIu64,
4241 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4242 rktp->rktp_partition,
4243 rd_kafka_pid2str(rktp->rktp_eos.pid),
4244 rd_kafka_pid2str(pid),
4245 base_msgid);
4246
4247 rktp->rktp_eos.pid = pid;
4248 rd_kafka_toppar_reset_base_msgid(rktp, base_msgid);
4249
4250 rd_kafka_toppar_unlock(rktp);
4251
4252 return 1;
4253 }
4254
4255
4256 /**
4257 * @brief Purge messages in partition queues.
4258 * Delivery reports will be enqueued for all purged messages, the error
4259 * code is set to RD_KAFKA_RESP_ERR__PURGE_QUEUE.
4260 *
4261 * @param include_xmit_msgq If executing from the rktp's current broker handler
4262 * thread, also include the xmit message queue.
4263 *
4264 * @warning Only to be used with the producer.
4265 *
4266 * @returns the number of messages purged
4267 *
4268 * @locality any thread.
4269 * @locks_acquired rd_kafka_toppar_lock()
4270 * @locks_required none
4271 */
rd_kafka_toppar_purge_queues(rd_kafka_toppar_t * rktp,int purge_flags,rd_bool_t include_xmit_msgq)4272 int rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp,
4273 int purge_flags,
4274 rd_bool_t include_xmit_msgq) {
4275 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
4276 rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
4277 int cnt;
4278
4279 rd_assert(rk->rk_type == RD_KAFKA_PRODUCER);
4280
4281 rd_kafka_dbg(rk, TOPIC, "PURGE",
4282 "%s [%"PRId32"]: purging queues "
4283 "(purge_flags 0x%x, %s xmit_msgq)",
4284 rktp->rktp_rkt->rkt_topic->str,
4285 rktp->rktp_partition,
4286 purge_flags,
4287 include_xmit_msgq ? "include" : "exclude");
4288
4289 if (!(purge_flags & RD_KAFKA_PURGE_F_QUEUE))
4290 return 0;
4291
4292 if (include_xmit_msgq) {
4293 /* xmit_msgq is owned by the toppar handler thread
4294 * (broker thread) and requires no locking. */
4295 rd_assert(rktp->rktp_broker);
4296 rd_assert(thrd_is_current(rktp->rktp_broker->rkb_thread));
4297 rd_kafka_msgq_concat(&rkmq, &rktp->rktp_xmit_msgq);
4298 }
4299
4300 rd_kafka_toppar_lock(rktp);
4301 rd_kafka_msgq_concat(&rkmq, &rktp->rktp_msgq);
4302 cnt = rd_kafka_msgq_len(&rkmq);
4303
4304 if (cnt > 0 && purge_flags & RD_KAFKA_PURGE_F_ABORT_TXN) {
4305 /* All messages in-queue are purged
4306 * on abort_transaction(). Since these messages
4307 * will not be produced (retried) we need to adjust the
4308 * idempotence epoch's base msgid to skip the messages. */
4309 rktp->rktp_eos.epoch_base_msgid += cnt;
4310 rd_kafka_dbg(rk,
4311 TOPIC|RD_KAFKA_DBG_EOS, "ADVBASE",
4312 "%.*s [%"PRId32"] "
4313 "advancing epoch base msgid to %"PRIu64
4314 " due to %d message(s) in aborted transaction",
4315 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4316 rktp->rktp_partition,
4317 rktp->rktp_eos.epoch_base_msgid, cnt);
4318 }
4319 rd_kafka_toppar_unlock(rktp);
4320
4321 rd_kafka_dr_msgq(rktp->rktp_rkt, &rkmq, RD_KAFKA_RESP_ERR__PURGE_QUEUE);
4322
4323 return cnt;
4324 }
4325
4326
4327 /**
4328 * @brief Purge queues for the unassigned toppars of all known topics.
4329 *
4330 * @locality application thread
4331 * @locks none
4332 */
rd_kafka_purge_ua_toppar_queues(rd_kafka_t * rk)4333 void rd_kafka_purge_ua_toppar_queues (rd_kafka_t *rk) {
4334 rd_kafka_topic_t *rkt;
4335 int msg_cnt = 0, part_cnt = 0;
4336
4337 rd_kafka_rdlock(rk);
4338 TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
4339 rd_kafka_toppar_t *rktp;
4340 int r;
4341
4342 rd_kafka_topic_rdlock(rkt);
4343 rktp = rkt->rkt_ua;
4344 if (rktp)
4345 rd_kafka_toppar_keep(rktp);
4346 rd_kafka_topic_rdunlock(rkt);
4347
4348 if (unlikely(!rktp))
4349 continue;
4350
4351
4352 rd_kafka_toppar_lock(rktp);
4353
4354 r = rd_kafka_msgq_len(&rktp->rktp_msgq);
4355 rd_kafka_dr_msgq(rkt, &rktp->rktp_msgq,
4356 RD_KAFKA_RESP_ERR__PURGE_QUEUE);
4357 rd_kafka_toppar_unlock(rktp);
4358 rd_kafka_toppar_destroy(rktp);
4359
4360 if (r > 0) {
4361 msg_cnt += r;
4362 part_cnt++;
4363 }
4364 }
4365 rd_kafka_rdunlock(rk);
4366
4367 rd_kafka_dbg(rk, QUEUE|RD_KAFKA_DBG_TOPIC, "PURGEQ",
4368 "Purged %i message(s) from %d UA-partition(s)",
4369 msg_cnt, part_cnt);
4370 }
4371
4372
/**
 * @brief Destroy the partition leader object pointed to by \p ptr.
 *
 * Takes an untyped pointer and forwards it to
 * rd_kafka_partition_leader_destroy(); presumably intended as a generic
 * free callback (e.g., for lists) - verify against callers.
 */
void rd_kafka_partition_leader_destroy_free (void *ptr) {
        rd_kafka_partition_leader_destroy(
                (struct rd_kafka_partition_leader *)ptr);
}
4377