1 /*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2015 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28 #include "rdkafka_int.h"
29 #include "rdkafka_topic.h"
30 #include "rdkafka_broker.h"
31 #include "rdkafka_request.h"
32 #include "rdkafka_offset.h"
33 #include "rdkafka_partition.h"
34 #include "rdregex.h"
35 #include "rdports.h" /* rd_qsort_r() */
36
37 #include "rdunittest.h"
38
/**
 * Human-readable names of the partition fetch states.
 * The array is indexed by the numeric fetch state value
 * (rd_kafka_toppar_set_fetch_state() indexes it with rktp_fetch_state),
 * so the entry order must follow the fetch state numbering.
 */
const char *rd_kafka_fetch_states[] = {
        [0] = "none",
        [1] = "stopping",
        [2] = "stopped",
        [3] = "offset-query",
        [4] = "offset-wait",
        [5] = "active"
};
47
48
/* Forward declarations of static functions defined further down in
 * this file. */

/* Serve callback for a partition's ops queue: installed as
 * rktp_ops->rkq_serve (with the rktp as rkq_opaque) in
 * rd_kafka_toppar_new0(). */
static rd_kafka_op_res_t
rd_kafka_toppar_op_serve (rd_kafka_t *rk,
                          rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                          rd_kafka_q_cb_type_t cb_type, void *opaque);

/* NOTE(review): presumably (re)schedules the partition's offset query
 * after backoff_ms, with reason used for logging -- confirm against the
 * definition further down. */
static void rd_kafka_toppar_offset_retry (rd_kafka_toppar_t *rktp,
                                          int backoff_ms,
                                          const char *reason);
57
58
59 static RD_INLINE int32_t
rd_kafka_toppar_version_new_barrier0(rd_kafka_toppar_t * rktp,const char * func,int line)60 rd_kafka_toppar_version_new_barrier0 (rd_kafka_toppar_t *rktp,
61 const char *func, int line) {
62 int32_t version = rd_atomic32_add(&rktp->rktp_version, 1);
63 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BARRIER",
64 "%s [%"PRId32"]: %s:%d: new version barrier v%"PRId32,
65 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
66 func, line, version);
67 return version;
68 }
69
70 #define rd_kafka_toppar_version_new_barrier(rktp) \
71 rd_kafka_toppar_version_new_barrier0(rktp, __FUNCTION__, __LINE__)
72
73
74 /**
75 * Toppar based OffsetResponse handling.
76 * This is used for updating the low water mark for consumer lag.
77 */
rd_kafka_toppar_lag_handle_Offset(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)78 static void rd_kafka_toppar_lag_handle_Offset (rd_kafka_t *rk,
79 rd_kafka_broker_t *rkb,
80 rd_kafka_resp_err_t err,
81 rd_kafka_buf_t *rkbuf,
82 rd_kafka_buf_t *request,
83 void *opaque) {
84 rd_kafka_toppar_t *rktp = opaque;
85 rd_kafka_topic_partition_list_t *offsets;
86 rd_kafka_topic_partition_t *rktpar;
87
88 offsets = rd_kafka_topic_partition_list_new(1);
89
90 /* Parse and return Offset */
91 err = rd_kafka_handle_ListOffsets(rk, rkb, err,
92 rkbuf, request, offsets, NULL);
93
94 if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
95 rd_kafka_topic_partition_list_destroy(offsets);
96 return; /* Retrying */
97 }
98
99 if (!err && !(rktpar = rd_kafka_topic_partition_list_find(
100 offsets,
101 rktp->rktp_rkt->rkt_topic->str,
102 rktp->rktp_partition)))
103 err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
104
105 if (!err && !rktpar->err) {
106 rd_kafka_toppar_lock(rktp);
107 rktp->rktp_lo_offset = rktpar->offset;
108 rd_kafka_toppar_unlock(rktp);
109 }
110
111 rd_kafka_topic_partition_list_destroy(offsets);
112
113 rktp->rktp_wait_consumer_lag_resp = 0;
114
115 rd_kafka_toppar_destroy(rktp); /* from request.opaque */
116 }
117
118
119
120 /**
121 * Request information from broker to keep track of consumer lag.
122 *
123 * @locality toppar handle thread
124 * @locks none
125 */
rd_kafka_toppar_consumer_lag_req(rd_kafka_toppar_t * rktp)126 static void rd_kafka_toppar_consumer_lag_req (rd_kafka_toppar_t *rktp) {
127 rd_kafka_topic_partition_list_t *partitions;
128
129 if (rktp->rktp_wait_consumer_lag_resp)
130 return; /* Previous request not finished yet */
131
132 rd_kafka_toppar_lock(rktp);
133
134 /* Offset requests can only be sent to the leader replica.
135 *
136 * Note: If rktp is delegated to a preferred replica, it is
137 * certain that FETCH >= v5 and so rktp_lo_offset will be
138 * updated via LogStartOffset in the FETCH response.
139 */
140 if (!rktp->rktp_leader || (rktp->rktp_leader != rktp->rktp_broker)) {
141 rd_kafka_toppar_unlock(rktp);
142 return;
143 }
144
145 /* Also don't send a timed log start offset request if leader
146 * broker supports FETCH >= v5, since this will be set when
147 * doing fetch requests.
148 */
149 if (rd_kafka_broker_ApiVersion_supported(rktp->rktp_broker,
150 RD_KAFKAP_Fetch, 0,
151 5, NULL) == 5) {
152 rd_kafka_toppar_unlock(rktp);
153 return;
154 }
155
156 rktp->rktp_wait_consumer_lag_resp = 1;
157
158 partitions = rd_kafka_topic_partition_list_new(1);
159 rd_kafka_topic_partition_list_add(partitions,
160 rktp->rktp_rkt->rkt_topic->str,
161 rktp->rktp_partition)->offset =
162 RD_KAFKA_OFFSET_BEGINNING;
163
164 /* Ask for oldest offset. The newest offset is automatically
165 * propagated in FetchResponse.HighwaterMark. */
166 rd_kafka_ListOffsetsRequest(rktp->rktp_broker, partitions,
167 RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
168 rd_kafka_toppar_lag_handle_Offset,
169 rd_kafka_toppar_keep(rktp));
170
171 rd_kafka_toppar_unlock(rktp);
172
173 rd_kafka_topic_partition_list_destroy(partitions);
174 }
175
176
177
178 /**
179 * Request earliest offset for a partition
180 *
181 * Locality: toppar handler thread
182 */
rd_kafka_toppar_consumer_lag_tmr_cb(rd_kafka_timers_t * rkts,void * arg)183 static void rd_kafka_toppar_consumer_lag_tmr_cb (rd_kafka_timers_t *rkts,
184 void *arg) {
185 rd_kafka_toppar_t *rktp = arg;
186 rd_kafka_toppar_consumer_lag_req(rktp);
187 }
188
189 /**
190 * @brief Update rktp_op_version.
191 * Enqueue an RD_KAFKA_OP_BARRIER type of operation
192 * when the op_version is updated.
193 *
194 * @locks_required rd_kafka_toppar_lock() must be held.
195 * @locality Toppar handler thread
196 */
rd_kafka_toppar_op_version_bump(rd_kafka_toppar_t * rktp,int32_t version)197 void rd_kafka_toppar_op_version_bump (rd_kafka_toppar_t *rktp,
198 int32_t version) {
199 rd_kafka_op_t *rko;
200
201 rktp->rktp_op_version = version;
202 rko = rd_kafka_op_new(RD_KAFKA_OP_BARRIER);
203 rko->rko_version = version;
204 rd_kafka_q_enq(rktp->rktp_fetchq, rko);
205 }
206
207
/**
 * Add new partition to topic.
 *
 * Allocates and initializes a toppar for \p partition of \p rkt.
 * The refcount starts at 0, and rd_kafka_toppar_keep() is applied on
 * return, so the caller holds one reference.
 * \p func and \p line identify the caller in the TOPPARNEW debug log.
 *
 * Locks: rd_kafka_topic_wrlock() must be held.
 * Locks: rd_kafka_wrlock() must be held.
 */
rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_topic_t *rkt,
                                         int32_t partition,
                                         const char *func, int line) {
        rd_kafka_toppar_t *rktp;

        rktp = rd_calloc(1, sizeof(*rktp));

        rktp->rktp_partition = partition;
        /* Borrowed pointer for now; a topic reference is taken with
         * rd_kafka_topic_keep() further down. */
        rktp->rktp_rkt = rkt;
        rktp->rktp_leader_id = -1;
        rktp->rktp_broker_id = -1;
        rd_interval_init(&rktp->rktp_lease_intvl);
        rd_interval_init(&rktp->rktp_new_lease_intvl);
        rd_interval_init(&rktp->rktp_new_lease_log_intvl);
        rd_interval_init(&rktp->rktp_metadata_intvl);
        /* Mark partition as unknown (does not exist) until we see the
         * partition in topic metadata. */
        if (partition != RD_KAFKA_PARTITION_UA)
                rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
        rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
        rktp->rktp_fetch_msg_max_bytes
            = rkt->rkt_rk->rk_conf.fetch_msg_max_bytes;
        rktp->rktp_offset_fp = NULL;
        rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
        rd_kafka_offset_stats_reset(&rktp->rktp_offsets_fin);
        /* All tracked offsets start out as INVALID (not yet known). */
        rktp->rktp_ls_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_hi_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_lo_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_query_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_next_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_last_next_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_stored_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_committing_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
        rd_kafka_msgq_init(&rktp->rktp_msgq);
        rd_kafka_msgq_init(&rktp->rktp_xmit_msgq);
        mtx_init(&rktp->rktp_lock, mtx_plain);

        /* Refcount starts at 0: the caller's reference is added by the
         * rd_kafka_toppar_keep() in the return statement. */
        rd_refcnt_init(&rktp->rktp_refcnt, 0);
        rktp->rktp_fetchq = rd_kafka_q_new(rkt->rkt_rk);
        rktp->rktp_ops = rd_kafka_q_new(rkt->rkt_rk);
        /* Ops on rktp_ops are served by rd_kafka_toppar_op_serve()
         * with this rktp as opaque. */
        rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve;
        rktp->rktp_ops->rkq_opaque = rktp;
        rd_atomic32_init(&rktp->rktp_version, 1);
        rktp->rktp_op_version = rd_atomic32_get(&rktp->rktp_version);

        rd_atomic32_init(&rktp->rktp_msgs_inflight, 0);
        rd_kafka_pid_reset(&rktp->rktp_eos.pid);

        /* Consumer: If statistics are enabled we query the log start offset
         * of each partition.
         * Since the oldest offset only moves on log retention, we cap this
         * value on the low end to a reasonable value to avoid flooding
         * the brokers with OffsetRequests when our statistics interval is low.
         * FIXME: Use a global timer to collect offsets for all partitions
         * FIXME: This timer is superfluous for FETCH >= v5 because the log
         *        start offset is included in fetch responses.
         * */
        if (rktp->rktp_rkt->rkt_rk->rk_conf.stats_interval_ms > 0 &&
            rkt->rkt_rk->rk_type == RD_KAFKA_CONSUMER &&
            rktp->rktp_partition != RD_KAFKA_PARTITION_UA) {
                int intvl = rkt->rkt_rk->rk_conf.stats_interval_ms;
                if (intvl < 10 * 1000 /* 10s */)
                        intvl = 10 * 1000;
                /* intvl is in milliseconds; scaled by 1000 for the timer
                 * (presumably microseconds -- confirm with
                 * rd_kafka_timer_start()). */
                rd_kafka_timer_start(&rkt->rkt_rk->rk_timers,
                                     &rktp->rktp_consumer_lag_tmr,
                                     intvl * 1000ll,
                                     rd_kafka_toppar_consumer_lag_tmr_cb,
                                     rktp);
        }

        /* Take the topic reference; it is released by
         * rd_kafka_topic_destroy0() in rd_kafka_toppar_destroy_final(). */
        rktp->rktp_rkt = rd_kafka_topic_keep(rkt);

        /* Forward partition ops to the instance's ops queue (rk_ops). */
        rd_kafka_q_fwd_set(rktp->rktp_ops, rkt->rkt_rk->rk_ops);
        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARNEW",
                     "NEW %s [%"PRId32"] %p refcnt %p (at %s:%d)",
                     rkt->rkt_topic->str, rktp->rktp_partition, rktp,
                     &rktp->rktp_refcnt,
                     func, line);

        return rd_kafka_toppar_keep(rktp);
}
297
298
299
300 /**
301 * Removes a toppar from its duties, global lists, etc.
302 *
303 * Locks: rd_kafka_toppar_lock() MUST be held
304 */
rd_kafka_toppar_remove(rd_kafka_toppar_t * rktp)305 static void rd_kafka_toppar_remove (rd_kafka_toppar_t *rktp) {
306 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARREMOVE",
307 "Removing toppar %s [%"PRId32"] %p",
308 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
309 rktp);
310
311 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
312 &rktp->rktp_offset_query_tmr, 1/*lock*/);
313 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
314 &rktp->rktp_consumer_lag_tmr, 1/*lock*/);
315
316 rd_kafka_q_fwd_set(rktp->rktp_ops, NULL);
317 }
318
319
/**
 * Final destructor for partition.
 *
 * Teardown proceeds in this order:
 *  - detach the toppar via rd_kafka_toppar_remove() (timers, ops
 *    forwarding);
 *  - assert that the transmit queue (rktp_xmit_msgq) is empty;
 *  - pass the messages left on rktp_msgq to rd_kafka_dr_msgq() with
 *    RD_KAFKA_RESP_ERR__DESTROY;
 *  - destroy the fetch/ops queues and the reply queue;
 *  - release the topic and leader broker references;
 *  - free the toppar.
 *
 * NOTE(review): only the rktp_leader broker reference is released here;
 * rktp_broker / rktp_next_broker (the latter holds a reference, see
 * rd_kafka_toppar_broker_migrate()) are presumably released before the
 * last reference is dropped -- confirm.
 */
void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp) {

        rd_kafka_toppar_remove(rktp);

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESTROY",
                     "%s [%"PRId32"]: %p DESTROY_FINAL",
                     rktp->rktp_rkt->rkt_topic->str,
                     rktp->rktp_partition, rktp);

        /* Clear queues */
        rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
                        rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) == 0);
        rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
                         RD_KAFKA_RESP_ERR__DESTROY);
        rd_kafka_q_destroy_owner(rktp->rktp_fetchq);
        rd_kafka_q_destroy_owner(rktp->rktp_ops);

        rd_kafka_replyq_destroy(&rktp->rktp_replyq);

        /* Drop the topic reference taken in rd_kafka_toppar_new0().
         * rktp->rktp_rkt is not dereferenced after this point. */
        rd_kafka_topic_destroy0(rktp->rktp_rkt);

        mtx_destroy(&rktp->rktp_lock);

        if (rktp->rktp_leader)
                rd_kafka_broker_destroy(rktp->rktp_leader);

        rd_refcnt_destroy(&rktp->rktp_refcnt);

        rd_free(rktp);
}
353
354
355 /**
356 * Set toppar fetching state.
357 *
358 * Locality: broker thread
359 * Locks: rd_kafka_toppar_lock() MUST be held.
360 */
rd_kafka_toppar_set_fetch_state(rd_kafka_toppar_t * rktp,int fetch_state)361 void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp,
362 int fetch_state) {
363 rd_kafka_assert(NULL,
364 thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));
365
366 if ((int)rktp->rktp_fetch_state == fetch_state)
367 return;
368
369 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "PARTSTATE",
370 "Partition %.*s [%"PRId32"] changed fetch state %s -> %s",
371 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
372 rktp->rktp_partition,
373 rd_kafka_fetch_states[rktp->rktp_fetch_state],
374 rd_kafka_fetch_states[fetch_state]);
375
376 rktp->rktp_fetch_state = fetch_state;
377
378 if (fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE)
379 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
380 CONSUMER|RD_KAFKA_DBG_TOPIC,
381 "FETCH",
382 "Partition %.*s [%"PRId32"] start fetching "
383 "at offset %s",
384 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
385 rktp->rktp_partition,
386 rd_kafka_offset2str(rktp->rktp_next_offset));
387 }
388
389
390 /**
391 * Returns the appropriate toppar for a given rkt and partition.
392 * The returned toppar has increased refcnt and must be unreffed by calling
393 * rd_kafka_toppar_destroy().
394 * May return NULL.
395 *
396 * If 'ua_on_miss' is true the UA (unassigned) toppar is returned if
397 * 'partition' was not known locally, else NULL is returned.
398 *
399 * Locks: Caller must hold rd_kafka_topic_*lock()
400 */
rd_kafka_toppar_get0(const char * func,int line,const rd_kafka_topic_t * rkt,int32_t partition,int ua_on_miss)401 rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line,
402 const rd_kafka_topic_t *rkt,
403 int32_t partition,
404 int ua_on_miss) {
405 rd_kafka_toppar_t *rktp;
406
407 if (partition >= 0 && partition < rkt->rkt_partition_cnt)
408 rktp = rkt->rkt_p[partition];
409 else if (partition == RD_KAFKA_PARTITION_UA || ua_on_miss)
410 rktp = rkt->rkt_ua;
411 else
412 return NULL;
413
414 if (rktp)
415 return rd_kafka_toppar_keep_fl(func, line, rktp);
416
417 return NULL;
418 }
419
420
421 /**
422 * Same as rd_kafka_toppar_get() but no need for locking and
423 * looks up the topic first.
424 *
425 * Locality: any
426 * Locks: none
427 */
rd_kafka_toppar_get2(rd_kafka_t * rk,const char * topic,int32_t partition,int ua_on_miss,int create_on_miss)428 rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk,
429 const char *topic,
430 int32_t partition,
431 int ua_on_miss,
432 int create_on_miss) {
433 rd_kafka_topic_t *rkt;
434 rd_kafka_toppar_t *rktp;
435
436 rd_kafka_wrlock(rk);
437
438 /* Find or create topic */
439 if (unlikely(!(rkt = rd_kafka_topic_find(rk, topic, 0/*no-lock*/)))) {
440 if (!create_on_miss) {
441 rd_kafka_wrunlock(rk);
442 return NULL;
443 }
444 rkt = rd_kafka_topic_new0(rk, topic, NULL,
445 NULL, 0/*no-lock*/);
446 if (!rkt) {
447 rd_kafka_wrunlock(rk);
448 rd_kafka_log(rk, LOG_ERR, "TOPIC",
449 "Failed to create local topic \"%s\": %s",
450 topic, rd_strerror(errno));
451 return NULL;
452 }
453 }
454
455 rd_kafka_wrunlock(rk);
456
457 rd_kafka_topic_wrlock(rkt);
458 rktp = rd_kafka_toppar_desired_add(rkt, partition);
459 rd_kafka_topic_wrunlock(rkt);
460
461 rd_kafka_topic_destroy0(rkt);
462
463 return rktp;
464 }
465
466
467 /**
468 * Returns a toppar if it is available in the cluster.
469 * '*errp' is set to the error-code if lookup fails.
470 *
471 * Locks: topic_*lock() MUST be held
472 */
473 rd_kafka_toppar_t *
rd_kafka_toppar_get_avail(const rd_kafka_topic_t * rkt,int32_t partition,int ua_on_miss,rd_kafka_resp_err_t * errp)474 rd_kafka_toppar_get_avail (const rd_kafka_topic_t *rkt,
475 int32_t partition, int ua_on_miss,
476 rd_kafka_resp_err_t *errp) {
477 rd_kafka_toppar_t *rktp;
478
479 switch (rkt->rkt_state)
480 {
481 case RD_KAFKA_TOPIC_S_UNKNOWN:
482 /* No metadata received from cluster yet.
483 * Put message in UA partition and re-run partitioner when
484 * cluster comes up. */
485 partition = RD_KAFKA_PARTITION_UA;
486 break;
487
488 case RD_KAFKA_TOPIC_S_NOTEXISTS:
489 /* Topic not found in cluster.
490 * Fail message immediately. */
491 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
492 return NULL;
493
494 case RD_KAFKA_TOPIC_S_ERROR:
495 /* Permanent topic error. */
496 *errp = rkt->rkt_err;
497 return NULL;
498
499 case RD_KAFKA_TOPIC_S_EXISTS:
500 /* Topic exists in cluster. */
501
502 /* Topic exists but has no partitions.
503 * This is usually an transient state following the
504 * auto-creation of a topic. */
505 if (unlikely(rkt->rkt_partition_cnt == 0)) {
506 partition = RD_KAFKA_PARTITION_UA;
507 break;
508 }
509
510 /* Check that partition exists. */
511 if (partition >= rkt->rkt_partition_cnt) {
512 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
513 return NULL;
514 }
515 break;
516
517 default:
518 rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
519 break;
520 }
521
522 /* Get new partition */
523 rktp = rd_kafka_toppar_get(rkt, partition, 0);
524
525 if (unlikely(!rktp)) {
526 /* Unknown topic or partition */
527 if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
528 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
529 else
530 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
531
532 return NULL;
533 }
534
535 return rktp;
536 }
537
538
539 /**
540 * Looks for partition 'i' in topic 'rkt's desired list.
541 *
542 * The desired partition list is the list of partitions that are desired
543 * (e.g., by the consumer) but not yet seen on a broker.
544 * As soon as the partition is seen on a broker the toppar is moved from
545 * the desired list and onto the normal rkt_p array.
546 * When the partition on the broker goes away a desired partition is put
547 * back on the desired list.
548 *
549 * Locks: rd_kafka_topic_*lock() must be held.
550 * Note: 'rktp' refcount is increased.
551 */
552
rd_kafka_toppar_desired_get(rd_kafka_topic_t * rkt,int32_t partition)553 rd_kafka_toppar_t *rd_kafka_toppar_desired_get (rd_kafka_topic_t *rkt,
554 int32_t partition) {
555 rd_kafka_toppar_t *rktp;
556 int i;
557
558 RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i) {
559 if (rktp->rktp_partition == partition)
560 return rd_kafka_toppar_keep(rktp);
561 }
562
563 return NULL;
564 }
565
566
567 /**
568 * Link toppar on desired list.
569 *
570 * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
571 */
rd_kafka_toppar_desired_link(rd_kafka_toppar_t * rktp)572 void rd_kafka_toppar_desired_link (rd_kafka_toppar_t *rktp) {
573
574 if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_DESP)
575 return; /* Already linked */
576
577 rd_kafka_toppar_keep(rktp);
578 rd_list_add(&rktp->rktp_rkt->rkt_desp, rktp);
579 rd_interval_reset(&rktp->rktp_rkt->rkt_desp_refresh_intvl);
580 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_ON_DESP;
581 }
582
583 /**
584 * Unlink toppar from desired list.
585 *
586 * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
587 */
rd_kafka_toppar_desired_unlink(rd_kafka_toppar_t * rktp)588 void rd_kafka_toppar_desired_unlink (rd_kafka_toppar_t *rktp) {
589 if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_DESP))
590 return; /* Not linked */
591
592 rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_DESP;
593 rd_list_remove(&rktp->rktp_rkt->rkt_desp, rktp);
594 rd_interval_reset(&rktp->rktp_rkt->rkt_desp_refresh_intvl);
595 rd_kafka_toppar_destroy(rktp);
596 }
597
598
599 /**
600 * @brief If rktp is not already desired:
601 * - mark as DESIRED|~REMOVE
602 * - add to desired list if unknown
603 *
604 * @remark toppar_lock() MUST be held
605 */
rd_kafka_toppar_desired_add0(rd_kafka_toppar_t * rktp)606 void rd_kafka_toppar_desired_add0 (rd_kafka_toppar_t *rktp) {
607 if ((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
608 return;
609
610 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESIRED",
611 "%s [%"PRId32"]: marking as DESIRED",
612 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
613
614 /* If toppar was marked for removal this is no longer
615 * the case since the partition is now desired. */
616 rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_REMOVE;
617
618 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_DESIRED;
619
620 if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN) {
621 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESIRED",
622 "%s [%"PRId32"]: adding to DESIRED list",
623 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
624 rd_kafka_toppar_desired_link(rktp);
625 }
626 }
627
628
629 /**
630 * Adds 'partition' as a desired partition to topic 'rkt', or updates
631 * an existing partition to be desired.
632 *
633 * Locks: rd_kafka_topic_wrlock() must be held.
634 */
rd_kafka_toppar_desired_add(rd_kafka_topic_t * rkt,int32_t partition)635 rd_kafka_toppar_t *rd_kafka_toppar_desired_add (rd_kafka_topic_t *rkt,
636 int32_t partition) {
637 rd_kafka_toppar_t *rktp;
638
639 rktp = rd_kafka_toppar_get(rkt, partition, 0/*no_ua_on_miss*/);
640
641 if (!rktp)
642 rktp = rd_kafka_toppar_desired_get(rkt, partition);
643
644 if (!rktp)
645 rktp = rd_kafka_toppar_new(rkt, partition);
646
647 rd_kafka_toppar_lock(rktp);
648 rd_kafka_toppar_desired_add0(rktp);
649 rd_kafka_toppar_unlock(rktp);
650
651 return rktp; /* Callers refcount */
652 }
653
654
655
656
657 /**
658 * Unmarks an 'rktp' as desired.
659 *
660 * Locks: rd_kafka_topic_wrlock() and rd_kafka_toppar_lock() MUST be held.
661 */
rd_kafka_toppar_desired_del(rd_kafka_toppar_t * rktp)662 void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp) {
663
664 if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
665 return;
666
667 rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_DESIRED;
668 rd_kafka_toppar_desired_unlink(rktp);
669
670 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESP",
671 "Removing (un)desired topic %s [%"PRId32"]",
672 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
673
674 if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN) {
675 /* If this partition does not exist in the cluster
676 * and is no longer desired, remove it. */
677 rd_kafka_toppar_broker_leave_for_remove(rktp);
678 }
679 }
680
681
682
683 /**
684 * Append message at tail of 'rktp' message queue.
685 */
rd_kafka_toppar_enq_msg(rd_kafka_toppar_t * rktp,rd_kafka_msg_t * rkm)686 void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
687 int queue_len;
688 rd_kafka_q_t *wakeup_q = NULL;
689
690 rd_kafka_toppar_lock(rktp);
691
692 if (!rkm->rkm_u.producer.msgid &&
693 rktp->rktp_partition != RD_KAFKA_PARTITION_UA)
694 rkm->rkm_u.producer.msgid = ++rktp->rktp_msgid;
695
696 if (rktp->rktp_partition == RD_KAFKA_PARTITION_UA ||
697 rktp->rktp_rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO) {
698 /* No need for enq_sorted(), this is the oldest message. */
699 queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
700 } else {
701 queue_len = rd_kafka_msgq_enq_sorted(rktp->rktp_rkt,
702 &rktp->rktp_msgq, rkm);
703 }
704
705 if (unlikely(queue_len == 1 &&
706 (wakeup_q = rktp->rktp_msgq_wakeup_q)))
707 rd_kafka_q_keep(wakeup_q);
708
709 rd_kafka_toppar_unlock(rktp);
710
711 if (wakeup_q) {
712 rd_kafka_q_yield(wakeup_q);
713 rd_kafka_q_destroy(wakeup_q);
714 }
715 }
716
717
/**
 * @brief Insert \p srcq before \p insert_before in \p destq.
 *
 * If \p srcq and \p destq overlap, only the leading part of \p srcq that
 * sorts before \p insert_before (according to \p cmp) is inserted.
 *
 * Upon return \p srcq will contain any remaining messages that require
 * another insert position in \p destq.
 *
 * @param destq         Destination queue.
 * @param insert_before Message in \p destq to insert before. If NULL, all
 *                      of \p srcq is appended to \p destq.
 * @param srcq          Source queue; must be non-empty when
 *                      \p insert_before is non-NULL (debug-asserted).
 * @param cmp           Message ordering comparator.
 */
static void
rd_kafka_msgq_insert_msgq_before (rd_kafka_msgq_t *destq,
                                  rd_kafka_msg_t *insert_before,
                                  rd_kafka_msgq_t *srcq,
                                  int (*cmp) (const void *a, const void *b)) {
        rd_kafka_msg_t *slast;
        rd_kafka_msgq_t tmpq; /* Right-hand remainder of srcq, if any */

        if (!insert_before) {
                /* Append all of srcq to destq */
                rd_kafka_msgq_concat(destq, srcq);
                rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
                return;
        }

        slast = rd_kafka_msgq_last(srcq);
        rd_dassert(slast);

        /* Does srcq extend past insert_before? */
        if (cmp(slast, insert_before) > 0) {
                rd_kafka_msg_t *new_sfirst;
                int cnt;
                int64_t bytes;

                /* destq insert_before resides somewhere between
                 * srcq.first and srcq.last, find the first message in
                 * srcq that is > insert_before and split srcq into
                 * a left part that contains the messages to insert before
                 * insert_before, and a right part that will need another
                 * insert position. */

                /* NOTE(review): cnt/bytes presumably receive the count and
                 * byte size of the left part, as consumed by
                 * rd_kafka_msgq_split() -- confirm. */
                new_sfirst = rd_kafka_msgq_find_pos(srcq, NULL,
                                                    insert_before,
                                                    cmp, &cnt, &bytes);
                rd_assert(new_sfirst);

                /* split srcq into two parts using the divider message */
                rd_kafka_msgq_split(srcq, &tmpq, new_sfirst, cnt, bytes);

                rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
                rd_kafka_msgq_verify_order(NULL, &tmpq, 0, rd_false);
        } else {
                /* All of srcq fits before insert_before: no remainder. */
                rd_kafka_msgq_init(&tmpq);
        }

        /* srcq now contains messages up to the first message in destq,
         * insert srcq at insert_before in destq. */
        rd_dassert(!TAILQ_EMPTY(&destq->rkmq_msgs));
        rd_dassert(!TAILQ_EMPTY(&srcq->rkmq_msgs));
        TAILQ_INSERT_LIST_BEFORE(&destq->rkmq_msgs,
                                 insert_before,
                                 &srcq->rkmq_msgs,
                                 rd_kafka_msgs_head_s,
                                 rd_kafka_msg_t *,
                                 rkm_link);
        /* Transfer the message/byte counters along with the messages. */
        destq->rkmq_msg_cnt += srcq->rkmq_msg_cnt;
        destq->rkmq_msg_bytes += srcq->rkmq_msg_bytes;
        srcq->rkmq_msg_cnt = 0;
        srcq->rkmq_msg_bytes = 0;

        rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
        rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);

        /* tmpq contains the remaining messages in srcq, move it over. */
        rd_kafka_msgq_move(srcq, &tmpq);

        rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
}
793
794
/**
 * @brief Insert all messages from \p srcq into \p destq in their sorted
 *        position (using \p cmp)
 *
 * Both queues are expected to already be sorted according to \p cmp.
 * \p srcq is empty on return (asserted).
 *
 * @param destq Destination queue.
 * @param srcq  Source queue; emptied by this call.
 * @param cmp   Message ordering comparator.
 */
void rd_kafka_msgq_insert_msgq (rd_kafka_msgq_t *destq,
                                rd_kafka_msgq_t *srcq,
                                int (*cmp) (const void *a, const void *b)) {
        rd_kafka_msg_t *sfirst, *dlast, *start_pos = NULL;

        if (unlikely(RD_KAFKA_MSGQ_EMPTY(srcq))) {
                /* srcq is empty */
                return;
        }

        if (unlikely(RD_KAFKA_MSGQ_EMPTY(destq))) {
                /* destq is empty, simply move the srcq. */
                rd_kafka_msgq_move(destq, srcq);
                rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
                return;
        }

        /* Optimize insertion by bulk-moving messages in place.
         * We know that:
         *  - destq is sorted but might not be continuous (1,2,3,7)
         *  - srcq is sorted but might not be continuous (4,5,6,8)
         *  - there might be (multiple) overlaps between the two, e.g:
         *     destq = (1,2,3,7), srcq = (4,5,6,8)
         *  - there may be millions of messages.
         */

        rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
        rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);

        dlast = rd_kafka_msgq_last(destq);
        sfirst = rd_kafka_msgq_first(srcq);

        /* Most common case, all of srcq goes after destq */
        if (likely(cmp(dlast, sfirst) < 0)) {
                rd_kafka_msgq_concat(destq, srcq);

                rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);

                rd_assert(RD_KAFKA_MSGQ_EMPTY(srcq));
                return;
        }

        /* Insert messages from srcq into destq in non-overlapping
         * chunks until srcq is exhausted. */
        while (likely(sfirst != NULL)) {
                rd_kafka_msg_t *insert_before;

                /* Get insert position in destq of first element in srcq.
                 * A NULL result makes insert_msgq_before() append the
                 * rest of srcq, which ends the loop. */
                insert_before = rd_kafka_msgq_find_pos(destq, start_pos,
                                                       sfirst, cmp,
                                                       NULL, NULL);

                /* Insert as much of srcq as possible at insert_before */
                rd_kafka_msgq_insert_msgq_before(destq, insert_before,
                                                 srcq, cmp);

                /* Remember the current destq position so the next find_pos()
                 * does not have to re-scan destq and what was
                 * added from srcq. */
                start_pos = insert_before;

                /* For next iteration */
                sfirst = rd_kafka_msgq_first(srcq);

                rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
                rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
        }

        rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);

        rd_assert(RD_KAFKA_MSGQ_EMPTY(srcq));
}
871
872
873 /**
874 * @brief Inserts messages from \p srcq according to their sorted position
875 * into \p destq, filtering out messages that can not be retried.
876 *
877 * @param incr_retry Increment retry count for messages.
878 * @param max_retries Maximum retries allowed per message.
879 * @param backoff Absolute retry backoff for retried messages.
880 *
881 * @returns 0 if all messages were retried, or 1 if some messages
882 * could not be retried.
883 */
rd_kafka_retry_msgq(rd_kafka_msgq_t * destq,rd_kafka_msgq_t * srcq,int incr_retry,int max_retries,rd_ts_t backoff,rd_kafka_msg_status_t status,int (* cmp)(const void * a,const void * b))884 int rd_kafka_retry_msgq (rd_kafka_msgq_t *destq,
885 rd_kafka_msgq_t *srcq,
886 int incr_retry, int max_retries, rd_ts_t backoff,
887 rd_kafka_msg_status_t status,
888 int (*cmp) (const void *a, const void *b)) {
889 rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable);
890 rd_kafka_msg_t *rkm, *tmp;
891
892 /* Scan through messages to see which ones are eligible for retry,
893 * move the retryable ones to temporary queue and
894 * set backoff time for first message and optionally
895 * increase retry count for each message.
896 * Sorted insert is not necessary since the original order
897 * srcq order is maintained. */
898 TAILQ_FOREACH_SAFE(rkm, &srcq->rkmq_msgs, rkm_link, tmp) {
899 if (rkm->rkm_u.producer.retries + incr_retry > max_retries)
900 continue;
901
902 rd_kafka_msgq_deq(srcq, rkm, 1);
903 rd_kafka_msgq_enq(&retryable, rkm);
904
905 rkm->rkm_u.producer.ts_backoff = backoff;
906 rkm->rkm_u.producer.retries += incr_retry;
907
908 /* Don't downgrade a message from any form of PERSISTED
909 * to NOT_PERSISTED, since the original cause of indicating
910 * PERSISTED can't be changed.
911 * E.g., a previous ack or in-flight timeout. */
912 if (likely(!(status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED &&
913 rkm->rkm_status !=
914 RD_KAFKA_MSG_STATUS_NOT_PERSISTED)))
915 rkm->rkm_status = status;
916 }
917
918 /* No messages are retryable */
919 if (RD_KAFKA_MSGQ_EMPTY(&retryable))
920 return 0;
921
922 /* Insert retryable list at sorted position */
923 rd_kafka_msgq_insert_msgq(destq, &retryable, cmp);
924
925 return 1;
926 }
927
928 /**
929 * @brief Inserts messages from \p rkmq according to their sorted position
930 * into the partition's message queue.
931 *
932 * @param incr_retry Increment retry count for messages.
933 * @param status Set status on each message.
934 *
935 * @returns 0 if all messages were retried, or 1 if some messages
936 * could not be retried.
937 *
938 * @locality Broker thread (but not necessarily the leader broker thread)
939 */
940
rd_kafka_toppar_retry_msgq(rd_kafka_toppar_t * rktp,rd_kafka_msgq_t * rkmq,int incr_retry,rd_kafka_msg_status_t status)941 int rd_kafka_toppar_retry_msgq (rd_kafka_toppar_t *rktp, rd_kafka_msgq_t *rkmq,
942 int incr_retry, rd_kafka_msg_status_t status) {
943 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
944 rd_ts_t backoff = rd_clock() + (rk->rk_conf.retry_backoff_ms * 1000);
945 int r;
946
947 if (rd_kafka_terminating(rk))
948 return 1;
949
950 rd_kafka_toppar_lock(rktp);
951 r = rd_kafka_retry_msgq(&rktp->rktp_msgq, rkmq,
952 incr_retry, rk->rk_conf.max_retries,
953 backoff, status,
954 rktp->rktp_rkt->rkt_conf.msg_order_cmp);
955 rd_kafka_toppar_unlock(rktp);
956
957 return r;
958 }
959
960 /**
961 * @brief Insert sorted message list \p rkmq at sorted position in \p rktp 's
962 * message queue. The queues must not overlap.
963 * @remark \p rkmq will be cleared.
964 */
rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t * rktp,rd_kafka_msgq_t * rkmq)965 void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
966 rd_kafka_msgq_t *rkmq) {
967 rd_kafka_toppar_lock(rktp);
968 rd_kafka_msgq_insert_msgq(&rktp->rktp_msgq, rkmq,
969 rktp->rktp_rkt->rkt_conf.msg_order_cmp);
970 rd_kafka_toppar_unlock(rktp);
971 }
972
973
974
975 /**
976 * Helper method for purging queues when removing a toppar.
977 * Locks: rd_kafka_toppar_lock() MUST be held
978 */
rd_kafka_toppar_purge_and_disable_queues(rd_kafka_toppar_t * rktp)979 void rd_kafka_toppar_purge_and_disable_queues (rd_kafka_toppar_t *rktp) {
980 rd_kafka_q_disable(rktp->rktp_fetchq);
981 rd_kafka_q_purge(rktp->rktp_fetchq);
982 rd_kafka_q_disable(rktp->rktp_ops);
983 rd_kafka_q_purge(rktp->rktp_ops);
984 }
985
986
987 /**
988 * @brief Migrate rktp from (optional) \p old_rkb to (optional) \p new_rkb,
989 * but at least one is required to be non-NULL.
990 *
991 * This is an async operation.
992 *
993 * @locks rd_kafka_toppar_lock() MUST be held
994 */
rd_kafka_toppar_broker_migrate(rd_kafka_toppar_t * rktp,rd_kafka_broker_t * old_rkb,rd_kafka_broker_t * new_rkb)995 static void rd_kafka_toppar_broker_migrate (rd_kafka_toppar_t *rktp,
996 rd_kafka_broker_t *old_rkb,
997 rd_kafka_broker_t *new_rkb) {
998 rd_kafka_op_t *rko;
999 rd_kafka_broker_t *dest_rkb;
1000 int had_next_broker = rktp->rktp_next_broker ? 1 : 0;
1001
1002 rd_assert(old_rkb || new_rkb);
1003
1004 /* Update next broker */
1005 if (new_rkb)
1006 rd_kafka_broker_keep(new_rkb);
1007 if (rktp->rktp_next_broker)
1008 rd_kafka_broker_destroy(rktp->rktp_next_broker);
1009 rktp->rktp_next_broker = new_rkb;
1010
1011 /* If next_broker is set it means there is already an async
1012 * migration op going on and we should not send a new one
1013 * but simply change the next_broker (which we did above). */
1014 if (had_next_broker)
1015 return;
1016
1017 /* Revert from offset-wait state back to offset-query
1018 * prior to leaving the broker to avoid stalling
1019 * on the new broker waiting for a offset reply from
1020 * this old broker (that might not come and thus need
1021 * to time out..slowly) */
1022 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT)
1023 rd_kafka_toppar_offset_retry(rktp, 500,
1024 "migrating to new broker");
1025
1026 if (old_rkb) {
1027 /* If there is an existing broker for this toppar we let it
1028 * first handle its own leave and then trigger the join for
1029 * the next broker, if any. */
1030 rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
1031 dest_rkb = old_rkb;
1032 } else {
1033 /* No existing broker, send join op directly to new broker. */
1034 rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_JOIN);
1035 dest_rkb = new_rkb;
1036 }
1037
1038 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1039
1040 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
1041 "Migrating topic %.*s [%"PRId32"] %p from %s to %s "
1042 "(sending %s to %s)",
1043 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1044 rktp->rktp_partition, rktp,
1045 old_rkb ? rd_kafka_broker_name(old_rkb) : "(none)",
1046 new_rkb ? rd_kafka_broker_name(new_rkb) : "(none)",
1047 rd_kafka_op2str(rko->rko_type),
1048 rd_kafka_broker_name(dest_rkb));
1049
1050 rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
1051 }
1052
1053
1054 /**
1055 * Async toppar leave from broker.
1056 * Only use this when partitions are to be removed.
1057 *
1058 * Locks: rd_kafka_toppar_lock() MUST be held
1059 */
rd_kafka_toppar_broker_leave_for_remove(rd_kafka_toppar_t * rktp)1060 void rd_kafka_toppar_broker_leave_for_remove (rd_kafka_toppar_t *rktp) {
1061 rd_kafka_op_t *rko;
1062 rd_kafka_broker_t *dest_rkb;
1063
1064 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_REMOVE;
1065
1066 if (rktp->rktp_next_broker)
1067 dest_rkb = rktp->rktp_next_broker;
1068 else if (rktp->rktp_broker)
1069 dest_rkb = rktp->rktp_broker;
1070 else {
1071 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARDEL",
1072 "%.*s [%"PRId32"] %p not handled by any broker: "
1073 "not sending LEAVE for remove",
1074 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1075 rktp->rktp_partition, rktp);
1076 return;
1077 }
1078
1079
1080 /* Revert from offset-wait state back to offset-query
1081 * prior to leaving the broker to avoid stalling
1082 * on the new broker waiting for a offset reply from
1083 * this old broker (that might not come and thus need
1084 * to time out..slowly) */
1085 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT)
1086 rd_kafka_toppar_set_fetch_state(
1087 rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
1088
1089 rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
1090 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1091
1092 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
1093 "%.*s [%"PRId32"] %p sending final LEAVE for removal by %s",
1094 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1095 rktp->rktp_partition, rktp,
1096 rd_kafka_broker_name(dest_rkb));
1097
1098 rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
1099 }
1100
1101
1102 /**
1103 * @brief Delegates toppar 'rktp' to broker 'rkb'. 'rkb' may be NULL to
1104 * undelegate broker.
1105 *
1106 * @locks Caller must have rd_kafka_toppar_lock(rktp) held.
1107 */
rd_kafka_toppar_broker_delegate(rd_kafka_toppar_t * rktp,rd_kafka_broker_t * rkb)1108 void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
1109 rd_kafka_broker_t *rkb) {
1110 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1111 int internal_fallback = 0;
1112
1113 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1114 "%s [%"PRId32"]: delegate to broker %s "
1115 "(rktp %p, term %d, ref %d)",
1116 rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
1117 rkb ? rkb->rkb_name : "(none)",
1118 rktp, rd_kafka_terminating(rk),
1119 rd_refcnt_get(&rktp->rktp_refcnt));
1120
1121 /* Undelegated toppars are delgated to the internal
1122 * broker for bookkeeping. */
1123 if (!rkb && !rd_kafka_terminating(rk)) {
1124 rkb = rd_kafka_broker_internal(rk);
1125 internal_fallback = 1;
1126 }
1127
1128 if (rktp->rktp_broker == rkb && !rktp->rktp_next_broker) {
1129 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1130 "%.*s [%"PRId32"]: not updating broker: "
1131 "already on correct broker %s",
1132 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1133 rktp->rktp_partition,
1134 rkb ? rd_kafka_broker_name(rkb) : "(none)");
1135
1136 if (internal_fallback)
1137 rd_kafka_broker_destroy(rkb);
1138 return;
1139 }
1140
1141 if (rktp->rktp_broker)
1142 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1143 "%.*s [%"PRId32"]: no longer delegated to "
1144 "broker %s",
1145 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1146 rktp->rktp_partition,
1147 rd_kafka_broker_name(rktp->rktp_broker));
1148
1149
1150 if (rkb) {
1151 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1152 "%.*s [%"PRId32"]: delegating to broker %s "
1153 "for partition with %i messages "
1154 "(%"PRIu64" bytes) queued",
1155 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1156 rktp->rktp_partition,
1157 rd_kafka_broker_name(rkb),
1158 rktp->rktp_msgq.rkmq_msg_cnt,
1159 rktp->rktp_msgq.rkmq_msg_bytes);
1160
1161
1162 } else {
1163 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1164 "%.*s [%"PRId32"]: no broker delegated",
1165 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1166 rktp->rktp_partition);
1167 }
1168
1169 if (rktp->rktp_broker || rkb)
1170 rd_kafka_toppar_broker_migrate(rktp, rktp->rktp_broker, rkb);
1171
1172 if (internal_fallback)
1173 rd_kafka_broker_destroy(rkb);
1174 }
1175
1176
1177
1178
1179
1180 void
rd_kafka_toppar_offset_commit_result(rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * offsets)1181 rd_kafka_toppar_offset_commit_result (rd_kafka_toppar_t *rktp,
1182 rd_kafka_resp_err_t err,
1183 rd_kafka_topic_partition_list_t *offsets){
1184 if (err)
1185 rd_kafka_consumer_err(rktp->rktp_fetchq,
1186 /* FIXME: propagate broker_id */
1187 RD_KAFKA_NODEID_UA,
1188 err, 0 /* FIXME:VERSION*/,
1189 NULL, rktp, RD_KAFKA_OFFSET_INVALID,
1190 "Offset commit failed: %s",
1191 rd_kafka_err2str(err));
1192
1193 rd_kafka_toppar_lock(rktp);
1194 if (!err)
1195 rktp->rktp_committed_offset = offsets->elems[0].offset;
1196
1197 /* When stopping toppars:
1198 * Final commit is now done (or failed), propagate. */
1199 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING)
1200 rd_kafka_toppar_fetch_stopped(rktp, err);
1201
1202 rd_kafka_toppar_unlock(rktp);
1203 }
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215 /**
1216 * Handle the next offset to consume for a toppar.
1217 * This is used during initial setup when trying to figure out what
1218 * offset to start consuming from.
1219 *
1220 * Locality: toppar handler thread.
1221 * Locks: toppar_lock(rktp) must be held
1222 */
rd_kafka_toppar_next_offset_handle(rd_kafka_toppar_t * rktp,int64_t Offset)1223 void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp,
1224 int64_t Offset) {
1225
1226 if (RD_KAFKA_OFFSET_IS_LOGICAL(Offset)) {
1227 /* Offset storage returned logical offset (e.g. "end"),
1228 * look it up. */
1229
1230 /* Save next offset, even if logical, so that e.g.,
1231 * assign(BEGINNING) survives a pause+resume, etc.
1232 * See issue #2105. */
1233 rktp->rktp_next_offset = Offset;
1234
1235 rd_kafka_offset_reset(rktp, Offset, RD_KAFKA_RESP_ERR_NO_ERROR,
1236 "update");
1237 return;
1238 }
1239
1240 /* Adjust by TAIL count if, if wanted */
1241 if (rktp->rktp_query_offset <=
1242 RD_KAFKA_OFFSET_TAIL_BASE) {
1243 int64_t orig_Offset = Offset;
1244 int64_t tail_cnt =
1245 llabs(rktp->rktp_query_offset -
1246 RD_KAFKA_OFFSET_TAIL_BASE);
1247
1248 if (tail_cnt > Offset)
1249 Offset = 0;
1250 else
1251 Offset -= tail_cnt;
1252
1253 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1254 "OffsetReply for topic %s [%"PRId32"]: "
1255 "offset %"PRId64": adjusting for "
1256 "OFFSET_TAIL(%"PRId64"): "
1257 "effective offset %"PRId64,
1258 rktp->rktp_rkt->rkt_topic->str,
1259 rktp->rktp_partition,
1260 orig_Offset, tail_cnt,
1261 Offset);
1262 }
1263
1264 rktp->rktp_next_offset = Offset;
1265
1266 rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1267
1268 /* Wake-up broker thread which might be idling on IO */
1269 if (rktp->rktp_broker)
1270 rd_kafka_broker_wakeup(rktp->rktp_broker);
1271
1272 }
1273
1274
1275
1276 /**
1277 * Fetch committed offset for a single partition. (simple consumer)
1278 *
1279 * Locality: toppar thread
1280 */
rd_kafka_toppar_offset_fetch(rd_kafka_toppar_t * rktp,rd_kafka_replyq_t replyq)1281 void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
1282 rd_kafka_replyq_t replyq) {
1283 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1284 rd_kafka_topic_partition_list_t *part;
1285 rd_kafka_op_t *rko;
1286
1287 rd_kafka_dbg(rk, TOPIC, "OFFSETREQ",
1288 "Partition %.*s [%"PRId32"]: querying cgrp for "
1289 "committed offset (opv %d)",
1290 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1291 rktp->rktp_partition, replyq.version);
1292
1293 part = rd_kafka_topic_partition_list_new(1);
1294 rd_kafka_topic_partition_list_add0(__FUNCTION__,__LINE__,part,
1295 rktp->rktp_rkt->rkt_topic->str,
1296 rktp->rktp_partition,
1297 rktp);
1298
1299 rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
1300 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1301 rko->rko_replyq = replyq;
1302
1303 rko->rko_u.offset_fetch.partitions = part;
1304 rko->rko_u.offset_fetch.require_stable =
1305 rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED;
1306 rko->rko_u.offset_fetch.do_free = 1;
1307
1308 rd_kafka_q_enq(rktp->rktp_cgrp->rkcg_ops, rko);
1309 }
1310
1311
1312
1313
1314 /**
1315 * Toppar based OffsetResponse handling.
1316 * This is used for finding the next offset to Fetch.
1317 *
1318 * Locality: toppar handler thread
1319 */
rd_kafka_toppar_handle_Offset(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)1320 static void rd_kafka_toppar_handle_Offset (rd_kafka_t *rk,
1321 rd_kafka_broker_t *rkb,
1322 rd_kafka_resp_err_t err,
1323 rd_kafka_buf_t *rkbuf,
1324 rd_kafka_buf_t *request,
1325 void *opaque) {
1326 rd_kafka_toppar_t *rktp = opaque;
1327 rd_kafka_topic_partition_list_t *offsets;
1328 rd_kafka_topic_partition_t *rktpar;
1329 int64_t Offset;
1330 int actions = 0;
1331
1332 rd_kafka_toppar_lock(rktp);
1333 /* Drop reply from previous partition leader */
1334 if (err != RD_KAFKA_RESP_ERR__DESTROY && rktp->rktp_broker != rkb)
1335 err = RD_KAFKA_RESP_ERR__OUTDATED;
1336 rd_kafka_toppar_unlock(rktp);
1337
1338 offsets = rd_kafka_topic_partition_list_new(1);
1339
1340 rd_rkb_dbg(rkb, TOPIC, "OFFSET",
1341 "Offset reply for "
1342 "topic %.*s [%"PRId32"] (v%d vs v%d)",
1343 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1344 rktp->rktp_partition, request->rkbuf_replyq.version,
1345 rktp->rktp_op_version);
1346
1347 rd_dassert(request->rkbuf_replyq.version > 0);
1348 if (err != RD_KAFKA_RESP_ERR__DESTROY &&
1349 rd_kafka_buf_version_outdated(request, rktp->rktp_op_version)) {
1350 /* Outdated request response, ignore. */
1351 err = RD_KAFKA_RESP_ERR__OUTDATED;
1352 }
1353
1354 /* Parse and return Offset */
1355 if (err != RD_KAFKA_RESP_ERR__OUTDATED)
1356 err = rd_kafka_handle_ListOffsets(rk, rkb, err,
1357 rkbuf, request, offsets,
1358 &actions);
1359
1360 if (!err &&
1361 !(rktpar = rd_kafka_topic_partition_list_find(
1362 offsets,
1363 rktp->rktp_rkt->rkt_topic->str,
1364 rktp->rktp_partition))) {
1365 /* Request partition not found in response */
1366 err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
1367 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1368 }
1369
1370 if (err) {
1371 rd_rkb_dbg(rkb, TOPIC, "OFFSET",
1372 "Offset reply error for "
1373 "topic %.*s [%"PRId32"] (v%d, %s): %s",
1374 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1375 rktp->rktp_partition, request->rkbuf_replyq.version,
1376 rd_kafka_err2str(err),
1377 rd_kafka_actions2str(actions));
1378
1379 rd_kafka_topic_partition_list_destroy(offsets);
1380
1381 if (err == RD_KAFKA_RESP_ERR__DESTROY ||
1382 err == RD_KAFKA_RESP_ERR__OUTDATED) {
1383 /* Termination or outdated, quick cleanup. */
1384
1385 if (err == RD_KAFKA_RESP_ERR__OUTDATED) {
1386 rd_kafka_toppar_lock(rktp);
1387 rd_kafka_toppar_offset_retry(
1388 rktp, 500, "outdated offset response");
1389 rd_kafka_toppar_unlock(rktp);
1390 }
1391
1392 /* from request.opaque */
1393 rd_kafka_toppar_destroy(rktp);
1394 return;
1395
1396 } else if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
1397 return; /* Retry in progress */
1398
1399
1400 rd_kafka_toppar_lock(rktp);
1401
1402 if (!(actions & (RD_KAFKA_ERR_ACTION_RETRY|
1403 RD_KAFKA_ERR_ACTION_REFRESH))) {
1404 /* Permanent error. Trigger auto.offset.reset policy
1405 * and signal error back to application. */
1406
1407 rd_kafka_offset_reset(rktp, rktp->rktp_query_offset,
1408 err,
1409 "failed to query logical offset");
1410
1411 rd_kafka_consumer_err(
1412 rktp->rktp_fetchq, rkb->rkb_nodeid,
1413 err, 0, NULL, rktp,
1414 (rktp->rktp_query_offset <=
1415 RD_KAFKA_OFFSET_TAIL_BASE ?
1416 rktp->rktp_query_offset -
1417 RD_KAFKA_OFFSET_TAIL_BASE :
1418 rktp->rktp_query_offset),
1419 "Failed to query logical offset %s: %s",
1420 rd_kafka_offset2str(rktp->rktp_query_offset),
1421 rd_kafka_err2str(err));
1422
1423 } else {
1424 /* Temporary error. Schedule retry. */
1425 char tmp[256];
1426
1427 rd_snprintf(tmp, sizeof(tmp),
1428 "failed to query logical offset %s: %s",
1429 rd_kafka_offset2str(
1430 rktp->rktp_query_offset),
1431 rd_kafka_err2str(err));
1432
1433 rd_kafka_toppar_offset_retry(rktp, 500, tmp);
1434 }
1435
1436 rd_kafka_toppar_unlock(rktp);
1437
1438 rd_kafka_toppar_destroy(rktp); /* from request.opaque */
1439 return;
1440 }
1441
1442 Offset = rktpar->offset;
1443 rd_kafka_topic_partition_list_destroy(offsets);
1444
1445 rd_kafka_toppar_lock(rktp);
1446 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1447 "Offset %s request for %.*s [%"PRId32"] "
1448 "returned offset %s (%"PRId64")",
1449 rd_kafka_offset2str(rktp->rktp_query_offset),
1450 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1451 rktp->rktp_partition, rd_kafka_offset2str(Offset), Offset);
1452
1453 rd_kafka_toppar_next_offset_handle(rktp, Offset);
1454 rd_kafka_toppar_unlock(rktp);
1455
1456 rd_kafka_toppar_destroy(rktp); /* from request.opaque */
1457 }
1458
1459
1460 /**
1461 * @brief An Offset fetch failed (for whatever reason) in
1462 * the RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT state:
1463 * set the state back to FETCH_OFFSET_QUERY and start the
1464 * offset_query_tmr to trigger a new request eventually.
1465 *
1466 * @locality toppar handler thread
1467 * @locks toppar_lock() MUST be held
1468 */
rd_kafka_toppar_offset_retry(rd_kafka_toppar_t * rktp,int backoff_ms,const char * reason)1469 static void rd_kafka_toppar_offset_retry (rd_kafka_toppar_t *rktp,
1470 int backoff_ms,
1471 const char *reason) {
1472 rd_ts_t tmr_next;
1473 int restart_tmr;
1474
1475 /* (Re)start timer if not started or the current timeout
1476 * is larger than \p backoff_ms. */
1477 tmr_next = rd_kafka_timer_next(&rktp->rktp_rkt->rkt_rk->rk_timers,
1478 &rktp->rktp_offset_query_tmr, 1);
1479
1480 restart_tmr = (tmr_next == -1 ||
1481 tmr_next > rd_clock() + (backoff_ms * 1000ll));
1482
1483 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1484 "%s [%"PRId32"]: %s: %s for offset %s",
1485 rktp->rktp_rkt->rkt_topic->str,
1486 rktp->rktp_partition,
1487 reason,
1488 restart_tmr ?
1489 "(re)starting offset query timer" :
1490 "offset query timer already scheduled",
1491 rd_kafka_offset2str(rktp->rktp_query_offset));
1492
1493 rd_kafka_toppar_set_fetch_state(rktp,
1494 RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
1495
1496 if (restart_tmr)
1497 rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
1498 &rktp->rktp_offset_query_tmr,
1499 backoff_ms*1000ll,
1500 rd_kafka_offset_query_tmr_cb, rktp);
1501 }
1502
1503
1504
1505 /**
1506 * Send OffsetRequest for toppar.
1507 *
1508 * If \p backoff_ms is non-zero only the query timer is started,
1509 * otherwise a query is triggered directly.
1510 *
1511 * Locality: toppar handler thread
1512 * Locks: toppar_lock() must be held
1513 */
rd_kafka_toppar_offset_request(rd_kafka_toppar_t * rktp,int64_t query_offset,int backoff_ms)1514 void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
1515 int64_t query_offset, int backoff_ms) {
1516 rd_kafka_broker_t *rkb;
1517
1518 rd_kafka_assert(NULL,
1519 thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));
1520
1521 rkb = rktp->rktp_broker;
1522
1523 if (!backoff_ms && (!rkb || rkb->rkb_source == RD_KAFKA_INTERNAL))
1524 backoff_ms = 500;
1525
1526 if (backoff_ms) {
1527 rd_kafka_toppar_offset_retry(rktp, backoff_ms,
1528 !rkb ?
1529 "no current leader for partition":
1530 "backoff");
1531 return;
1532 }
1533
1534
1535 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1536 &rktp->rktp_offset_query_tmr, 1/*lock*/);
1537
1538
1539 if (query_offset == RD_KAFKA_OFFSET_STORED &&
1540 rktp->rktp_rkt->rkt_conf.offset_store_method ==
1541 RD_KAFKA_OFFSET_METHOD_BROKER) {
1542 /*
1543 * Get stored offset from broker based storage:
1544 * ask cgrp manager for offsets
1545 */
1546 rd_kafka_toppar_offset_fetch(
1547 rktp,
1548 RD_KAFKA_REPLYQ(rktp->rktp_ops,
1549 rktp->rktp_op_version));
1550
1551 } else {
1552 rd_kafka_topic_partition_list_t *offsets;
1553
1554 /*
1555 * Look up logical offset (end,beginning,tail,..)
1556 */
1557
1558 rd_rkb_dbg(rkb, TOPIC, "OFFREQ",
1559 "Partition %.*s [%"PRId32"]: querying for logical "
1560 "offset %s (opv %d)",
1561 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1562 rktp->rktp_partition,
1563 rd_kafka_offset2str(query_offset),
1564 rktp->rktp_op_version);
1565
1566 rd_kafka_toppar_keep(rktp); /* refcnt for OffsetRequest opaque*/
1567
1568 if (query_offset <= RD_KAFKA_OFFSET_TAIL_BASE)
1569 query_offset = RD_KAFKA_OFFSET_END;
1570
1571 offsets = rd_kafka_topic_partition_list_new(1);
1572 rd_kafka_topic_partition_list_add(
1573 offsets,
1574 rktp->rktp_rkt->rkt_topic->str,
1575 rktp->rktp_partition)->offset = query_offset;
1576
1577 rd_kafka_ListOffsetsRequest(
1578 rkb, offsets,
1579 RD_KAFKA_REPLYQ(rktp->rktp_ops,
1580 rktp->rktp_op_version),
1581 rd_kafka_toppar_handle_Offset,
1582 rktp);
1583
1584 rd_kafka_topic_partition_list_destroy(offsets);
1585 }
1586
1587 rd_kafka_toppar_set_fetch_state(rktp,
1588 RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT);
1589 }
1590
1591
1592 /**
1593 * Start fetching toppar.
1594 *
1595 * Locality: toppar handler thread
1596 * Locks: none
1597 */
rd_kafka_toppar_fetch_start(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_op_t * rko_orig)1598 static void rd_kafka_toppar_fetch_start (rd_kafka_toppar_t *rktp,
1599 int64_t offset,
1600 rd_kafka_op_t *rko_orig) {
1601 rd_kafka_cgrp_t *rkcg = rko_orig->rko_u.fetch_start.rkcg;
1602 rd_kafka_resp_err_t err = 0;
1603 int32_t version = rko_orig->rko_version;
1604
1605 rd_kafka_toppar_lock(rktp);
1606
1607 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1608 "Start fetch for %.*s [%"PRId32"] in "
1609 "state %s at offset %s (v%"PRId32")",
1610 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1611 rktp->rktp_partition,
1612 rd_kafka_fetch_states[rktp->rktp_fetch_state],
1613 rd_kafka_offset2str(offset), version);
1614
1615 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
1616 err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
1617 rd_kafka_toppar_unlock(rktp);
1618 goto err_reply;
1619 }
1620
1621 rd_kafka_toppar_op_version_bump(rktp, version);
1622
1623 if (rkcg) {
1624 rd_kafka_assert(rktp->rktp_rkt->rkt_rk, !rktp->rktp_cgrp);
1625 /* Attach toppar to cgrp */
1626 rktp->rktp_cgrp = rkcg;
1627 rd_kafka_cgrp_op(rkcg, rktp, RD_KAFKA_NO_REPLYQ,
1628 RD_KAFKA_OP_PARTITION_JOIN, 0);
1629 }
1630
1631
1632 if (offset == RD_KAFKA_OFFSET_BEGINNING ||
1633 offset == RD_KAFKA_OFFSET_END ||
1634 offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
1635 rd_kafka_toppar_next_offset_handle(rktp, offset);
1636
1637 } else if (offset == RD_KAFKA_OFFSET_STORED) {
1638 rd_kafka_offset_store_init(rktp);
1639
1640 } else if (offset == RD_KAFKA_OFFSET_INVALID) {
1641 rd_kafka_offset_reset(rktp, offset,
1642 RD_KAFKA_RESP_ERR__NO_OFFSET,
1643 "no previously committed offset "
1644 "available");
1645
1646 } else {
1647 rktp->rktp_next_offset = offset;
1648 rd_kafka_toppar_set_fetch_state(rktp,
1649 RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1650
1651 /* Wake-up broker thread which might be idling on IO */
1652 if (rktp->rktp_broker)
1653 rd_kafka_broker_wakeup(rktp->rktp_broker);
1654
1655 }
1656
1657 rktp->rktp_offsets_fin.eof_offset = RD_KAFKA_OFFSET_INVALID;
1658
1659 rd_kafka_toppar_unlock(rktp);
1660
1661 /* Signal back to caller thread that start has commenced, or err */
1662 err_reply:
1663 if (rko_orig->rko_replyq.q) {
1664 rd_kafka_op_t *rko;
1665
1666 rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_START);
1667
1668 rko->rko_err = err;
1669 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1670
1671 rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
1672 }
1673 }
1674
1675
1676
1677
1678 /**
1679 * Mark toppar's fetch state as stopped (all decommissioning is done,
1680 * offsets are stored, etc).
1681 *
1682 * Locality: toppar handler thread
1683 * Locks: toppar_lock(rktp) MUST be held
1684 */
rd_kafka_toppar_fetch_stopped(rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err)1685 void rd_kafka_toppar_fetch_stopped (rd_kafka_toppar_t *rktp,
1686 rd_kafka_resp_err_t err) {
1687
1688
1689 rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPED);
1690
1691 rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
1692
1693 if (rktp->rktp_cgrp) {
1694 /* Detach toppar from cgrp */
1695 rd_kafka_cgrp_op(rktp->rktp_cgrp, rktp, RD_KAFKA_NO_REPLYQ,
1696 RD_KAFKA_OP_PARTITION_LEAVE, 0);
1697 rktp->rktp_cgrp = NULL;
1698 }
1699
1700 /* Signal back to application thread that stop is done. */
1701 if (rktp->rktp_replyq.q) {
1702 rd_kafka_op_t *rko;
1703 rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_STOP|RD_KAFKA_OP_REPLY);
1704 rko->rko_err = err;
1705 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1706
1707 rd_kafka_replyq_enq(&rktp->rktp_replyq, rko, 0);
1708 }
1709 }
1710
1711
1712 /**
1713 * Stop toppar fetcher.
1714 * This is usually an async operation.
1715 *
1716 * Locality: toppar handler thread
1717 */
rd_kafka_toppar_fetch_stop(rd_kafka_toppar_t * rktp,rd_kafka_op_t * rko_orig)1718 void rd_kafka_toppar_fetch_stop (rd_kafka_toppar_t *rktp,
1719 rd_kafka_op_t *rko_orig) {
1720 int32_t version = rko_orig->rko_version;
1721
1722 rd_kafka_toppar_lock(rktp);
1723
1724 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1725 "Stopping fetch for %.*s [%"PRId32"] in state %s (v%d)",
1726 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1727 rktp->rktp_partition,
1728 rd_kafka_fetch_states[rktp->rktp_fetch_state], version);
1729
1730 rd_kafka_toppar_op_version_bump(rktp, version);
1731
1732 /* Abort pending offset lookups. */
1733 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
1734 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1735 &rktp->rktp_offset_query_tmr,
1736 1/*lock*/);
1737
1738 /* Clear out the forwarding queue. */
1739 rd_kafka_q_fwd_set(rktp->rktp_fetchq, NULL);
1740
1741 /* Assign the future replyq to propagate stop results. */
1742 rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_replyq.q == NULL);
1743 rktp->rktp_replyq = rko_orig->rko_replyq;
1744 rd_kafka_replyq_clear(&rko_orig->rko_replyq);
1745
1746 rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPING);
1747
1748 /* Stop offset store (possibly async).
1749 * NOTE: will call .._stopped() if store finishes immediately,
1750 * so no more operations after this call! */
1751 rd_kafka_offset_store_stop(rktp);
1752
1753 rd_kafka_toppar_unlock(rktp);
1754 }
1755
1756
1757 /**
1758 * Update a toppars offset.
1759 * The toppar must have been previously FETCH_START:ed
1760 *
1761 * Locality: toppar handler thread
1762 */
rd_kafka_toppar_seek(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_op_t * rko_orig)1763 void rd_kafka_toppar_seek (rd_kafka_toppar_t *rktp,
1764 int64_t offset, rd_kafka_op_t *rko_orig) {
1765 rd_kafka_resp_err_t err = 0;
1766 int32_t version = rko_orig->rko_version;
1767
1768 rd_kafka_toppar_lock(rktp);
1769
1770 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1771 "Seek %.*s [%"PRId32"] to offset %s "
1772 "in state %s (v%"PRId32")",
1773 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1774 rktp->rktp_partition,
1775 rd_kafka_offset2str(offset),
1776 rd_kafka_fetch_states[rktp->rktp_fetch_state], version);
1777
1778
1779 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
1780 err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
1781 goto err_reply;
1782 } else if (!RD_KAFKA_TOPPAR_FETCH_IS_STARTED(rktp->rktp_fetch_state)) {
1783 err = RD_KAFKA_RESP_ERR__STATE;
1784 goto err_reply;
1785 } else if (offset == RD_KAFKA_OFFSET_STORED) {
1786 err = RD_KAFKA_RESP_ERR__INVALID_ARG;
1787 goto err_reply;
1788 }
1789
1790 rd_kafka_toppar_op_version_bump(rktp, version);
1791
1792 /* Abort pending offset lookups. */
1793 if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
1794 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1795 &rktp->rktp_offset_query_tmr,
1796 1/*lock*/);
1797
1798 if (RD_KAFKA_OFFSET_IS_LOGICAL(offset))
1799 rd_kafka_toppar_next_offset_handle(rktp, offset);
1800 else {
1801 rktp->rktp_next_offset = offset;
1802 rd_kafka_toppar_set_fetch_state(rktp,
1803 RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1804
1805 /* Wake-up broker thread which might be idling on IO */
1806 if (rktp->rktp_broker)
1807 rd_kafka_broker_wakeup(rktp->rktp_broker);
1808 }
1809
1810 /* Signal back to caller thread that seek has commenced, or err */
1811 err_reply:
1812 rd_kafka_toppar_unlock(rktp);
1813
1814 if (rko_orig->rko_replyq.q) {
1815 rd_kafka_op_t *rko;
1816
1817 rko = rd_kafka_op_new(RD_KAFKA_OP_SEEK|RD_KAFKA_OP_REPLY);
1818
1819 rko->rko_err = err;
1820 rko->rko_u.fetch_start.offset =
1821 rko_orig->rko_u.fetch_start.offset;
1822 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1823
1824 rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
1825 }
1826 }
1827
1828
1829 /**
1830 * @brief Pause/resume toppar.
1831 *
1832 * This is the internal handler of the pause/resume op.
1833 *
1834 * @locality toppar's handler thread
1835 */
rd_kafka_toppar_pause_resume(rd_kafka_toppar_t * rktp,rd_kafka_op_t * rko_orig)1836 static void rd_kafka_toppar_pause_resume (rd_kafka_toppar_t *rktp,
1837 rd_kafka_op_t *rko_orig) {
1838 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1839 int pause = rko_orig->rko_u.pause.pause;
1840 int flag = rko_orig->rko_u.pause.flag;
1841 int32_t version = rko_orig->rko_version;
1842
1843 rd_kafka_toppar_lock(rktp);
1844
1845 rd_kafka_toppar_op_version_bump(rktp, version);
1846
1847 if (!pause && (rktp->rktp_flags & flag) != flag) {
1848 rd_kafka_dbg(rk, TOPIC, "RESUME",
1849 "Not resuming %s [%"PRId32"]: "
1850 "partition is not paused by %s",
1851 rktp->rktp_rkt->rkt_topic->str,
1852 rktp->rktp_partition,
1853 (flag & RD_KAFKA_TOPPAR_F_APP_PAUSE ?
1854 "application" : "library"));
1855 rd_kafka_toppar_unlock(rktp);
1856 return;
1857 }
1858
1859 if (pause) {
1860 /* Pause partition by setting either
1861 * RD_KAFKA_TOPPAR_F_APP_PAUSE or
1862 * RD_KAFKA_TOPPAR_F_LIB_PAUSE */
1863 rktp->rktp_flags |= flag;
1864
1865 if (rk->rk_type == RD_KAFKA_CONSUMER) {
1866 /* Save offset of last consumed message+1 as the
1867 * next message to fetch on resume. */
1868 if (rktp->rktp_app_offset != RD_KAFKA_OFFSET_INVALID) {
1869 rktp->rktp_next_offset = rktp->rktp_app_offset;
1870 }
1871
1872 rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
1873 "%s %s [%"PRId32"]: at offset %s "
1874 "(state %s, v%d)",
1875 pause ? "Pause":"Resume",
1876 rktp->rktp_rkt->rkt_topic->str,
1877 rktp->rktp_partition,
1878 rd_kafka_offset2str(
1879 rktp->rktp_next_offset),
1880 rd_kafka_fetch_states[rktp->
1881 rktp_fetch_state],
1882 version);
1883 } else {
1884 rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
1885 "%s %s [%"PRId32"] (state %s, v%d)",
1886 pause ? "Pause":"Resume",
1887 rktp->rktp_rkt->rkt_topic->str,
1888 rktp->rktp_partition,
1889 rd_kafka_fetch_states[rktp->
1890 rktp_fetch_state],
1891 version);
1892 }
1893
1894 } else {
1895 /* Unset the RD_KAFKA_TOPPAR_F_APP_PAUSE or
1896 * RD_KAFKA_TOPPAR_F_LIB_PAUSE flag */
1897 rktp->rktp_flags &= ~flag;
1898
1899 if (rk->rk_type == RD_KAFKA_CONSUMER) {
1900 rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
1901 "%s %s [%"PRId32"]: at offset %s "
1902 "(state %s, v%d)",
1903 rktp->rktp_fetch_state ==
1904 RD_KAFKA_TOPPAR_FETCH_ACTIVE ?
1905 "Resuming" : "Not resuming stopped",
1906 rktp->rktp_rkt->rkt_topic->str,
1907 rktp->rktp_partition,
1908 rd_kafka_offset2str(
1909 rktp->rktp_next_offset),
1910 rd_kafka_fetch_states[rktp->
1911 rktp_fetch_state],
1912 version);
1913
1914 /* If the resuming offset is logical we
1915 * need to trigger a seek (that performs the
1916 * logical->absolute lookup logic) to get
1917 * things going.
1918 * Typical case is when a partition is paused
1919 * before anything has been consumed by app
1920 * yet thus having rktp_app_offset=INVALID. */
1921 if (!RD_KAFKA_TOPPAR_IS_PAUSED(rktp) &&
1922 (rktp->rktp_fetch_state ==
1923 RD_KAFKA_TOPPAR_FETCH_ACTIVE ||
1924 rktp->rktp_fetch_state ==
1925 RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) &&
1926 rktp->rktp_next_offset == RD_KAFKA_OFFSET_INVALID)
1927 rd_kafka_toppar_next_offset_handle(
1928 rktp, rktp->rktp_next_offset);
1929
1930 } else
1931 rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
1932 "%s %s [%"PRId32"] (state %s, v%d)",
1933 pause ? "Pause":"Resume",
1934 rktp->rktp_rkt->rkt_topic->str,
1935 rktp->rktp_partition,
1936 rd_kafka_fetch_states[rktp->
1937 rktp_fetch_state],
1938 version);
1939 }
1940 rd_kafka_toppar_unlock(rktp);
1941
1942 if (pause && rk->rk_type == RD_KAFKA_CONSUMER) {
1943 /* Flush partition's fetch queue */
1944 rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
1945 rko_orig->rko_version);
1946 }
1947 }
1948
1949
1950
1951
1952 /**
1953 * @brief Decide whether this toppar should be on the fetch list or not.
1954 *
1955 * Also:
1956 * - update toppar's op version (for broker thread's copy)
1957 * - finalize statistics (move rktp_offsets to rktp_offsets_fin)
1958 *
1959 * @returns the partition's Fetch backoff timestamp, or 0 if no backoff.
1960 *
1961 * @locality broker thread
1962 * @locks none
1963 */
rd_kafka_toppar_fetch_decide(rd_kafka_toppar_t * rktp,rd_kafka_broker_t * rkb,int force_remove)1964 rd_ts_t rd_kafka_toppar_fetch_decide (rd_kafka_toppar_t *rktp,
1965 rd_kafka_broker_t *rkb,
1966 int force_remove) {
1967 int should_fetch = 1;
1968 const char *reason = "";
1969 int32_t version;
1970 rd_ts_t ts_backoff = 0;
1971 rd_bool_t lease_expired = rd_false;
1972
1973 rd_kafka_toppar_lock(rktp);
1974
1975 /* Check for preferred replica lease expiry */
1976 lease_expired =
1977 rktp->rktp_leader_id != rktp->rktp_broker_id &&
1978 rd_interval(&rktp->rktp_lease_intvl,
1979 5*60*1000*1000/*5 minutes*/, 0) > 0;
1980 if (lease_expired) {
1981 /* delete_to_leader() requires no locks to be held */
1982 rd_kafka_toppar_unlock(rktp);
1983 rd_kafka_toppar_delegate_to_leader(rktp);
1984 rd_kafka_toppar_lock(rktp);
1985
1986 reason = "preferred replica lease expired";
1987 should_fetch = 0;
1988 goto done;
1989 }
1990
1991 /* Forced removal from fetch list */
1992 if (unlikely(force_remove)) {
1993 reason = "forced removal";
1994 should_fetch = 0;
1995 goto done;
1996 }
1997
1998 if (unlikely((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) != 0)) {
1999 reason = "partition removed";
2000 should_fetch = 0;
2001 goto done;
2002 }
2003
2004 /* Skip toppars not in active fetch state */
2005 if (rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_ACTIVE) {
2006 reason = "not in active fetch state";
2007 should_fetch = 0;
2008 goto done;
2009 }
2010
2011 /* Update broker thread's fetch op version */
2012 version = rktp->rktp_op_version;
2013 if (version > rktp->rktp_fetch_version ||
2014 rktp->rktp_next_offset != rktp->rktp_last_next_offset ||
2015 rktp->rktp_offsets.fetch_offset == RD_KAFKA_OFFSET_INVALID) {
2016 /* New version barrier, something was modified from the
2017 * control plane. Reset and start over.
2018 * Alternatively only the next_offset changed but not the
2019 * barrier, which is the case when automatically triggering
2020 * offset.reset (such as on PARTITION_EOF or
2021 * OFFSET_OUT_OF_RANGE). */
2022
2023 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCHDEC",
2024 "Topic %s [%"PRId32"]: fetch decide: "
2025 "updating to version %d (was %d) at "
2026 "offset %"PRId64" (was %"PRId64")",
2027 rktp->rktp_rkt->rkt_topic->str,
2028 rktp->rktp_partition,
2029 version, rktp->rktp_fetch_version,
2030 rktp->rktp_next_offset,
2031 rktp->rktp_offsets.fetch_offset);
2032
2033 rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
2034
2035 /* New start offset */
2036 rktp->rktp_offsets.fetch_offset = rktp->rktp_next_offset;
2037 rktp->rktp_last_next_offset = rktp->rktp_next_offset;
2038
2039 rktp->rktp_fetch_version = version;
2040
2041 /* Clear last error to propagate new fetch
2042 * errors if encountered. */
2043 rktp->rktp_last_error = RD_KAFKA_RESP_ERR_NO_ERROR;
2044
2045 rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
2046 version);
2047 }
2048
2049
2050 if (RD_KAFKA_TOPPAR_IS_PAUSED(rktp)) {
2051 should_fetch = 0;
2052 reason = "paused";
2053
2054 } else if (RD_KAFKA_OFFSET_IS_LOGICAL(rktp->rktp_next_offset)) {
2055 should_fetch = 0;
2056 reason = "no concrete offset";
2057
2058 } else if (rd_kafka_q_len(rktp->rktp_fetchq) >=
2059 rkb->rkb_rk->rk_conf.queued_min_msgs) {
2060 /* Skip toppars who's local message queue is already above
2061 * the lower threshold. */
2062 reason = "queued.min.messages exceeded";
2063 should_fetch = 0;
2064
2065 } else if ((int64_t)rd_kafka_q_size(rktp->rktp_fetchq) >=
2066 rkb->rkb_rk->rk_conf.queued_max_msg_bytes) {
2067 reason = "queued.max.messages.kbytes exceeded";
2068 should_fetch = 0;
2069
2070 } else if (rktp->rktp_ts_fetch_backoff > rd_clock()) {
2071 reason = "fetch backed off";
2072 ts_backoff = rktp->rktp_ts_fetch_backoff;
2073 should_fetch = 0;
2074 }
2075
2076 done:
2077 /* Copy offset stats to finalized place holder. */
2078 rktp->rktp_offsets_fin = rktp->rktp_offsets;
2079
2080 if (rktp->rktp_fetch != should_fetch) {
2081 rd_rkb_dbg(rkb, FETCH, "FETCH",
2082 "Topic %s [%"PRId32"] in state %s at offset %s "
2083 "(%d/%d msgs, %"PRId64"/%d kb queued, "
2084 "opv %"PRId32") is %s%s",
2085 rktp->rktp_rkt->rkt_topic->str,
2086 rktp->rktp_partition,
2087 rd_kafka_fetch_states[rktp->rktp_fetch_state],
2088 rd_kafka_offset2str(rktp->rktp_next_offset),
2089 rd_kafka_q_len(rktp->rktp_fetchq),
2090 rkb->rkb_rk->rk_conf.queued_min_msgs,
2091 rd_kafka_q_size(rktp->rktp_fetchq) / 1024,
2092 rkb->rkb_rk->rk_conf.queued_max_msg_kbytes,
2093 rktp->rktp_fetch_version,
2094 should_fetch ? "fetchable" : "not fetchable: ",
2095 reason);
2096
2097 if (should_fetch) {
2098 rd_dassert(rktp->rktp_fetch_version > 0);
2099 rd_kafka_broker_active_toppar_add(rkb, rktp,
2100 *reason ? reason :
2101 "fetchable");
2102 } else {
2103 rd_kafka_broker_active_toppar_del(rkb, rktp, reason);
2104 }
2105 }
2106
2107 rd_kafka_toppar_unlock(rktp);
2108
2109 /* Non-fetching partitions will have an
2110 * indefinate backoff, unless explicitly specified. */
2111 if (!should_fetch && !ts_backoff)
2112 ts_backoff = RD_TS_MAX;
2113
2114 return ts_backoff;
2115 }
2116
2117
2118 /**
2119 * @brief Serve a toppar in a consumer broker thread.
2120 * This is considered the fast path and should be minimal,
2121 * mostly focusing on fetch related mechanisms.
2122 *
2123 * @returns the partition's Fetch backoff timestamp, or 0 if no backoff.
2124 *
2125 * @locality broker thread
2126 * @locks none
2127 */
rd_kafka_broker_consumer_toppar_serve(rd_kafka_broker_t * rkb,rd_kafka_toppar_t * rktp)2128 rd_ts_t rd_kafka_broker_consumer_toppar_serve (rd_kafka_broker_t *rkb,
2129 rd_kafka_toppar_t *rktp) {
2130 return rd_kafka_toppar_fetch_decide(rktp, rkb, 0);
2131 }
2132
2133
2134
2135 /**
2136 * @brief Serve a toppar op
2137 *
2138 * @param rktp may be NULL for certain ops (OP_RECV_BUF)
2139 *
2140 * Will send an empty reply op if the request rko has a replyq set,
2141 * providing synchronous operation.
2142 *
2143 * @locality toppar handler thread
2144 */
2145 static rd_kafka_op_res_t
rd_kafka_toppar_op_serve(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko,rd_kafka_q_cb_type_t cb_type,void * opaque)2146 rd_kafka_toppar_op_serve (rd_kafka_t *rk,
2147 rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
2148 rd_kafka_q_cb_type_t cb_type, void *opaque) {
2149 rd_kafka_toppar_t *rktp = NULL;
2150 int outdated = 0;
2151
2152 if (rko->rko_rktp)
2153 rktp = rko->rko_rktp;
2154
2155 if (rktp) {
2156 outdated = rd_kafka_op_version_outdated(rko,
2157 rktp->rktp_op_version);
2158
2159 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OP",
2160 "%.*s [%"PRId32"] received %sop %s "
2161 "(v%"PRId32") in fetch-state %s (opv%d)",
2162 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2163 rktp->rktp_partition,
2164 outdated ? "outdated ": "",
2165 rd_kafka_op2str(rko->rko_type),
2166 rko->rko_version,
2167 rd_kafka_fetch_states[rktp->rktp_fetch_state],
2168 rktp->rktp_op_version);
2169
2170 if (outdated) {
2171 #if ENABLE_DEVEL
2172 rd_kafka_op_print(stdout, "PART_OUTDATED", rko);
2173 #endif
2174 rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__OUTDATED);
2175 return RD_KAFKA_OP_RES_HANDLED;
2176 }
2177 }
2178
2179 switch ((int)rko->rko_type)
2180 {
2181 case RD_KAFKA_OP_FETCH_START:
2182 rd_kafka_toppar_fetch_start(rktp,
2183 rko->rko_u.fetch_start.offset, rko);
2184 break;
2185
2186 case RD_KAFKA_OP_FETCH_STOP:
2187 rd_kafka_toppar_fetch_stop(rktp, rko);
2188 break;
2189
2190 case RD_KAFKA_OP_SEEK:
2191 rd_kafka_toppar_seek(rktp, rko->rko_u.fetch_start.offset, rko);
2192 break;
2193
2194 case RD_KAFKA_OP_PAUSE:
2195 rd_kafka_toppar_pause_resume(rktp, rko);
2196 break;
2197
2198 case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
2199 rd_kafka_assert(NULL, rko->rko_u.offset_commit.cb);
2200 rko->rko_u.offset_commit.cb(
2201 rk, rko->rko_err,
2202 rko->rko_u.offset_commit.partitions,
2203 rko->rko_u.offset_commit.opaque);
2204 break;
2205
2206 case RD_KAFKA_OP_OFFSET_FETCH | RD_KAFKA_OP_REPLY:
2207 {
2208 /* OffsetFetch reply */
2209 rd_kafka_topic_partition_list_t *offsets =
2210 rko->rko_u.offset_fetch.partitions;
2211 int64_t offset = RD_KAFKA_OFFSET_INVALID;
2212
2213 rktp = offsets->elems[0]._private;
2214 if (!rko->rko_err) {
2215 /* Request succeeded but per-partition might have failed */
2216 rko->rko_err = offsets->elems[0].err;
2217 offset = offsets->elems[0].offset;
2218 }
2219 offsets->elems[0]._private = NULL;
2220 rd_kafka_topic_partition_list_destroy(offsets);
2221 rko->rko_u.offset_fetch.partitions = NULL;
2222
2223 rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
2224 &rktp->rktp_offset_query_tmr,
2225 1/*lock*/);
2226
2227 rd_kafka_toppar_lock(rktp);
2228
2229 if (rko->rko_err) {
2230 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
2231 TOPIC, "OFFSET",
2232 "Failed to fetch offset for "
2233 "%.*s [%"PRId32"]: %s",
2234 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2235 rktp->rktp_partition,
2236 rd_kafka_err2str(rko->rko_err));
2237
2238 /* Keep on querying until we succeed. */
2239 rd_kafka_toppar_offset_retry(rktp, 500,
2240 "failed to fetch offsets");
2241 rd_kafka_toppar_unlock(rktp);
2242
2243
2244 /* Propagate error to application */
2245 if (rko->rko_err != RD_KAFKA_RESP_ERR__WAIT_COORD &&
2246 rko->rko_err !=
2247 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT)
2248 rd_kafka_consumer_err(
2249 rktp->rktp_fetchq,
2250 RD_KAFKA_NODEID_UA,
2251 rko->rko_err, 0,
2252 NULL, rktp,
2253 RD_KAFKA_OFFSET_INVALID,
2254 "Failed to fetch "
2255 "offsets from brokers: %s",
2256 rd_kafka_err2str(rko->rko_err));
2257
2258 rd_kafka_toppar_destroy(rktp);
2259
2260 break;
2261 }
2262
2263 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
2264 TOPIC, "OFFSET",
2265 "%.*s [%"PRId32"]: OffsetFetch returned "
2266 "offset %s (%"PRId64")",
2267 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2268 rktp->rktp_partition,
2269 rd_kafka_offset2str(offset), offset);
2270
2271 if (offset > 0)
2272 rktp->rktp_committed_offset = offset;
2273
2274 if (offset >= 0)
2275 rd_kafka_toppar_next_offset_handle(rktp, offset);
2276 else
2277 rd_kafka_offset_reset(rktp, offset,
2278 RD_KAFKA_RESP_ERR__NO_OFFSET,
2279 "no previously committed offset "
2280 "available");
2281 rd_kafka_toppar_unlock(rktp);
2282
2283 rd_kafka_toppar_destroy(rktp);
2284 }
2285 break;
2286
2287 default:
2288 rd_kafka_assert(NULL, !*"unknown type");
2289 break;
2290 }
2291
2292 rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
2293
2294 return RD_KAFKA_OP_RES_HANDLED;
2295 }
2296
2297
2298
2299
2300
2301 /**
2302 * Send command op to toppar (handled by toppar's thread).
2303 *
2304 * Locality: any thread
2305 */
rd_kafka_toppar_op0(rd_kafka_toppar_t * rktp,rd_kafka_op_t * rko,rd_kafka_replyq_t replyq)2306 static void rd_kafka_toppar_op0 (rd_kafka_toppar_t *rktp, rd_kafka_op_t *rko,
2307 rd_kafka_replyq_t replyq) {
2308 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
2309 rko->rko_replyq = replyq;
2310
2311 rd_kafka_q_enq(rktp->rktp_ops, rko);
2312 }
2313
2314
2315 /**
2316 * Send command op to toppar (handled by toppar's thread).
2317 *
2318 * Locality: any thread
2319 */
rd_kafka_toppar_op(rd_kafka_toppar_t * rktp,rd_kafka_op_type_t type,int32_t version,int64_t offset,rd_kafka_cgrp_t * rkcg,rd_kafka_replyq_t replyq)2320 static void rd_kafka_toppar_op (rd_kafka_toppar_t *rktp,
2321 rd_kafka_op_type_t type, int32_t version,
2322 int64_t offset, rd_kafka_cgrp_t *rkcg,
2323 rd_kafka_replyq_t replyq) {
2324 rd_kafka_op_t *rko;
2325
2326 rko = rd_kafka_op_new(type);
2327 rko->rko_version = version;
2328 if (type == RD_KAFKA_OP_FETCH_START ||
2329 type == RD_KAFKA_OP_SEEK) {
2330 if (rkcg)
2331 rko->rko_u.fetch_start.rkcg = rkcg;
2332 rko->rko_u.fetch_start.offset = offset;
2333 }
2334
2335 rd_kafka_toppar_op0(rktp, rko, replyq);
2336 }
2337
2338
2339
2340 /**
2341 * Start consuming partition (async operation).
2342 * 'offset' is the initial offset
2343 * 'fwdq' is an optional queue to forward messages to, if this is NULL
2344 * then messages will be enqueued on rktp_fetchq.
2345 * 'replyq' is an optional queue for handling the consume_start ack.
2346 *
2347 * This is the thread-safe interface that can be called from any thread.
2348 */
rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_q_t * fwdq,rd_kafka_replyq_t replyq)2349 rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start (rd_kafka_toppar_t *rktp,
2350 int64_t offset,
2351 rd_kafka_q_t *fwdq,
2352 rd_kafka_replyq_t replyq) {
2353 int32_t version;
2354
2355 rd_kafka_q_lock(rktp->rktp_fetchq);
2356 if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP))
2357 rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq,
2358 0, /* no do_lock */
2359 0 /* no fwd_app */);
2360 rd_kafka_q_unlock(rktp->rktp_fetchq);
2361
2362 /* Bump version barrier. */
2363 version = rd_kafka_toppar_version_new_barrier(rktp);
2364
2365 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2366 "Start consuming %.*s [%"PRId32"] at "
2367 "offset %s (v%"PRId32")",
2368 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2369 rktp->rktp_partition, rd_kafka_offset2str(offset),
2370 version);
2371
2372 rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_START, version,
2373 offset, rktp->rktp_rkt->rkt_rk->rk_cgrp, replyq);
2374
2375 return RD_KAFKA_RESP_ERR_NO_ERROR;
2376 }
2377
2378
2379 /**
2380 * Stop consuming partition (async operatoin)
2381 * This is thread-safe interface that can be called from any thread.
2382 *
2383 * Locality: any thread
2384 */
rd_kafka_toppar_op_fetch_stop(rd_kafka_toppar_t * rktp,rd_kafka_replyq_t replyq)2385 rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_stop (rd_kafka_toppar_t *rktp,
2386 rd_kafka_replyq_t replyq) {
2387 int32_t version;
2388
2389 /* Bump version barrier. */
2390 version = rd_kafka_toppar_version_new_barrier(rktp);
2391
2392 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2393 "Stop consuming %.*s [%"PRId32"] (v%"PRId32")",
2394 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2395 rktp->rktp_partition, version);
2396
2397 rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_STOP, version,
2398 0, NULL, replyq);
2399
2400 return RD_KAFKA_RESP_ERR_NO_ERROR;
2401 }
2402
2403
2404 /**
2405 * Set/Seek offset of a consumed partition (async operation).
2406 * 'offset' is the target offset
2407 * 'replyq' is an optional queue for handling the ack.
2408 *
2409 * This is the thread-safe interface that can be called from any thread.
2410 */
rd_kafka_toppar_op_seek(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_replyq_t replyq)2411 rd_kafka_resp_err_t rd_kafka_toppar_op_seek (rd_kafka_toppar_t *rktp,
2412 int64_t offset,
2413 rd_kafka_replyq_t replyq) {
2414 int32_t version;
2415
2416 /* Bump version barrier. */
2417 version = rd_kafka_toppar_version_new_barrier(rktp);
2418
2419 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2420 "Seek %.*s [%"PRId32"] to "
2421 "offset %s (v%"PRId32")",
2422 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2423 rktp->rktp_partition, rd_kafka_offset2str(offset),
2424 version);
2425
2426 rd_kafka_toppar_op(rktp, RD_KAFKA_OP_SEEK, version,
2427 offset, NULL, replyq);
2428
2429 return RD_KAFKA_RESP_ERR_NO_ERROR;
2430 }
2431
2432
2433 /**
2434 * @brief Pause/resume partition (async operation).
2435 *
2436 * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2437 * depending on if the app paused or librdkafka.
2438 * @param pause is 1 for pausing or 0 for resuming.
2439 *
2440 * @locality any
2441 */
2442 rd_kafka_resp_err_t
rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t * rktp,int pause,int flag,rd_kafka_replyq_t replyq)2443 rd_kafka_toppar_op_pause_resume (rd_kafka_toppar_t *rktp, int pause, int flag,
2444 rd_kafka_replyq_t replyq) {
2445 int32_t version;
2446 rd_kafka_op_t *rko;
2447
2448 /* Bump version barrier. */
2449 version = rd_kafka_toppar_version_new_barrier(rktp);
2450
2451 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, pause ? "PAUSE":"RESUME",
2452 "%s %.*s [%"PRId32"] (v%"PRId32")",
2453 pause ? "Pause" : "Resume",
2454 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2455 rktp->rktp_partition, version);
2456
2457 rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE);
2458 rko->rko_version = version;
2459 rko->rko_u.pause.pause = pause;
2460 rko->rko_u.pause.flag = flag;
2461
2462 rd_kafka_toppar_op0(rktp, rko, replyq);
2463
2464 return RD_KAFKA_RESP_ERR_NO_ERROR;
2465 }
2466
2467
2468 /**
2469 * @brief Pause a toppar (asynchronous).
2470 *
2471 * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2472 * depending on if the app paused or librdkafka.
2473 *
2474 * @locality any
2475 * @locks none needed
2476 */
rd_kafka_toppar_pause(rd_kafka_toppar_t * rktp,int flag)2477 void rd_kafka_toppar_pause (rd_kafka_toppar_t *rktp, int flag) {
2478 rd_kafka_toppar_op_pause_resume(rktp, 1/*pause*/, flag,
2479 RD_KAFKA_NO_REPLYQ);
2480 }
2481
2482 /**
2483 * @brief Resume a toppar (asynchronous).
2484 *
2485 * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2486 * depending on if the app paused or librdkafka.
2487 *
2488 * @locality any
2489 * @locks none needed
2490 */
rd_kafka_toppar_resume(rd_kafka_toppar_t * rktp,int flag)2491 void rd_kafka_toppar_resume (rd_kafka_toppar_t *rktp, int flag) {
2492 rd_kafka_toppar_op_pause_resume(rktp, 1/*pause*/, flag,
2493 RD_KAFKA_NO_REPLYQ);
2494 }
2495
2496
2497
2498 /**
2499 * @brief Pause or resume a list of partitions.
2500 *
2501 * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2502 * depending on if the app paused or librdkafka.
2503 * @param pause true for pausing, false for resuming.
2504 * @param async RD_SYNC to wait for background thread to handle op,
2505 * RD_ASYNC for asynchronous operation.
2506 *
2507 * @locality any
2508 *
2509 * @remark This is an asynchronous call, the actual pause/resume is performed
2510 * by toppar_pause() in the toppar's handler thread.
2511 */
2512 rd_kafka_resp_err_t
rd_kafka_toppars_pause_resume(rd_kafka_t * rk,rd_bool_t pause,rd_async_t async,int flag,rd_kafka_topic_partition_list_t * partitions)2513 rd_kafka_toppars_pause_resume (rd_kafka_t *rk,
2514 rd_bool_t pause, rd_async_t async, int flag,
2515 rd_kafka_topic_partition_list_t *partitions) {
2516 int i;
2517 int waitcnt = 0;
2518 rd_kafka_q_t *tmpq = NULL;
2519
2520 if (!async)
2521 tmpq = rd_kafka_q_new(rk);
2522
2523 rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
2524 "%s %s %d partition(s)",
2525 flag & RD_KAFKA_TOPPAR_F_APP_PAUSE ? "Application" : "Library",
2526 pause ? "pausing" : "resuming", partitions->cnt);
2527
2528 for (i = 0 ; i < partitions->cnt ; i++) {
2529 rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
2530 rd_kafka_toppar_t *rktp;
2531
2532 rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar,
2533 rd_false);
2534 if (!rktp) {
2535 rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
2536 "%s %s [%"PRId32"]: skipped: "
2537 "unknown partition",
2538 pause ? "Pause":"Resume",
2539 rktpar->topic, rktpar->partition);
2540
2541 rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
2542 continue;
2543 }
2544
2545 rd_kafka_toppar_op_pause_resume(rktp, pause, flag,
2546 RD_KAFKA_REPLYQ(tmpq, 0));
2547
2548 if (!async)
2549 waitcnt++;
2550
2551 rd_kafka_toppar_destroy(rktp);
2552
2553 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
2554 }
2555
2556 if (!async) {
2557 while (waitcnt-- > 0)
2558 rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
2559
2560 rd_kafka_q_destroy_owner(tmpq);
2561 }
2562
2563 return RD_KAFKA_RESP_ERR_NO_ERROR;
2564 }
2565
2566
2567
2568
2569
2570 /**
2571 * Propagate error for toppar
2572 */
rd_kafka_toppar_enq_error(rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err,const char * reason)2573 void rd_kafka_toppar_enq_error (rd_kafka_toppar_t *rktp,
2574 rd_kafka_resp_err_t err,
2575 const char *reason) {
2576 rd_kafka_op_t *rko;
2577 char buf[512];
2578
2579 rko = rd_kafka_op_new(RD_KAFKA_OP_ERR);
2580 rko->rko_err = err;
2581 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
2582
2583 rd_snprintf(buf, sizeof(buf), "%.*s [%"PRId32"]: %s (%s)",
2584 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2585 rktp->rktp_partition, reason,
2586 rd_kafka_err2str(err));
2587
2588 rko->rko_u.err.errstr = rd_strdup(buf);
2589
2590 rd_kafka_q_enq(rktp->rktp_fetchq, rko);
2591 }
2592
2593
2594
2595
2596
2597 /**
2598 * Returns the currently delegated broker for this toppar.
2599 * If \p proper_broker is set NULL will be returned if current handler
2600 * is not a proper broker (INTERNAL broker).
2601 *
2602 * The returned broker has an increased refcount.
2603 *
2604 * Locks: none
2605 */
rd_kafka_toppar_broker(rd_kafka_toppar_t * rktp,int proper_broker)2606 rd_kafka_broker_t *rd_kafka_toppar_broker (rd_kafka_toppar_t *rktp,
2607 int proper_broker) {
2608 rd_kafka_broker_t *rkb;
2609 rd_kafka_toppar_lock(rktp);
2610 rkb = rktp->rktp_broker;
2611 if (rkb) {
2612 if (proper_broker && rkb->rkb_source == RD_KAFKA_INTERNAL)
2613 rkb = NULL;
2614 else
2615 rd_kafka_broker_keep(rkb);
2616 }
2617 rd_kafka_toppar_unlock(rktp);
2618
2619 return rkb;
2620 }
2621
2622
2623 /**
2624 * @brief Take action when partition broker becomes unavailable.
2625 * This should be called when requests fail with
2626 * NOT_LEADER_FOR.. or similar error codes, e.g. ProduceRequest.
2627 *
2628 * @locks none
2629 * @locality any
2630 */
rd_kafka_toppar_leader_unavailable(rd_kafka_toppar_t * rktp,const char * reason,rd_kafka_resp_err_t err)2631 void rd_kafka_toppar_leader_unavailable (rd_kafka_toppar_t *rktp,
2632 const char *reason,
2633 rd_kafka_resp_err_t err) {
2634 rd_kafka_topic_t *rkt = rktp->rktp_rkt;
2635
2636 rd_kafka_dbg(rkt->rkt_rk, TOPIC, "BROKERUA",
2637 "%s [%"PRId32"]: broker unavailable: %s: %s",
2638 rkt->rkt_topic->str, rktp->rktp_partition, reason,
2639 rd_kafka_err2str(err));
2640
2641 rd_kafka_topic_wrlock(rkt);
2642 rkt->rkt_flags |= RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;
2643 rd_kafka_topic_wrunlock(rkt);
2644
2645 rd_kafka_topic_fast_leader_query(rkt->rkt_rk);
2646 }
2647
2648
2649 const char *
rd_kafka_topic_partition_topic(const rd_kafka_topic_partition_t * rktpar)2650 rd_kafka_topic_partition_topic (const rd_kafka_topic_partition_t *rktpar) {
2651 const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2652 return rktp->rktp_rkt->rkt_topic->str;
2653 }
2654
2655 int32_t
rd_kafka_topic_partition_partition(const rd_kafka_topic_partition_t * rktpar)2656 rd_kafka_topic_partition_partition (const rd_kafka_topic_partition_t *rktpar) {
2657 const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2658 return rktp->rktp_partition;
2659 }
2660
rd_kafka_topic_partition_get(const rd_kafka_topic_partition_t * rktpar,const char ** name,int32_t * partition)2661 void rd_kafka_topic_partition_get (const rd_kafka_topic_partition_t *rktpar,
2662 const char **name, int32_t *partition) {
2663 const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2664 *name = rktp->rktp_rkt->rkt_topic->str;
2665 *partition = rktp->rktp_partition;
2666 }
2667
2668
2669
2670
2671 /**
2672 *
2673 * rd_kafka_topic_partition_t lists
2674 * Fixed-size non-growable list of partitions for propagation to application.
2675 *
2676 */
2677
2678
2679 static void
rd_kafka_topic_partition_list_grow(rd_kafka_topic_partition_list_t * rktparlist,int add_size)2680 rd_kafka_topic_partition_list_grow (rd_kafka_topic_partition_list_t *rktparlist,
2681 int add_size) {
2682 if (add_size < rktparlist->size)
2683 add_size = RD_MAX(rktparlist->size, 32);
2684
2685 rktparlist->size += add_size;
2686 rktparlist->elems = rd_realloc(rktparlist->elems,
2687 sizeof(*rktparlist->elems) *
2688 rktparlist->size);
2689
2690 }
2691
2692
2693 /**
2694 * @brief Initialize a list for fitting \p size partitions.
2695 */
rd_kafka_topic_partition_list_init(rd_kafka_topic_partition_list_t * rktparlist,int size)2696 void rd_kafka_topic_partition_list_init (
2697 rd_kafka_topic_partition_list_t *rktparlist, int size) {
2698 memset(rktparlist, 0, sizeof(*rktparlist));
2699
2700 if (size > 0)
2701 rd_kafka_topic_partition_list_grow(rktparlist, size);
2702 }
2703
2704
2705 /**
2706 * Create a list for fitting 'size' topic_partitions (rktp).
2707 */
rd_kafka_topic_partition_list_new(int size)2708 rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size) {
2709 rd_kafka_topic_partition_list_t *rktparlist;
2710
2711 rktparlist = rd_calloc(1, sizeof(*rktparlist));
2712
2713 if (size > 0)
2714 rd_kafka_topic_partition_list_grow(rktparlist, size);
2715
2716 return rktparlist;
2717 }
2718
2719
2720
rd_kafka_topic_partition_new(const char * topic,int32_t partition)2721 rd_kafka_topic_partition_t *rd_kafka_topic_partition_new (const char *topic,
2722 int32_t partition) {
2723 rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));
2724
2725 rktpar->topic = rd_strdup(topic);
2726 rktpar->partition = partition;
2727
2728 return rktpar;
2729 }
2730
2731
2732 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_copy(const rd_kafka_topic_partition_t * src)2733 rd_kafka_topic_partition_copy (const rd_kafka_topic_partition_t *src) {
2734 return rd_kafka_topic_partition_new(src->topic, src->partition);
2735 }
2736
2737
2738 /** Same as above but with generic void* signature */
rd_kafka_topic_partition_copy_void(const void * src)2739 void *rd_kafka_topic_partition_copy_void (const void *src) {
2740 return rd_kafka_topic_partition_copy(src);
2741 }
2742
2743
2744 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_new_from_rktp(rd_kafka_toppar_t * rktp)2745 rd_kafka_topic_partition_new_from_rktp (rd_kafka_toppar_t *rktp) {
2746 rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));
2747
2748 rktpar->topic = RD_KAFKAP_STR_DUP(rktp->rktp_rkt->rkt_topic);
2749 rktpar->partition = rktp->rktp_partition;
2750
2751 return rktpar;
2752 }
2753
2754
2755
2756 static void
rd_kafka_topic_partition_destroy0(rd_kafka_topic_partition_t * rktpar,int do_free)2757 rd_kafka_topic_partition_destroy0 (rd_kafka_topic_partition_t *rktpar, int do_free) {
2758 if (rktpar->topic)
2759 rd_free(rktpar->topic);
2760 if (rktpar->metadata)
2761 rd_free(rktpar->metadata);
2762 if (rktpar->_private)
2763 rd_kafka_toppar_destroy((rd_kafka_toppar_t *)rktpar->_private);
2764
2765 if (do_free)
2766 rd_free(rktpar);
2767 }
2768
2769
2770 /**
2771 * @brief Destroy all partitions in list.
2772 *
2773 * @remark The allocated size of the list will not shrink.
2774 */
rd_kafka_topic_partition_list_clear(rd_kafka_topic_partition_list_t * rktparlist)2775 void rd_kafka_topic_partition_list_clear (
2776 rd_kafka_topic_partition_list_t *rktparlist) {
2777 int i;
2778
2779 for (i = 0 ; i < rktparlist->cnt ; i++)
2780 rd_kafka_topic_partition_destroy0(&rktparlist->elems[i], 0);
2781
2782 rktparlist->cnt = 0;
2783 }
2784
2785
rd_kafka_topic_partition_destroy_free(void * ptr)2786 void rd_kafka_topic_partition_destroy_free (void *ptr) {
2787 rd_kafka_topic_partition_destroy0(ptr, rd_true/*do_free*/);
2788 }
2789
rd_kafka_topic_partition_destroy(rd_kafka_topic_partition_t * rktpar)2790 void rd_kafka_topic_partition_destroy (rd_kafka_topic_partition_t *rktpar) {
2791 rd_kafka_topic_partition_destroy0(rktpar, 1);
2792 }
2793
2794
2795 /**
2796 * Destroys a list previously created with .._list_new() and drops
2797 * any references to contained toppars.
2798 */
2799 void
rd_kafka_topic_partition_list_destroy(rd_kafka_topic_partition_list_t * rktparlist)2800 rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist) {
2801 int i;
2802
2803 for (i = 0 ; i < rktparlist->cnt ; i++)
2804 rd_kafka_topic_partition_destroy0(&rktparlist->elems[i], 0);
2805
2806 if (rktparlist->elems)
2807 rd_free(rktparlist->elems);
2808
2809 rd_free(rktparlist);
2810 }
2811
2812
2813 /**
2814 * @brief Wrapper for rd_kafka_topic_partition_list_destroy() that
2815 * matches the standard free(void *) signature, for callback use.
2816 */
rd_kafka_topic_partition_list_destroy_free(void * ptr)2817 void rd_kafka_topic_partition_list_destroy_free (void *ptr) {
2818 rd_kafka_topic_partition_list_destroy(
2819 (rd_kafka_topic_partition_list_t *)ptr);
2820 }
2821
2822
2823 /**
2824 * Add a partition to an rktpar list.
2825 * The list must have enough room to fit it.
2826 *
2827 * '_private' must be NULL or a valid 'rd_kafka_toppar_t *'.
2828 *
2829 * Returns a pointer to the added element.
2830 */
2831 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add0(const char * func,int line,rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition,rd_kafka_toppar_t * _private)2832 rd_kafka_topic_partition_list_add0 (const char *func, int line,
2833 rd_kafka_topic_partition_list_t *rktparlist,
2834 const char *topic, int32_t partition,
2835 rd_kafka_toppar_t *_private) {
2836 rd_kafka_topic_partition_t *rktpar;
2837 if (rktparlist->cnt == rktparlist->size)
2838 rd_kafka_topic_partition_list_grow(rktparlist, 1);
2839 rd_kafka_assert(NULL, rktparlist->cnt < rktparlist->size);
2840
2841 rktpar = &rktparlist->elems[rktparlist->cnt++];
2842 memset(rktpar, 0, sizeof(*rktpar));
2843 rktpar->topic = rd_strdup(topic);
2844 rktpar->partition = partition;
2845 rktpar->offset = RD_KAFKA_OFFSET_INVALID;
2846 rktpar->_private = _private;
2847 if (_private)
2848 rd_kafka_toppar_keep_fl(func, line, _private);
2849
2850 return rktpar;
2851 }
2852
2853
2854 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)2855 rd_kafka_topic_partition_list_add (rd_kafka_topic_partition_list_t *rktparlist,
2856 const char *topic, int32_t partition) {
2857 return rd_kafka_topic_partition_list_add0(__FUNCTION__,__LINE__,
2858 rktparlist,
2859 topic, partition, NULL);
2860 }
2861
2862
2863 /**
2864 * Adds a consecutive list of partitions to a list
2865 */
2866 void
rd_kafka_topic_partition_list_add_range(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t start,int32_t stop)2867 rd_kafka_topic_partition_list_add_range (rd_kafka_topic_partition_list_t
2868 *rktparlist,
2869 const char *topic,
2870 int32_t start, int32_t stop) {
2871
2872 for (; start <= stop ; start++)
2873 rd_kafka_topic_partition_list_add(rktparlist, topic, start);
2874 }
2875
2876
2877 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_upsert(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)2878 rd_kafka_topic_partition_list_upsert (
2879 rd_kafka_topic_partition_list_t *rktparlist,
2880 const char *topic, int32_t partition) {
2881 rd_kafka_topic_partition_t *rktpar;
2882
2883 if ((rktpar = rd_kafka_topic_partition_list_find(rktparlist,
2884 topic, partition)))
2885 return rktpar;
2886
2887 return rd_kafka_topic_partition_list_add(rktparlist, topic, partition);
2888 }
2889
2890
2891 /**
2892 * @brief Update \p dst with info from \p src.
2893 */
rd_kafka_topic_partition_update(rd_kafka_topic_partition_t * dst,const rd_kafka_topic_partition_t * src)2894 void rd_kafka_topic_partition_update (rd_kafka_topic_partition_t *dst,
2895 const rd_kafka_topic_partition_t *src) {
2896 rd_dassert(!strcmp(dst->topic, src->topic));
2897 rd_dassert(dst->partition == src->partition);
2898 rd_dassert(dst != src);
2899
2900 dst->offset = src->offset;
2901 dst->opaque = src->opaque;
2902 dst->err = src->err;
2903
2904 if (src->metadata_size > 0) {
2905 dst->metadata = rd_malloc(src->metadata_size);
2906 dst->metadata_size = src->metadata_size;;
2907 memcpy(dst->metadata, src->metadata, dst->metadata_size);
2908 }
2909 }
2910
2911 /**
2912 * @brief Creates a copy of \p rktpar and adds it to \p rktparlist
2913 */
rd_kafka_topic_partition_list_add_copy(rd_kafka_topic_partition_list_t * rktparlist,const rd_kafka_topic_partition_t * rktpar)2914 void rd_kafka_topic_partition_list_add_copy (
2915 rd_kafka_topic_partition_list_t *rktparlist,
2916 const rd_kafka_topic_partition_t *rktpar) {
2917 rd_kafka_topic_partition_t *dst;
2918
2919 dst = rd_kafka_topic_partition_list_add0(
2920 __FUNCTION__,__LINE__,
2921 rktparlist,
2922 rktpar->topic,
2923 rktpar->partition,
2924 rktpar->_private);
2925
2926 rd_kafka_topic_partition_update(dst, rktpar);
2927 }
2928
2929
2930
2931 /**
2932 * Create and return a copy of list 'src'
2933 */
2934 rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_copy(const rd_kafka_topic_partition_list_t * src)2935 rd_kafka_topic_partition_list_copy (const rd_kafka_topic_partition_list_t *src){
2936 rd_kafka_topic_partition_list_t *dst;
2937 int i;
2938
2939 dst = rd_kafka_topic_partition_list_new(src->size);
2940
2941 for (i = 0 ; i < src->cnt ; i++)
2942 rd_kafka_topic_partition_list_add_copy(dst, &src->elems[i]);
2943 return dst;
2944 }
2945
2946 /**
2947 * @brief Same as rd_kafka_topic_partition_list_copy() but suitable for
2948 * rd_list_copy(). The \p opaque is ignored.
2949 */
2950 void *
rd_kafka_topic_partition_list_copy_opaque(const void * src,void * opaque)2951 rd_kafka_topic_partition_list_copy_opaque (const void *src, void *opaque) {
2952 return rd_kafka_topic_partition_list_copy(src);
2953 }
2954
2955 /**
2956 * @brief Append copies of all elements in \p src to \p dst.
2957 * No duplicate-checks are performed.
2958 */
rd_kafka_topic_partition_list_add_list(rd_kafka_topic_partition_list_t * dst,const rd_kafka_topic_partition_list_t * src)2959 void rd_kafka_topic_partition_list_add_list (
2960 rd_kafka_topic_partition_list_t *dst,
2961 const rd_kafka_topic_partition_list_t *src) {
2962 int i;
2963
2964 if (src->cnt == 0)
2965 return;
2966
2967 if (dst->size < dst->cnt + src->cnt)
2968 rd_kafka_topic_partition_list_grow(dst, src->cnt);
2969
2970 for (i = 0 ; i < src->cnt ; i++)
2971 rd_kafka_topic_partition_list_add_copy(dst, &src->elems[i]);
2972 }
2973
2974
2975 /**
2976 * @brief Compare two partition lists using partition comparator \p cmp.
2977 *
2978 * @warning This is an O(Na*Nb) operation.
2979 */
2980 int
rd_kafka_topic_partition_list_cmp(const void * _a,const void * _b,int (* cmp)(const void *,const void *))2981 rd_kafka_topic_partition_list_cmp (const void *_a, const void *_b,
2982 int (*cmp) (const void *, const void *)) {
2983 const rd_kafka_topic_partition_list_t *a = _a, *b = _b;
2984 int r;
2985 int i;
2986
2987 r = a->cnt - b->cnt;
2988 if (r || a->cnt == 0)
2989 return r;
2990
2991 /* Since the lists may not be sorted we need to scan all of B
2992 * for each element in A.
2993 * FIXME: If the list sizes are larger than X we could create a
2994 * temporary hash map instead. */
2995 for (i = 0 ; i < a->cnt ; i++) {
2996 int j;
2997
2998 for (j = 0 ; j < b->cnt ; j++) {
2999 r = cmp(&a->elems[i], &b->elems[j]);
3000 if (!r)
3001 break;
3002 }
3003
3004 if (j == b->cnt)
3005 return 1;
3006 }
3007
3008 return 0;
3009 }
3010
3011
3012 /**
3013 * @brief Ensures the \p rktpar has a toppar set in _private.
3014 *
3015 * @returns the toppar object (or possibly NULL if \p create_on_miss is true)
3016 * WITHOUT refcnt increased.
3017 */
3018 rd_kafka_toppar_t *
rd_kafka_topic_partition_ensure_toppar(rd_kafka_t * rk,rd_kafka_topic_partition_t * rktpar,rd_bool_t create_on_miss)3019 rd_kafka_topic_partition_ensure_toppar (rd_kafka_t *rk,
3020 rd_kafka_topic_partition_t *rktpar,
3021 rd_bool_t create_on_miss) {
3022 if (!rktpar->_private)
3023 rktpar->_private =
3024 rd_kafka_toppar_get2(rk,
3025 rktpar->topic,
3026 rktpar->partition, 0,
3027 create_on_miss);
3028 return rktpar->_private;
3029 }
3030
3031
3032 /**
3033 * @returns (and sets if necessary) the \p rktpar's _private / toppar.
3034 * @remark a new reference is returned.
3035 */
3036 rd_kafka_toppar_t *
rd_kafka_topic_partition_get_toppar(rd_kafka_t * rk,rd_kafka_topic_partition_t * rktpar,rd_bool_t create_on_miss)3037 rd_kafka_topic_partition_get_toppar (rd_kafka_t *rk,
3038 rd_kafka_topic_partition_t *rktpar,
3039 rd_bool_t create_on_miss) {
3040 rd_kafka_toppar_t *rktp;
3041
3042 rktp = rd_kafka_topic_partition_ensure_toppar(rk, rktpar,
3043 create_on_miss);
3044
3045 if (rktp)
3046 rd_kafka_toppar_keep(rktp);
3047
3048 return rktp;
3049 }
3050
3051
rd_kafka_topic_partition_cmp(const void * _a,const void * _b)3052 int rd_kafka_topic_partition_cmp (const void *_a, const void *_b) {
3053 const rd_kafka_topic_partition_t *a = _a;
3054 const rd_kafka_topic_partition_t *b = _b;
3055 int r = strcmp(a->topic, b->topic);
3056 if (r)
3057 return r;
3058 else
3059 return RD_CMP(a->partition, b->partition);
3060 }
3061
3062 /** @brief Compare only the topic */
rd_kafka_topic_partition_cmp_topic(const void * _a,const void * _b)3063 int rd_kafka_topic_partition_cmp_topic (const void *_a, const void *_b) {
3064 const rd_kafka_topic_partition_t *a = _a;
3065 const rd_kafka_topic_partition_t *b = _b;
3066 return strcmp(a->topic, b->topic);
3067 }
3068
/**
 * @brief rd_qsort_r()-style adapter for rd_kafka_topic_partition_cmp();
 *        \p opaque is ignored.
 */
static int rd_kafka_topic_partition_cmp_opaque (const void *_a, const void *_b,
                                                void *opaque) {
        (void)opaque;
        return rd_kafka_topic_partition_cmp(_a, _b);
}
3073
3074 /** @returns a hash of the topic and partition */
rd_kafka_topic_partition_hash(const void * _a)3075 unsigned int rd_kafka_topic_partition_hash (const void *_a) {
3076 const rd_kafka_topic_partition_t *a = _a;
3077 int r = 31 * 17 + a->partition;
3078 return 31 * r + rd_string_hash(a->topic, -1);
3079 }
3080
3081
3082
3083 /**
3084 * @brief Search 'rktparlist' for 'topic' and 'partition'.
3085 * @returns the elems[] index or -1 on miss.
3086 */
3087 static int
rd_kafka_topic_partition_list_find0(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition,int (* cmp)(const void *,const void *))3088 rd_kafka_topic_partition_list_find0 (
3089 const rd_kafka_topic_partition_list_t *rktparlist,
3090 const char *topic, int32_t partition,
3091 int (*cmp) (const void *, const void *)) {
3092 rd_kafka_topic_partition_t skel;
3093 int i;
3094
3095 skel.topic = (char *)topic;
3096 skel.partition = partition;
3097
3098 for (i = 0 ; i < rktparlist->cnt ; i++) {
3099 if (!cmp(&skel, &rktparlist->elems[i]))
3100 return i;
3101 }
3102
3103 return -1;
3104 }
3105
3106 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_find(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)3107 rd_kafka_topic_partition_list_find (
3108 const rd_kafka_topic_partition_list_t *rktparlist,
3109 const char *topic, int32_t partition) {
3110 int i = rd_kafka_topic_partition_list_find0(
3111 rktparlist, topic, partition, rd_kafka_topic_partition_cmp);
3112 if (i == -1)
3113 return NULL;
3114 else
3115 return &rktparlist->elems[i];
3116 }
3117
3118
3119 int
rd_kafka_topic_partition_list_find_idx(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)3120 rd_kafka_topic_partition_list_find_idx (
3121 const rd_kafka_topic_partition_list_t *rktparlist,
3122 const char *topic, int32_t partition) {
3123 return rd_kafka_topic_partition_list_find0(
3124 rktparlist, topic, partition, rd_kafka_topic_partition_cmp);
3125 }
3126
3127
3128 /**
3129 * @returns the first element that matches \p topic, regardless of partition.
3130 */
3131 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_find_topic(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic)3132 rd_kafka_topic_partition_list_find_topic (
3133 const rd_kafka_topic_partition_list_t *rktparlist, const char *topic) {
3134 int i = rd_kafka_topic_partition_list_find0(
3135 rktparlist, topic, RD_KAFKA_PARTITION_UA,
3136 rd_kafka_topic_partition_cmp_topic);
3137 if (i == -1)
3138 return NULL;
3139 else
3140 return &rktparlist->elems[i];
3141 }
3142
3143
3144 int
rd_kafka_topic_partition_list_del_by_idx(rd_kafka_topic_partition_list_t * rktparlist,int idx)3145 rd_kafka_topic_partition_list_del_by_idx (rd_kafka_topic_partition_list_t *rktparlist,
3146 int idx) {
3147 if (unlikely(idx < 0 || idx >= rktparlist->cnt))
3148 return 0;
3149
3150 rd_kafka_topic_partition_destroy0(&rktparlist->elems[idx], 0);
3151 memmove(&rktparlist->elems[idx], &rktparlist->elems[idx+1],
3152 (rktparlist->cnt - idx - 1) * sizeof(rktparlist->elems[idx]));
3153 rktparlist->cnt--;
3154
3155 return 1;
3156 }
3157
3158
3159 int
rd_kafka_topic_partition_list_del(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)3160 rd_kafka_topic_partition_list_del (rd_kafka_topic_partition_list_t *rktparlist,
3161 const char *topic, int32_t partition) {
3162 int i = rd_kafka_topic_partition_list_find0(
3163 rktparlist, topic, partition, rd_kafka_topic_partition_cmp);
3164 if (i == -1)
3165 return 0;
3166
3167 return rd_kafka_topic_partition_list_del_by_idx(rktparlist, i);
3168 }
3169
3170
3171
3172 /**
3173 * Returns true if 'topic' matches the 'rktpar', else false.
3174 * On match, if rktpar is a regex pattern then 'matched_by_regex' is set to 1.
3175 */
rd_kafka_topic_partition_match(rd_kafka_t * rk,const rd_kafka_group_member_t * rkgm,const rd_kafka_topic_partition_t * rktpar,const char * topic,int * matched_by_regex)3176 int rd_kafka_topic_partition_match (rd_kafka_t *rk,
3177 const rd_kafka_group_member_t *rkgm,
3178 const rd_kafka_topic_partition_t *rktpar,
3179 const char *topic, int *matched_by_regex) {
3180 int ret = 0;
3181
3182 if (*rktpar->topic == '^') {
3183 char errstr[128];
3184
3185 ret = rd_regex_match(rktpar->topic, topic,
3186 errstr, sizeof(errstr));
3187 if (ret == -1) {
3188 rd_kafka_dbg(rk, CGRP,
3189 "SUBMATCH",
3190 "Invalid regex for member "
3191 "\"%.*s\" subscription \"%s\": %s",
3192 RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
3193 rktpar->topic, errstr);
3194 return 0;
3195 }
3196
3197 if (ret && matched_by_regex)
3198 *matched_by_regex = 1;
3199
3200 } else if (!strcmp(rktpar->topic, topic)) {
3201
3202 if (matched_by_regex)
3203 *matched_by_regex = 0;
3204
3205 ret = 1;
3206 }
3207
3208 return ret;
3209 }
3210
3211
3212
/**
 * @brief Sorts \p rktparlist in place with \p cmp (given \p opaque).
 *
 * A NULL \p cmp selects the default topic-then-partition ordering.
 */
void rd_kafka_topic_partition_list_sort (
        rd_kafka_topic_partition_list_t *rktparlist,
        int (*cmp) (const void *, const void *, void *),
        void *opaque) {
        rd_qsort_r(rktparlist->elems, rktparlist->cnt,
                   sizeof(*rktparlist->elems),
                   cmp ? cmp : rd_kafka_topic_partition_cmp_opaque,
                   opaque);
}
3225
3226
/**
 * @brief Sorts \p rktparlist by topic name, then by partition id.
 */
void rd_kafka_topic_partition_list_sort_by_topic (
        rd_kafka_topic_partition_list_t *rktparlist) {
        /* A NULL comparator selects the default topic+partition order. */
        rd_kafka_topic_partition_list_sort(rktparlist, NULL, NULL);
}
3233
rd_kafka_topic_partition_list_set_offset(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition,int64_t offset)3234 rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset (
3235 rd_kafka_topic_partition_list_t *rktparlist,
3236 const char *topic, int32_t partition, int64_t offset) {
3237 rd_kafka_topic_partition_t *rktpar;
3238
3239 if (!(rktpar = rd_kafka_topic_partition_list_find(rktparlist,
3240 topic, partition)))
3241 return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3242
3243 rktpar->offset = offset;
3244
3245 return RD_KAFKA_RESP_ERR_NO_ERROR;
3246 }
3247
3248
3249 /**
3250 * @brief Reset all offsets to the provided value.
3251 */
3252 void
rd_kafka_topic_partition_list_reset_offsets(rd_kafka_topic_partition_list_t * rktparlist,int64_t offset)3253 rd_kafka_topic_partition_list_reset_offsets (rd_kafka_topic_partition_list_t *rktparlist,
3254 int64_t offset) {
3255
3256 int i;
3257 for (i = 0 ; i < rktparlist->cnt ; i++)
3258 rktparlist->elems[i].offset = offset;
3259 }
3260
3261
3262 /**
3263 * Set offset values in partition list based on toppar's last stored offset.
3264 *
3265 * from_rktp - true: set rktp's last stored offset, false: set def_value
3266 * unless a concrete offset is set.
3267 * is_commit: indicates that set offset is to be committed (for debug log)
3268 *
3269 * Returns the number of valid non-logical offsets (>=0).
3270 */
rd_kafka_topic_partition_list_set_offsets(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,int from_rktp,int64_t def_value,int is_commit)3271 int rd_kafka_topic_partition_list_set_offsets (
3272 rd_kafka_t *rk,
3273 rd_kafka_topic_partition_list_t *rktparlist,
3274 int from_rktp, int64_t def_value, int is_commit) {
3275 int i;
3276 int valid_cnt = 0;
3277
3278 for (i = 0 ; i < rktparlist->cnt ; i++) {
3279 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3280 const char *verb = "setting";
3281 char preamble[80];
3282
3283 *preamble = '\0'; /* Avoid warning */
3284
3285 if (from_rktp) {
3286 rd_kafka_toppar_t *rktp = rktpar->_private;
3287 rd_kafka_toppar_lock(rktp);
3288
3289 if (rk->rk_conf.debug & (RD_KAFKA_DBG_CGRP |
3290 RD_KAFKA_DBG_TOPIC))
3291 rd_snprintf(preamble, sizeof(preamble),
3292 "stored offset %"PRId64
3293 ", committed offset %"PRId64": ",
3294 rktp->rktp_stored_offset,
3295 rktp->rktp_committed_offset);
3296
3297 if (rktp->rktp_stored_offset >
3298 rktp->rktp_committed_offset) {
3299 verb = "setting stored";
3300 rktpar->offset = rktp->rktp_stored_offset;
3301 } else {
3302 rktpar->offset = RD_KAFKA_OFFSET_INVALID;
3303 }
3304 rd_kafka_toppar_unlock(rktp);
3305 } else {
3306 if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset)) {
3307 verb = "setting default";
3308 rktpar->offset = def_value;
3309 } else
3310 verb = "keeping";
3311 }
3312
3313 if (is_commit && rktpar->offset == RD_KAFKA_OFFSET_INVALID)
3314 rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
3315 "Topic %s [%"PRId32"]: "
3316 "%snot including in commit",
3317 rktpar->topic, rktpar->partition,
3318 preamble);
3319 else
3320 rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
3321 "Topic %s [%"PRId32"]: "
3322 "%s%s offset %s%s",
3323 rktpar->topic, rktpar->partition,
3324 preamble,
3325 verb,
3326 rd_kafka_offset2str(rktpar->offset),
3327 is_commit ? " for commit" : "");
3328
3329 if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset))
3330 valid_cnt++;
3331 }
3332
3333 return valid_cnt;
3334 }
3335
3336
3337 /**
3338 * @returns the number of partitions with absolute (non-logical) offsets set.
3339 */
rd_kafka_topic_partition_list_count_abs_offsets(const rd_kafka_topic_partition_list_t * rktparlist)3340 int rd_kafka_topic_partition_list_count_abs_offsets (
3341 const rd_kafka_topic_partition_list_t *rktparlist) {
3342 int i;
3343 int valid_cnt = 0;
3344
3345 for (i = 0 ; i < rktparlist->cnt ; i++)
3346 if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktparlist->elems[i].offset))
3347 valid_cnt++;
3348
3349 return valid_cnt;
3350 }
3351
3352
3353 /**
3354 * @brief Update _private (toppar) field to point to valid rktp
3355 * for each parition.
3356 *
3357 * @param create_on_miss Create partition (and topic_t object) if necessary.
3358 */
3359 void
rd_kafka_topic_partition_list_update_toppars(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_bool_t create_on_miss)3360 rd_kafka_topic_partition_list_update_toppars (rd_kafka_t *rk,
3361 rd_kafka_topic_partition_list_t
3362 *rktparlist,
3363 rd_bool_t create_on_miss) {
3364 int i;
3365 for (i = 0 ; i < rktparlist->cnt ; i++) {
3366 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3367
3368 if (!rktpar->_private)
3369 rktpar->_private =
3370 rd_kafka_toppar_get2(rk,
3371 rktpar->topic,
3372 rktpar->partition,
3373 0/*not ua-on-miss*/,
3374 create_on_miss);
3375
3376 }
3377 }
3378
3379
3380 /**
3381 * @brief Populate \p leaders with the leaders+partitions for the partitions in
3382 * \p rktparlist. Duplicates are suppressed.
3383 *
3384 * If no leader is found for a partition that element's \c .err will
3385 * be set to RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE.
3386 *
3387 * If the partition does not exist \c .err will be set to
3388 * RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION.
3389 *
3390 * @param rktparlist The partitions to look up leaders for, the .err field
3391 * will be set according to outcome, e.g., ERR_NO_ERROR,
3392 * ERR_UNKNOWN_TOPIC_OR_PART, etc.
3393 * @param leaders rd_list_t of allocated (struct rd_kafka_partition_leader *)
3394 * @param query_topics (optional) rd_list of strdupped (char *)
3395 * @param query_unknown Add unknown topics to \p query_topics.
3396 * @param eonce (optional) For triggering asynchronously on cache change
3397 * in case not all leaders are known now.
3398 *
3399 * @remark This is based on the current topic_t and partition state
3400 * which may lag behind the last metadata update due to internal
3401 * threading and also the fact that no topic_t may have been created.
3402 *
3403 * @param leaders rd_list_t of type (struct rd_kafka_partition_leader *)
3404 *
3405 * @returns true if all partitions have leaders, else false.
3406 *
3407 * @sa rd_kafka_topic_partition_list_get_leaders_by_metadata
3408 *
3409 * @locks rd_kafka_*lock() MUST NOT be held
3410 */
3411 static rd_bool_t
rd_kafka_topic_partition_list_get_leaders(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * leaders,rd_list_t * query_topics,rd_bool_t query_unknown,rd_kafka_enq_once_t * eonce)3412 rd_kafka_topic_partition_list_get_leaders (
3413 rd_kafka_t *rk,
3414 rd_kafka_topic_partition_list_t *rktparlist,
3415 rd_list_t *leaders,
3416 rd_list_t *query_topics,
3417 rd_bool_t query_unknown,
3418 rd_kafka_enq_once_t *eonce) {
3419 rd_bool_t complete;
3420 int cnt = 0;
3421 int i;
3422
3423 if (eonce)
3424 rd_kafka_wrlock(rk);
3425 else
3426 rd_kafka_rdlock(rk);
3427
3428 for (i = 0 ; i < rktparlist->cnt ; i++) {
3429 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3430 rd_kafka_topic_partition_t *rktpar2;
3431 rd_kafka_broker_t *rkb = NULL;
3432 struct rd_kafka_partition_leader leader_skel;
3433 struct rd_kafka_partition_leader *leader;
3434 const rd_kafka_metadata_topic_t *mtopic;
3435 const rd_kafka_metadata_partition_t *mpart;
3436 rd_bool_t topic_wait_cache;
3437
3438 rd_kafka_metadata_cache_topic_partition_get(
3439 rk, &mtopic, &mpart,
3440 rktpar->topic, rktpar->partition,
3441 0/*negative entries too*/);
3442
3443 topic_wait_cache =
3444 !mtopic ||
3445 RD_KAFKA_METADATA_CACHE_ERR_IS_TEMPORARY(mtopic->err);
3446
3447 if (!topic_wait_cache &&
3448 mtopic &&
3449 mtopic->err != RD_KAFKA_RESP_ERR_NO_ERROR &&
3450 mtopic->err != RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE) {
3451 /* Topic permanently errored */
3452 rktpar->err = mtopic->err;
3453 continue;
3454 }
3455
3456 if (mtopic && !mpart && mtopic->partition_cnt > 0) {
3457 /* Topic exists but partition doesnt.
3458 * This is a permanent error. */
3459 rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3460 continue;
3461 }
3462
3463 if (mpart &&
3464 (mpart->leader == -1 ||
3465 !(rkb = rd_kafka_broker_find_by_nodeid0(
3466 rk, mpart->leader, -1/*any state*/,
3467 rd_false)))) {
3468 /* Partition has no (valid) leader.
3469 * This is a permanent error. */
3470 rktpar->err =
3471 mtopic->err ? mtopic->err :
3472 RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE;
3473 continue;
3474 }
3475
3476 if (topic_wait_cache || !rkb) {
3477 /* Topic unknown or no current leader for partition,
3478 * add topic to query list. */
3479 rktpar->err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
3480 if (query_topics &&
3481 !rd_list_find(query_topics, rktpar->topic,
3482 (void *)strcmp))
3483 rd_list_add(query_topics,
3484 rd_strdup(rktpar->topic));
3485 continue;
3486 }
3487
3488 /* Leader exists, add to leader list. */
3489
3490 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
3491
3492 memset(&leader_skel, 0, sizeof(leader_skel));
3493 leader_skel.rkb = rkb;
3494
3495 leader = rd_list_find(leaders, &leader_skel,
3496 rd_kafka_partition_leader_cmp);
3497
3498 if (!leader) {
3499 leader = rd_kafka_partition_leader_new(rkb);
3500 rd_list_add(leaders, leader);
3501 }
3502
3503 rktpar2 = rd_kafka_topic_partition_list_find(leader->partitions,
3504 rktpar->topic,
3505 rktpar->partition);
3506 if (rktpar2) {
3507 /* Already exists in partitions list, just update. */
3508 rd_kafka_topic_partition_update(rktpar2, rktpar);
3509 } else {
3510 /* Make a copy of rktpar and add to partitions list */
3511 rd_kafka_topic_partition_list_add_copy(
3512 leader->partitions, rktpar);
3513 }
3514
3515 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
3516
3517 rd_kafka_broker_destroy(rkb); /* loose refcount */
3518 cnt++;
3519 }
3520
3521 complete = cnt == rktparlist->cnt;
3522
3523 if (!complete && eonce)
3524 /* Add eonce to cache observers */
3525 rd_kafka_metadata_cache_wait_state_change_async(rk, eonce);
3526
3527 if (eonce)
3528 rd_kafka_wrunlock(rk);
3529 else
3530 rd_kafka_rdunlock(rk);
3531
3532 return complete;
3533 }
3534
3535
3536 /**
3537 * @brief Timer timeout callback for query_leaders_async rko's eonce object.
3538 */
3539 static void
rd_kafka_partition_leader_query_eonce_timeout_cb(rd_kafka_timers_t * rkts,void * arg)3540 rd_kafka_partition_leader_query_eonce_timeout_cb (rd_kafka_timers_t *rkts,
3541 void *arg) {
3542 rd_kafka_enq_once_t *eonce = arg;
3543 rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__TIMED_OUT,
3544 "timeout timer");
3545 }
3546
3547
3548 /**
3549 * @brief Query timer callback for query_leaders_async rko's eonce object.
3550 */
3551 static void
rd_kafka_partition_leader_query_eonce_timer_cb(rd_kafka_timers_t * rkts,void * arg)3552 rd_kafka_partition_leader_query_eonce_timer_cb (rd_kafka_timers_t *rkts,
3553 void *arg) {
3554 rd_kafka_enq_once_t *eonce = arg;
3555 rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR_NO_ERROR,
3556 "query timer");
3557 }
3558
3559
3560 /**
3561 * @brief Query metadata cache for partition leaders, or trigger metadata
3562 * refresh if leaders not known.
3563 *
3564 * @locks_required none
3565 * @locality any
3566 */
3567 static rd_kafka_op_res_t
rd_kafka_topic_partition_list_query_leaders_async_worker(rd_kafka_op_t * rko)3568 rd_kafka_topic_partition_list_query_leaders_async_worker (rd_kafka_op_t *rko) {
3569 rd_kafka_t *rk = rko->rko_rk;
3570 rd_list_t query_topics, *leaders = NULL;
3571 rd_kafka_op_t *reply;
3572
3573 RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_LEADERS);
3574
3575 if (rko->rko_err)
3576 goto reply; /* Timeout or ERR__DESTROY */
3577
3578 /* Since we're iterating over get_leaders() until all partition leaders
3579 * are known we need to re-enable the eonce to be triggered again (which
3580 * is not necessary the first time we get here, but there
3581 * is no harm doing it then either). */
3582 rd_kafka_enq_once_reenable(rko->rko_u.leaders.eonce,
3583 rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
3584
3585 /* Look up the leaders in the metadata cache, if not all leaders
3586 * are known the eonce is registered for metadata cache changes
3587 * which will cause our function to be called
3588 * again on (any) metadata cache change.
3589 *
3590 * When we are called again we perform the cache lookup again and
3591 * hopefully get all leaders, otherwise defer a new async wait.
3592 * Repeat until success or timeout. */
3593
3594 rd_list_init(&query_topics, 4 + rko->rko_u.leaders.partitions->cnt/2,
3595 rd_free);
3596
3597 leaders = rd_list_new(1 + rko->rko_u.leaders.partitions->cnt / 2,
3598 rd_kafka_partition_leader_destroy_free);
3599
3600 if (rd_kafka_topic_partition_list_get_leaders(
3601 rk, rko->rko_u.leaders.partitions,
3602 leaders,
3603 &query_topics,
3604 /* Add unknown topics to query_topics only on the
3605 * first query, after that we consider them permanently
3606 * non-existent */
3607 rko->rko_u.leaders.query_cnt == 0,
3608 rko->rko_u.leaders.eonce)) {
3609 /* All leaders now known (or failed), reply to caller */
3610 rd_list_destroy(&query_topics);
3611 goto reply;
3612 }
3613
3614 if (rd_list_empty(&query_topics)) {
3615 /* Not all leaders known but no topics left to query,
3616 * reply to caller. */
3617 rd_list_destroy(&query_topics);
3618 goto reply;
3619 }
3620
3621 /* Need to refresh topic metadata, but at most every interval. */
3622 if (!rd_kafka_timer_is_started(&rk->rk_timers,
3623 &rko->rko_u.leaders.query_tmr)) {
3624
3625 rko->rko_u.leaders.query_cnt++;
3626
3627 /* Add query interval timer. */
3628 rd_kafka_enq_once_add_source(rko->rko_u.leaders.eonce,
3629 "query timer");
3630 rd_kafka_timer_start_oneshot(
3631 &rk->rk_timers,
3632 &rko->rko_u.leaders.query_tmr,
3633 rd_true,
3634 3*1000*1000 /* 3s */,
3635 rd_kafka_partition_leader_query_eonce_timer_cb,
3636 rko->rko_u.leaders.eonce);
3637
3638 /* Request metadata refresh */
3639 rd_kafka_metadata_refresh_topics(
3640 rk, NULL, &query_topics,
3641 rd_true/*force*/,
3642 rd_false/*!allow_auto_create*/,
3643 rd_false/*!cgrp_update*/,
3644 "query partition leaders");
3645
3646 }
3647
3648 rd_list_destroy(leaders);
3649 rd_list_destroy(&query_topics);
3650
3651 /* Wait for next eonce trigger */
3652 return RD_KAFKA_OP_RES_KEEP; /* rko is still used */
3653
3654 reply:
3655 /* Decommission worker state and reply to caller */
3656
3657 if (rd_kafka_timer_stop(&rk->rk_timers,
3658 &rko->rko_u.leaders.query_tmr,
3659 RD_DO_LOCK))
3660 rd_kafka_enq_once_del_source(rko->rko_u.leaders.eonce,
3661 "query timer");
3662 if (rd_kafka_timer_stop(&rk->rk_timers,
3663 &rko->rko_u.leaders.timeout_tmr,
3664 RD_DO_LOCK))
3665 rd_kafka_enq_once_del_source(rko->rko_u.leaders.eonce,
3666 "timeout timer");
3667
3668 if (rko->rko_u.leaders.eonce) {
3669 rd_kafka_enq_once_disable(rko->rko_u.leaders.eonce);
3670 rko->rko_u.leaders.eonce = NULL;
3671 }
3672
3673 /* No leaders found, set a request-level error */
3674 if (leaders && rd_list_cnt(leaders) == 0) {
3675 if (!rko->rko_err)
3676 rko->rko_err = RD_KAFKA_RESP_ERR__NOENT;
3677 rd_list_destroy(leaders);
3678 leaders = NULL;
3679 }
3680
3681 /* Create and enqueue reply rko */
3682 if (rko->rko_u.leaders.replyq.q) {
3683 reply = rd_kafka_op_new_cb(rk, RD_KAFKA_OP_LEADERS,
3684 rko->rko_u.leaders.cb);
3685 rd_kafka_op_get_reply_version(reply, rko);
3686 reply->rko_err = rko->rko_err;
3687 reply->rko_u.leaders.partitions =
3688 rko->rko_u.leaders.partitions; /* Transfer ownership for
3689 * partition list that
3690 * now contains
3691 * per-partition errors*/
3692 rko->rko_u.leaders.partitions = NULL;
3693 reply->rko_u.leaders.leaders = leaders; /* Possibly NULL */
3694 reply->rko_u.leaders.opaque = rko->rko_u.leaders.opaque;
3695
3696 rd_kafka_replyq_enq(&rko->rko_u.leaders.replyq, reply, 0);
3697 }
3698
3699 return RD_KAFKA_OP_RES_HANDLED;
3700 }
3701
3702
3703 static rd_kafka_op_res_t
rd_kafka_topic_partition_list_query_leaders_async_worker_op_cb(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)3704 rd_kafka_topic_partition_list_query_leaders_async_worker_op_cb (
3705 rd_kafka_t *rk,
3706 rd_kafka_q_t *rkq,
3707 rd_kafka_op_t *rko) {
3708 return rd_kafka_topic_partition_list_query_leaders_async_worker(rko);
3709 }
3710
3711 /**
3712 * @brief Async variant of rd_kafka_topic_partition_list_query_leaders().
3713 *
3714 * The reply rko op will contain:
3715 * - .leaders which is a list of leaders and their partitions, this may be
3716 * NULL for overall errors (such as no leaders are found), or a
3717 * partial or complete list of leaders.
3718 * - .partitions which is a copy of the input list of partitions with the
3719 * .err field set to the outcome of the leader query, typically ERR_NO_ERROR
3720 * or ERR_UNKNOWN_TOPIC_OR_PART.
3721 *
3722 * @locks_acquired rd_kafka_*lock()
3723 *
3724 * @remark rd_kafka_*lock() MUST NOT be held
3725 */
3726 void
rd_kafka_topic_partition_list_query_leaders_async(rd_kafka_t * rk,const rd_kafka_topic_partition_list_t * rktparlist,int timeout_ms,rd_kafka_replyq_t replyq,rd_kafka_op_cb_t * cb,void * opaque)3727 rd_kafka_topic_partition_list_query_leaders_async (
3728 rd_kafka_t *rk,
3729 const rd_kafka_topic_partition_list_t *rktparlist,
3730 int timeout_ms,
3731 rd_kafka_replyq_t replyq,
3732 rd_kafka_op_cb_t *cb,
3733 void *opaque) {
3734 rd_kafka_op_t *rko;
3735
3736 rd_assert(rktparlist && rktparlist->cnt > 0);
3737 rd_assert(replyq.q);
3738
3739 rko = rd_kafka_op_new_cb(
3740 rk,
3741 RD_KAFKA_OP_LEADERS,
3742 rd_kafka_topic_partition_list_query_leaders_async_worker_op_cb);
3743 rko->rko_u.leaders.replyq = replyq;
3744 rko->rko_u.leaders.partitions =
3745 rd_kafka_topic_partition_list_copy(rktparlist);
3746 rko->rko_u.leaders.ts_timeout = rd_timeout_init(timeout_ms);
3747 rko->rko_u.leaders.cb = cb;
3748 rko->rko_u.leaders.opaque = opaque;
3749
3750 /* Create an eonce to be triggered either by metadata cache update
3751 * (from refresh_topics()), query interval, or timeout. */
3752 rko->rko_u.leaders.eonce = rd_kafka_enq_once_new(
3753 rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
3754
3755 rd_kafka_enq_once_add_source(rko->rko_u.leaders.eonce, "timeout timer");
3756 rd_kafka_timer_start_oneshot(
3757 &rk->rk_timers,
3758 &rko->rko_u.leaders.timeout_tmr,
3759 rd_true,
3760 rd_timeout_remains_us(rko->rko_u.leaders.ts_timeout),
3761 rd_kafka_partition_leader_query_eonce_timeout_cb,
3762 rko->rko_u.leaders.eonce);
3763
3764 if (rd_kafka_topic_partition_list_query_leaders_async_worker(rko) ==
3765 RD_KAFKA_OP_RES_HANDLED)
3766 rd_kafka_op_destroy(rko); /* Reply queue already disabled */
3767 }
3768
3769
3770 /**
3771 * @brief Get leaders for all partitions in \p rktparlist, querying metadata
3772 * if needed.
3773 *
3774 * @param leaders is a pre-initialized (empty) list which will be populated
3775 * with the leader brokers and their partitions
3776 * (struct rd_kafka_partition_leader *)
3777 *
3778 * @remark Will not trigger topic auto creation (unless configured).
3779 *
3780 * @returns an error code on error.
3781 *
3782 * @locks rd_kafka_*lock() MUST NOT be held
3783 */
3784 rd_kafka_resp_err_t
rd_kafka_topic_partition_list_query_leaders(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * leaders,int timeout_ms)3785 rd_kafka_topic_partition_list_query_leaders (
3786 rd_kafka_t *rk,
3787 rd_kafka_topic_partition_list_t *rktparlist,
3788 rd_list_t *leaders, int timeout_ms) {
3789 rd_ts_t ts_end = rd_timeout_init(timeout_ms);
3790 rd_ts_t ts_query = 0;
3791 rd_ts_t now;
3792 int query_cnt = 0;
3793 int i = 0;
3794
3795 /* Get all the partition leaders, try multiple times:
3796 * if there are no leaders after the first run fire off a leader
3797 * query and wait for broker state update before trying again,
3798 * keep trying and re-querying at increasing intervals until
3799 * success or timeout. */
3800 do {
3801 rd_list_t query_topics;
3802 int query_intvl;
3803
3804 rd_list_init(&query_topics, rktparlist->cnt, rd_free);
3805
3806 rd_kafka_topic_partition_list_get_leaders(
3807 rk, rktparlist, leaders, &query_topics,
3808 /* Add unknown topics to query_topics only on the
3809 * first query, after that we consider them
3810 * permanently non-existent */
3811 query_cnt == 0,
3812 NULL);
3813
3814 if (rd_list_empty(&query_topics)) {
3815 /* No remaining topics to query: leader-list complete.*/
3816 rd_list_destroy(&query_topics);
3817
3818 /* No leader(s) for partitions means all partitions
3819 * are unknown. */
3820 if (rd_list_empty(leaders))
3821 return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3822
3823 return RD_KAFKA_RESP_ERR_NO_ERROR;
3824 }
3825
3826 now = rd_clock();
3827
3828 /*
3829 * Missing leader for some partitions
3830 */
3831 query_intvl = (i+1) * 100; /* add 100ms per iteration */
3832 if (query_intvl > 2*1000)
3833 query_intvl = 2*1000; /* Cap to 2s */
3834
3835 if (now >= ts_query + (query_intvl*1000)) {
3836 /* Query metadata for missing leaders,
3837 * possibly creating the topic. */
3838 rd_kafka_metadata_refresh_topics(
3839 rk, NULL, &query_topics,
3840 rd_true/*force*/,
3841 rd_false/*!allow_auto_create*/,
3842 rd_false/*!cgrp_update*/,
3843 "query partition leaders");
3844 ts_query = now;
3845 query_cnt++;
3846
3847 } else {
3848 /* Wait for broker ids to be updated from
3849 * metadata refresh above. */
3850 int wait_ms = rd_timeout_remains_limit(ts_end,
3851 query_intvl);
3852 rd_kafka_metadata_cache_wait_change(rk, wait_ms);
3853 }
3854
3855 rd_list_destroy(&query_topics);
3856
3857 i++;
3858 } while (ts_end == RD_POLL_INFINITE ||
3859 now < ts_end); /* now is deliberately outdated here
3860 * since wait_change() will block.
3861 * This gives us one more chance to spin thru*/
3862
3863 if (rd_atomic32_get(&rk->rk_broker_up_cnt) == 0)
3864 return RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN;
3865
3866 return RD_KAFKA_RESP_ERR__TIMED_OUT;
3867 }
3868
3869
3870 /**
3871 * @brief Populate \p rkts with the rd_kafka_topic_t objects for the
3872 * partitions in. Duplicates are suppressed.
3873 *
3874 * @returns the number of topics added.
3875 */
3876 int
rd_kafka_topic_partition_list_get_topics(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * rkts)3877 rd_kafka_topic_partition_list_get_topics (
3878 rd_kafka_t *rk,
3879 rd_kafka_topic_partition_list_t *rktparlist,
3880 rd_list_t *rkts) {
3881 int cnt = 0;
3882
3883 int i;
3884 for (i = 0 ; i < rktparlist->cnt ; i++) {
3885 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3886 rd_kafka_toppar_t *rktp;
3887
3888 rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar,
3889 rd_false);
3890 if (!rktp) {
3891 rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3892 continue;
3893 }
3894
3895 if (!rd_list_find(rkts, rktp->rktp_rkt,
3896 rd_kafka_topic_cmp_rkt)) {
3897 rd_list_add(rkts, rd_kafka_topic_keep(rktp->rktp_rkt));
3898 cnt++;
3899 }
3900
3901 rd_kafka_toppar_destroy(rktp);
3902 }
3903
3904 return cnt;
3905 }
3906
3907
3908 /**
3909 * @brief Populate \p topics with the strdupped topic names in \p rktparlist.
3910 * Duplicates are suppressed.
3911 *
3912 * @param include_regex: include regex topics
3913 *
3914 * @returns the number of topics added.
3915 */
3916 int
rd_kafka_topic_partition_list_get_topic_names(const rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * topics,int include_regex)3917 rd_kafka_topic_partition_list_get_topic_names (
3918 const rd_kafka_topic_partition_list_t *rktparlist,
3919 rd_list_t *topics, int include_regex) {
3920 int cnt = 0;
3921 int i;
3922
3923 for (i = 0 ; i < rktparlist->cnt ; i++) {
3924 const rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3925
3926 if (!include_regex && *rktpar->topic == '^')
3927 continue;
3928
3929 if (!rd_list_find(topics, rktpar->topic, (void *)strcmp)) {
3930 rd_list_add(topics, rd_strdup(rktpar->topic));
3931 cnt++;
3932 }
3933 }
3934
3935 return cnt;
3936 }
3937
3938
3939 /**
3940 * @brief Create a copy of \p rktparlist only containing the partitions
3941 * matched by \p match function.
3942 *
3943 * \p match shall return 1 for match, else 0.
3944 *
3945 * @returns a new list
3946 */
rd_kafka_topic_partition_list_match(const rd_kafka_topic_partition_list_t * rktparlist,int (* match)(const void * elem,const void * opaque),void * opaque)3947 rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_match (
3948 const rd_kafka_topic_partition_list_t *rktparlist,
3949 int (*match) (const void *elem, const void *opaque),
3950 void *opaque) {
3951 rd_kafka_topic_partition_list_t *newlist;
3952 int i;
3953
3954 newlist = rd_kafka_topic_partition_list_new(0);
3955
3956 for (i = 0 ; i < rktparlist->cnt ; i++) {
3957 const rd_kafka_topic_partition_t *rktpar =
3958 &rktparlist->elems[i];
3959
3960 if (!match(rktpar, opaque))
3961 continue;
3962
3963 rd_kafka_topic_partition_list_add_copy(newlist, rktpar);
3964 }
3965
3966 return newlist;
3967 }
3968
3969 void
rd_kafka_topic_partition_list_log(rd_kafka_t * rk,const char * fac,int dbg,const rd_kafka_topic_partition_list_t * rktparlist)3970 rd_kafka_topic_partition_list_log (rd_kafka_t *rk, const char *fac, int dbg,
3971 const rd_kafka_topic_partition_list_t *rktparlist) {
3972 int i;
3973
3974 rd_kafka_dbg(rk, NONE|dbg, fac, "List with %d partition(s):",
3975 rktparlist->cnt);
3976 for (i = 0 ; i < rktparlist->cnt ; i++) {
3977 const rd_kafka_topic_partition_t *rktpar =
3978 &rktparlist->elems[i];
3979 rd_kafka_dbg(rk, NONE|dbg, fac, " %s [%"PRId32"] offset %s%s%s",
3980 rktpar->topic, rktpar->partition,
3981 rd_kafka_offset2str(rktpar->offset),
3982 rktpar->err ? ": error: " : "",
3983 rktpar->err ? rd_kafka_err2str(rktpar->err) : "");
3984 }
3985 }
3986
3987 /**
3988 * @returns a comma-separated list of partitions.
3989 */
3990 const char *
rd_kafka_topic_partition_list_str(const rd_kafka_topic_partition_list_t * rktparlist,char * dest,size_t dest_size,int fmt_flags)3991 rd_kafka_topic_partition_list_str (const rd_kafka_topic_partition_list_t *rktparlist,
3992 char *dest, size_t dest_size,
3993 int fmt_flags) {
3994 int i;
3995 size_t of = 0;
3996
3997 for (i = 0 ; i < rktparlist->cnt ; i++) {
3998 const rd_kafka_topic_partition_t *rktpar =
3999 &rktparlist->elems[i];
4000 char errstr[128];
4001 char offsetstr[32];
4002 int r;
4003
4004 if (!rktpar->err && (fmt_flags & RD_KAFKA_FMT_F_ONLY_ERR))
4005 continue;
4006
4007 if (rktpar->err && !(fmt_flags & RD_KAFKA_FMT_F_NO_ERR))
4008 rd_snprintf(errstr, sizeof(errstr),
4009 "(%s)", rd_kafka_err2str(rktpar->err));
4010 else
4011 errstr[0] = '\0';
4012
4013 if (rktpar->offset != RD_KAFKA_OFFSET_INVALID)
4014 rd_snprintf(offsetstr, sizeof(offsetstr),
4015 "@%"PRId64, rktpar->offset);
4016 else
4017 offsetstr[0] = '\0';
4018
4019 r = rd_snprintf(&dest[of], dest_size-of,
4020 "%s"
4021 "%s[%"PRId32"]"
4022 "%s"
4023 "%s",
4024 of == 0 ? "" : ", ",
4025 rktpar->topic, rktpar->partition,
4026 offsetstr,
4027 errstr);
4028
4029 if ((size_t)r >= dest_size-of) {
4030 rd_snprintf(&dest[dest_size-4], 4, "...");
4031 break;
4032 }
4033
4034 of += r;
4035 }
4036
4037 return dest;
4038 }
4039
4040
4041
4042 /**
4043 * @brief Update \p dst with info from \p src.
4044 *
4045 * Fields updated:
4046 * - metadata
4047 * - metadata_size
4048 * - offset
4049 * - err
4050 *
4051 * Will only update partitions that are in both dst and src, other partitions will
4052 * remain unchanged.
4053 */
4054 void
rd_kafka_topic_partition_list_update(rd_kafka_topic_partition_list_t * dst,const rd_kafka_topic_partition_list_t * src)4055 rd_kafka_topic_partition_list_update (rd_kafka_topic_partition_list_t *dst,
4056 const rd_kafka_topic_partition_list_t *src){
4057 int i;
4058
4059 for (i = 0 ; i < dst->cnt ; i++) {
4060 rd_kafka_topic_partition_t *d = &dst->elems[i];
4061 rd_kafka_topic_partition_t *s;
4062
4063 if (!(s = rd_kafka_topic_partition_list_find(
4064 (rd_kafka_topic_partition_list_t *)src,
4065 d->topic, d->partition)))
4066 continue;
4067
4068 d->offset = s->offset;
4069 d->err = s->err;
4070 if (d->metadata) {
4071 rd_free(d->metadata);
4072 d->metadata = NULL;
4073 d->metadata_size = 0;
4074 }
4075 if (s->metadata_size > 0) {
4076 d->metadata =
4077 rd_malloc(s->metadata_size);
4078 d->metadata_size = s->metadata_size;
4079 memcpy((void *)d->metadata, s->metadata,
4080 s->metadata_size);
4081 }
4082 }
4083 }
4084
4085
4086 /**
4087 * @returns the sum of \p cb called for each element.
4088 */
4089 size_t
rd_kafka_topic_partition_list_sum(const rd_kafka_topic_partition_list_t * rktparlist,size_t (* cb)(const rd_kafka_topic_partition_t * rktpar,void * opaque),void * opaque)4090 rd_kafka_topic_partition_list_sum (
4091 const rd_kafka_topic_partition_list_t *rktparlist,
4092 size_t (*cb) (const rd_kafka_topic_partition_t *rktpar, void *opaque),
4093 void *opaque) {
4094 int i;
4095 size_t sum = 0;
4096
4097 for (i = 0 ; i < rktparlist->cnt ; i++) {
4098 const rd_kafka_topic_partition_t *rktpar =
4099 &rktparlist->elems[i];
4100 sum += cb(rktpar, opaque);
4101 }
4102
4103 return sum;
4104 }
4105
4106
4107 /**
4108 * @returns rd_true if there are duplicate topic/partitions in the list,
4109 * rd_false if not.
4110 *
4111 * @remarks sorts the elements of the list.
4112 */
4113 rd_bool_t
rd_kafka_topic_partition_list_has_duplicates(rd_kafka_topic_partition_list_t * rktparlist,rd_bool_t ignore_partition)4114 rd_kafka_topic_partition_list_has_duplicates (
4115 rd_kafka_topic_partition_list_t *rktparlist,
4116 rd_bool_t ignore_partition) {
4117
4118 int i;
4119
4120 if (rktparlist->cnt <= 1)
4121 return rd_false;
4122
4123 rd_kafka_topic_partition_list_sort_by_topic(rktparlist);
4124
4125 for (i=1; i<rktparlist->cnt; i++) {
4126 const rd_kafka_topic_partition_t *p1 = &rktparlist->elems[i-1];
4127 const rd_kafka_topic_partition_t *p2 = &rktparlist->elems[i];
4128
4129 if (((p1->partition == p2->partition) || ignore_partition) &&
4130 !strcmp(p1->topic, p2->topic)) {
4131 return rd_true;
4132 }
4133 }
4134
4135 return rd_false;
4136 }
4137
4138
4139 /**
4140 * @brief Set \c .err field \p err on all partitions in list.
4141 */
rd_kafka_topic_partition_list_set_err(rd_kafka_topic_partition_list_t * rktparlist,rd_kafka_resp_err_t err)4142 void rd_kafka_topic_partition_list_set_err (
4143 rd_kafka_topic_partition_list_t *rktparlist,
4144 rd_kafka_resp_err_t err) {
4145 int i;
4146
4147 for (i = 0 ; i < rktparlist->cnt ; i++)
4148 rktparlist->elems[i].err = err;
4149 }
4150
4151 /**
4152 * @brief Get the first set error in the partition list.
4153 */
rd_kafka_topic_partition_list_get_err(const rd_kafka_topic_partition_list_t * rktparlist)4154 rd_kafka_resp_err_t rd_kafka_topic_partition_list_get_err (
4155 const rd_kafka_topic_partition_list_t *rktparlist) {
4156 int i;
4157
4158 for (i = 0 ; i < rktparlist->cnt ; i++)
4159 if (rktparlist->elems[i].err)
4160 return rktparlist->elems[i].err;
4161
4162 return RD_KAFKA_RESP_ERR_NO_ERROR;
4163 }
4164
4165
4166 /**
4167 * @returns the number of wildcard/regex topics
4168 */
rd_kafka_topic_partition_list_regex_cnt(const rd_kafka_topic_partition_list_t * rktparlist)4169 int rd_kafka_topic_partition_list_regex_cnt (
4170 const rd_kafka_topic_partition_list_t *rktparlist) {
4171 int i;
4172 int cnt = 0;
4173
4174 for (i = 0 ; i < rktparlist->cnt ; i++) {
4175 const rd_kafka_topic_partition_t *rktpar =
4176 &rktparlist->elems[i];
4177 cnt += *rktpar->topic == '^';
4178 }
4179 return cnt;
4180 }
4181
4182
4183 /**
4184 * @brief Reset base sequence for this toppar.
4185 *
4186 * See rd_kafka_toppar_pid_change() below.
4187 *
4188 * @warning Toppar must be completely drained.
4189 *
4190 * @locality toppar handler thread
4191 * @locks toppar_lock MUST be held.
4192 */
rd_kafka_toppar_reset_base_msgid(rd_kafka_toppar_t * rktp,uint64_t new_base_msgid)4193 static void rd_kafka_toppar_reset_base_msgid (rd_kafka_toppar_t *rktp,
4194 uint64_t new_base_msgid) {
4195 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
4196 TOPIC|RD_KAFKA_DBG_EOS, "RESETSEQ",
4197 "%.*s [%"PRId32"] "
4198 "resetting epoch base seq from %"PRIu64" to %"PRIu64,
4199 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4200 rktp->rktp_partition,
4201 rktp->rktp_eos.epoch_base_msgid, new_base_msgid);
4202
4203 rktp->rktp_eos.next_ack_seq = 0;
4204 rktp->rktp_eos.next_err_seq = 0;
4205 rktp->rktp_eos.epoch_base_msgid = new_base_msgid;
4206 }
4207
4208
4209 /**
4210 * @brief Update/change the Producer ID for this toppar.
4211 *
4212 * Must only be called when pid is different from the current toppar pid.
4213 *
4214 * The epoch base sequence will be set to \p base_msgid, which must be the
4215 * first message in the partition
4216 * queue. However, if there are outstanding messages in-flight to the broker
4217 * we will need to wait for these ProduceRequests to finish (most likely
4218 * with failure) and have their messages re-enqueued to maintain original order.
4219 * In this case the pid will not be updated and this function should be
4220 * called again when there are no outstanding messages.
4221 *
4222 * @remark This function must only be called when rktp_xmitq is non-empty.
4223 *
4224 * @returns 1 if a new pid was set, else 0.
4225 *
4226 * @locality toppar handler thread
4227 * @locks none
4228 */
rd_kafka_toppar_pid_change(rd_kafka_toppar_t * rktp,rd_kafka_pid_t pid,uint64_t base_msgid)4229 int rd_kafka_toppar_pid_change (rd_kafka_toppar_t *rktp, rd_kafka_pid_t pid,
4230 uint64_t base_msgid) {
4231 int inflight = rd_atomic32_get(&rktp->rktp_msgs_inflight);
4232
4233 if (unlikely(inflight > 0)) {
4234 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
4235 TOPIC|RD_KAFKA_DBG_EOS, "NEWPID",
4236 "%.*s [%"PRId32"] will not change %s -> %s yet: "
4237 "%d message(s) still in-flight from current "
4238 "epoch",
4239 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4240 rktp->rktp_partition,
4241 rd_kafka_pid2str(rktp->rktp_eos.pid),
4242 rd_kafka_pid2str(pid),
4243 inflight);
4244 return 0;
4245 }
4246
4247 rd_assert(base_msgid != 0 &&
4248 *"BUG: pid_change() must only be called with "
4249 "non-empty xmitq");
4250
4251 rd_kafka_toppar_lock(rktp);
4252 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
4253 TOPIC|RD_KAFKA_DBG_EOS, "NEWPID",
4254 "%.*s [%"PRId32"] changed %s -> %s "
4255 "with base MsgId %"PRIu64,
4256 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4257 rktp->rktp_partition,
4258 rd_kafka_pid2str(rktp->rktp_eos.pid),
4259 rd_kafka_pid2str(pid),
4260 base_msgid);
4261
4262 rktp->rktp_eos.pid = pid;
4263 rd_kafka_toppar_reset_base_msgid(rktp, base_msgid);
4264
4265 rd_kafka_toppar_unlock(rktp);
4266
4267 return 1;
4268 }
4269
4270
4271 /**
4272 * @brief Purge messages in partition queues.
4273 * Delivery reports will be enqueued for all purged messages, the error
4274 * code is set to RD_KAFKA_RESP_ERR__PURGE_QUEUE.
4275 *
4276 * @param include_xmit_msgq If executing from the rktp's current broker handler
4277 * thread, also include the xmit message queue.
4278 *
4279 * @warning Only to be used with the producer.
4280 *
4281 * @returns the number of messages purged
4282 *
4283 * @locality any thread.
4284 * @locks_acquired rd_kafka_toppar_lock()
4285 * @locks_required none
4286 */
rd_kafka_toppar_purge_queues(rd_kafka_toppar_t * rktp,int purge_flags,rd_bool_t include_xmit_msgq)4287 int rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp,
4288 int purge_flags,
4289 rd_bool_t include_xmit_msgq) {
4290 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
4291 rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
4292 int cnt;
4293
4294 rd_assert(rk->rk_type == RD_KAFKA_PRODUCER);
4295
4296 rd_kafka_dbg(rk, TOPIC, "PURGE",
4297 "%s [%"PRId32"]: purging queues "
4298 "(purge_flags 0x%x, %s xmit_msgq)",
4299 rktp->rktp_rkt->rkt_topic->str,
4300 rktp->rktp_partition,
4301 purge_flags,
4302 include_xmit_msgq ? "include" : "exclude");
4303
4304 if (!(purge_flags & RD_KAFKA_PURGE_F_QUEUE))
4305 return 0;
4306
4307 if (include_xmit_msgq) {
4308 /* xmit_msgq is owned by the toppar handler thread
4309 * (broker thread) and requires no locking. */
4310 rd_assert(rktp->rktp_broker);
4311 rd_assert(thrd_is_current(rktp->rktp_broker->rkb_thread));
4312 rd_kafka_msgq_concat(&rkmq, &rktp->rktp_xmit_msgq);
4313 }
4314
4315 rd_kafka_toppar_lock(rktp);
4316 rd_kafka_msgq_concat(&rkmq, &rktp->rktp_msgq);
4317 cnt = rd_kafka_msgq_len(&rkmq);
4318
4319 if (cnt > 0 && purge_flags & RD_KAFKA_PURGE_F_ABORT_TXN) {
4320 /* All messages in-queue are purged
4321 * on abort_transaction(). Since these messages
4322 * will not be produced (retried) we need to adjust the
4323 * idempotence epoch's base msgid to skip the messages. */
4324 rktp->rktp_eos.epoch_base_msgid += cnt;
4325 rd_kafka_dbg(rk,
4326 TOPIC|RD_KAFKA_DBG_EOS, "ADVBASE",
4327 "%.*s [%"PRId32"] "
4328 "advancing epoch base msgid to %"PRIu64
4329 " due to %d message(s) in aborted transaction",
4330 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4331 rktp->rktp_partition,
4332 rktp->rktp_eos.epoch_base_msgid, cnt);
4333 }
4334 rd_kafka_toppar_unlock(rktp);
4335
4336 rd_kafka_dr_msgq(rktp->rktp_rkt, &rkmq, RD_KAFKA_RESP_ERR__PURGE_QUEUE);
4337
4338 return cnt;
4339 }
4340
4341
4342 /**
4343 * @brief Purge queues for the unassigned toppars of all known topics.
4344 *
4345 * @locality application thread
4346 * @locks none
4347 */
rd_kafka_purge_ua_toppar_queues(rd_kafka_t * rk)4348 void rd_kafka_purge_ua_toppar_queues (rd_kafka_t *rk) {
4349 rd_kafka_topic_t *rkt;
4350 int msg_cnt = 0, part_cnt = 0;
4351
4352 rd_kafka_rdlock(rk);
4353 TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
4354 rd_kafka_toppar_t *rktp;
4355 int r;
4356
4357 rd_kafka_topic_rdlock(rkt);
4358 rktp = rkt->rkt_ua;
4359 if (rktp)
4360 rd_kafka_toppar_keep(rktp);
4361 rd_kafka_topic_rdunlock(rkt);
4362
4363 if (unlikely(!rktp))
4364 continue;
4365
4366
4367 rd_kafka_toppar_lock(rktp);
4368
4369 r = rd_kafka_msgq_len(&rktp->rktp_msgq);
4370 rd_kafka_dr_msgq(rkt, &rktp->rktp_msgq,
4371 RD_KAFKA_RESP_ERR__PURGE_QUEUE);
4372 rd_kafka_toppar_unlock(rktp);
4373 rd_kafka_toppar_destroy(rktp);
4374
4375 if (r > 0) {
4376 msg_cnt += r;
4377 part_cnt++;
4378 }
4379 }
4380 rd_kafka_rdunlock(rk);
4381
4382 rd_kafka_dbg(rk, QUEUE|RD_KAFKA_DBG_TOPIC, "PURGEQ",
4383 "Purged %i message(s) from %d UA-partition(s)",
4384 msg_cnt, part_cnt);
4385 }
4386
4387
/**
 * @brief Destructor adapter with a generic (void *) signature. It destroys
 *        the struct rd_kafka_partition_leader pointed to by \p ptr, e.g.
 *        when used as a list element free function.
 */
void rd_kafka_partition_leader_destroy_free (void *ptr) {
        rd_kafka_partition_leader_destroy(
                (struct rd_kafka_partition_leader *)ptr);
}
4392