1 /*
2  * librdkafka - The Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2015 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 #include "rdkafka_int.h"
29 #include "rdkafka_topic.h"
30 #include "rdkafka_broker.h"
31 #include "rdkafka_request.h"
32 #include "rdkafka_offset.h"
33 #include "rdkafka_partition.h"
34 #include "rdregex.h"
35 #include "rdports.h"  /* rd_qsort_r() */
36 
37 #include "rdunittest.h"
38 
/**
 * Human-readable names of the partition fetch states.
 *
 * rd_kafka_toppar_set_fetch_state() indexes this table directly with the
 * numeric fetch state, so the order of the entries must follow the
 * numeric values of the fetch states.
 */
const char *rd_kafka_fetch_states[] = {
        "none",           /* 0 */
        "stopping",       /* 1 */
        "stopped",        /* 2 */
        "offset-query",   /* 3 */
        "offset-wait",    /* 4 */
        "active"          /* 5 */
};
47 
48 
49 static rd_kafka_op_res_t
50 rd_kafka_toppar_op_serve (rd_kafka_t *rk,
51                           rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
52                           rd_kafka_q_cb_type_t cb_type, void *opaque);
53 
54 static void rd_kafka_toppar_offset_retry (rd_kafka_toppar_t *rktp,
55                                           int backoff_ms,
56                                           const char *reason);
57 
58 
59 static RD_INLINE int32_t
rd_kafka_toppar_version_new_barrier0(rd_kafka_toppar_t * rktp,const char * func,int line)60 rd_kafka_toppar_version_new_barrier0 (rd_kafka_toppar_t *rktp,
61 				     const char *func, int line) {
62 	int32_t version = rd_atomic32_add(&rktp->rktp_version, 1);
63 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BARRIER",
64 		     "%s [%"PRId32"]: %s:%d: new version barrier v%"PRId32,
65 		     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
66 		     func, line, version);
67 	return version;
68 }
69 
70 #define rd_kafka_toppar_version_new_barrier(rktp) \
71 	rd_kafka_toppar_version_new_barrier0(rktp, __FUNCTION__, __LINE__)
72 
73 
74 /**
75  * Toppar based OffsetResponse handling.
76  * This is used for updating the low water mark for consumer lag.
77  */
rd_kafka_toppar_lag_handle_Offset(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)78 static void rd_kafka_toppar_lag_handle_Offset (rd_kafka_t *rk,
79 					       rd_kafka_broker_t *rkb,
80 					       rd_kafka_resp_err_t err,
81 					       rd_kafka_buf_t *rkbuf,
82 					       rd_kafka_buf_t *request,
83 					       void *opaque) {
84         rd_kafka_toppar_t *rktp = opaque;
85         rd_kafka_topic_partition_list_t *offsets;
86         rd_kafka_topic_partition_t *rktpar;
87 
88         offsets = rd_kafka_topic_partition_list_new(1);
89 
90         /* Parse and return Offset */
91         err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
92                                      rkbuf, request, offsets);
93 
94         if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
95                 rd_kafka_topic_partition_list_destroy(offsets);
96                 return; /* Retrying */
97         }
98 
99         if (!err && !(rktpar = rd_kafka_topic_partition_list_find(
100                               offsets,
101                               rktp->rktp_rkt->rkt_topic->str,
102                               rktp->rktp_partition)))
103                 err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
104 
105         if (!err && !rktpar->err) {
106                 rd_kafka_toppar_lock(rktp);
107                 rktp->rktp_lo_offset = rktpar->offset;
108                 rd_kafka_toppar_unlock(rktp);
109         }
110 
111         rd_kafka_topic_partition_list_destroy(offsets);
112 
113         rktp->rktp_wait_consumer_lag_resp = 0;
114 
115         rd_kafka_toppar_destroy(rktp); /* from request.opaque */
116 }
117 
118 
119 
120 /**
121  * Request information from broker to keep track of consumer lag.
122  *
123  * @locality toppar handle thread
124  * @locks none
125  */
rd_kafka_toppar_consumer_lag_req(rd_kafka_toppar_t * rktp)126 static void rd_kafka_toppar_consumer_lag_req (rd_kafka_toppar_t *rktp) {
127         rd_kafka_topic_partition_list_t *partitions;
128 
129         if (rktp->rktp_wait_consumer_lag_resp)
130                 return; /* Previous request not finished yet */
131 
132         rd_kafka_toppar_lock(rktp);
133 
134         /* Offset requests can only be sent to the leader replica.
135          *
136          * Note: If rktp is delegated to a preferred replica, it is
137          * certain that FETCH >= v5 and so rktp_lo_offset will be
138          * updated via LogStartOffset in the FETCH response.
139          */
140         if (!rktp->rktp_leader || (rktp->rktp_leader != rktp->rktp_broker)) {
141                 rd_kafka_toppar_unlock(rktp);
142 		return;
143         }
144 
145         /* Also don't send a timed log start offset request if leader
146          * broker supports FETCH >= v5, since this will be set when
147          * doing fetch requests.
148          */
149         if (rd_kafka_broker_ApiVersion_supported(rktp->rktp_broker,
150                                                  RD_KAFKAP_Fetch, 0,
151                                                  5, NULL) == 5) {
152                 rd_kafka_toppar_unlock(rktp);
153                 return;
154         }
155 
156         rktp->rktp_wait_consumer_lag_resp = 1;
157 
158         partitions = rd_kafka_topic_partition_list_new(1);
159         rd_kafka_topic_partition_list_add(partitions,
160                                           rktp->rktp_rkt->rkt_topic->str,
161                                           rktp->rktp_partition)->offset =
162                                           RD_KAFKA_OFFSET_BEGINNING;
163 
164         /* Ask for oldest offset. The newest offset is automatically
165          * propagated in FetchResponse.HighwaterMark. */
166         rd_kafka_OffsetRequest(rktp->rktp_broker, partitions, 0,
167                                RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
168                                rd_kafka_toppar_lag_handle_Offset,
169                                rd_kafka_toppar_keep(rktp));
170 
171         rd_kafka_toppar_unlock(rktp);
172 
173         rd_kafka_topic_partition_list_destroy(partitions);
174 }
175 
176 
177 
178 /**
179  * Request earliest offset for a partition
180  *
181  * Locality: toppar handler thread
182  */
rd_kafka_toppar_consumer_lag_tmr_cb(rd_kafka_timers_t * rkts,void * arg)183 static void rd_kafka_toppar_consumer_lag_tmr_cb (rd_kafka_timers_t *rkts,
184 						 void *arg) {
185 	rd_kafka_toppar_t *rktp = arg;
186 	rd_kafka_toppar_consumer_lag_req(rktp);
187 }
188 
189 /**
190  * @brief Update rktp_op_version.
191  *        Enqueue an RD_KAFKA_OP_BARRIER type of operation
192  *        when the op_version is updated.
193  *
194  * @locks_required rd_kafka_toppar_lock() must be held.
195  * @locality Toppar handler thread
196  */
rd_kafka_toppar_op_version_bump(rd_kafka_toppar_t * rktp,int32_t version)197 void rd_kafka_toppar_op_version_bump (rd_kafka_toppar_t *rktp,
198                                       int32_t version) {
199         rd_kafka_op_t *rko;
200 
201         rktp->rktp_op_version = version;
202         rko = rd_kafka_op_new(RD_KAFKA_OP_BARRIER);
203         rko->rko_version = version;
204         rd_kafka_q_enq(rktp->rktp_fetchq, rko);
205 }
206 
207 
208 /**
209  * Add new partition to topic.
210  *
211  * Locks: rd_kafka_topic_wrlock() must be held.
212  * Locks: rd_kafka_wrlock() must be held.
213  */
rd_kafka_toppar_new0(rd_kafka_topic_t * rkt,int32_t partition,const char * func,int line)214 rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_topic_t *rkt,
215                                          int32_t partition,
216                                          const char *func, int line) {
217 	rd_kafka_toppar_t *rktp;
218 
219 	rktp = rd_calloc(1, sizeof(*rktp));
220 
221 	rktp->rktp_partition = partition;
222 	rktp->rktp_rkt = rkt;
223         rktp->rktp_leader_id = -1;
224         rktp->rktp_broker_id = -1;
225         rd_interval_init(&rktp->rktp_lease_intvl);
226         rd_interval_init(&rktp->rktp_new_lease_intvl);
227         rd_interval_init(&rktp->rktp_new_lease_log_intvl);
228         rd_interval_init(&rktp->rktp_metadata_intvl);
229         /* Mark partition as unknown (does not exist) until we see the
230          * partition in topic metadata. */
231         if (partition != RD_KAFKA_PARTITION_UA)
232                 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
233 	rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
234         rktp->rktp_fetch_msg_max_bytes
235             = rkt->rkt_rk->rk_conf.fetch_msg_max_bytes;
236 	rktp->rktp_offset_fp = NULL;
237         rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
238         rd_kafka_offset_stats_reset(&rktp->rktp_offsets_fin);
239         rktp->rktp_ls_offset = RD_KAFKA_OFFSET_INVALID;
240         rktp->rktp_hi_offset = RD_KAFKA_OFFSET_INVALID;
241 	rktp->rktp_lo_offset = RD_KAFKA_OFFSET_INVALID;
242         rktp->rktp_query_offset = RD_KAFKA_OFFSET_INVALID;
243         rktp->rktp_next_offset = RD_KAFKA_OFFSET_INVALID;
244         rktp->rktp_last_next_offset = RD_KAFKA_OFFSET_INVALID;
245 	rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
246         rktp->rktp_stored_offset = RD_KAFKA_OFFSET_INVALID;
247         rktp->rktp_committing_offset = RD_KAFKA_OFFSET_INVALID;
248         rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
249 	rd_kafka_msgq_init(&rktp->rktp_msgq);
250 	rd_kafka_msgq_init(&rktp->rktp_xmit_msgq);
251 	mtx_init(&rktp->rktp_lock, mtx_plain);
252 
253         rd_refcnt_init(&rktp->rktp_refcnt, 0);
254 	rktp->rktp_fetchq = rd_kafka_q_new(rkt->rkt_rk);
255         rktp->rktp_ops    = rd_kafka_q_new(rkt->rkt_rk);
256         rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve;
257         rktp->rktp_ops->rkq_opaque = rktp;
258         rd_atomic32_init(&rktp->rktp_version, 1);
259 	rktp->rktp_op_version = rd_atomic32_get(&rktp->rktp_version);
260 
261         rd_atomic32_init(&rktp->rktp_msgs_inflight, 0);
262         rd_kafka_pid_reset(&rktp->rktp_eos.pid);
263 
264         /* Consumer: If statistics is available we query the log start offset
265          * of each partition.
266          * Since the oldest offset only moves on log retention, we cap this
267          * value on the low end to a reasonable value to avoid flooding
268          * the brokers with OffsetRequests when our statistics interval is low.
269          * FIXME: Use a global timer to collect offsets for all partitions
270          * FIXME: This timer is superfulous for FETCH >= v5 because the log
271          *        start offset is included in fetch responses.
272          * */
273         if (rktp->rktp_rkt->rkt_rk->rk_conf.stats_interval_ms > 0 &&
274             rkt->rkt_rk->rk_type == RD_KAFKA_CONSUMER &&
275             rktp->rktp_partition != RD_KAFKA_PARTITION_UA) {
276                 int intvl = rkt->rkt_rk->rk_conf.stats_interval_ms;
277                 if (intvl < 10 * 1000 /* 10s */)
278                         intvl = 10 * 1000;
279 		rd_kafka_timer_start(&rkt->rkt_rk->rk_timers,
280 				     &rktp->rktp_consumer_lag_tmr,
281                                      intvl * 1000ll,
282 				     rd_kafka_toppar_consumer_lag_tmr_cb,
283 				     rktp);
284         }
285 
286         rktp->rktp_rkt = rd_kafka_topic_keep(rkt);
287 
288 	rd_kafka_q_fwd_set(rktp->rktp_ops, rkt->rkt_rk->rk_ops);
289         rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARNEW",
290                      "NEW %s [%"PRId32"] %p refcnt %p (at %s:%d)",
291                      rkt->rkt_topic->str, rktp->rktp_partition, rktp,
292                      &rktp->rktp_refcnt,
293                      func, line);
294 
295 	return rd_kafka_toppar_keep(rktp);
296 }
297 
298 
299 
300 /**
301  * Removes a toppar from its duties, global lists, etc.
302  *
303  * Locks: rd_kafka_toppar_lock() MUST be held
304  */
rd_kafka_toppar_remove(rd_kafka_toppar_t * rktp)305 static void rd_kafka_toppar_remove (rd_kafka_toppar_t *rktp) {
306         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARREMOVE",
307                      "Removing toppar %s [%"PRId32"] %p",
308                      rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
309 		     rktp);
310 
311 	rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
312 			    &rktp->rktp_offset_query_tmr, 1/*lock*/);
313 	rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
314 			    &rktp->rktp_consumer_lag_tmr, 1/*lock*/);
315 
316 	rd_kafka_q_fwd_set(rktp->rktp_ops, NULL);
317 }
318 
319 
320 /**
321  * Final destructor for partition.
322  */
rd_kafka_toppar_destroy_final(rd_kafka_toppar_t * rktp)323 void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp) {
324 
325         rd_kafka_toppar_remove(rktp);
326 
327 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESTROY",
328 		     "%s [%"PRId32"]: %p DESTROY_FINAL",
329 		     rktp->rktp_rkt->rkt_topic->str,
330                      rktp->rktp_partition, rktp);
331 
332 	/* Clear queues */
333 	rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
334 			rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) == 0);
335 	rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
336 			 RD_KAFKA_RESP_ERR__DESTROY);
337 	rd_kafka_q_destroy_owner(rktp->rktp_fetchq);
338         rd_kafka_q_destroy_owner(rktp->rktp_ops);
339 
340 	rd_kafka_replyq_destroy(&rktp->rktp_replyq);
341 
342 	rd_kafka_topic_destroy0(rktp->rktp_rkt);
343 
344 	mtx_destroy(&rktp->rktp_lock);
345 
346         if (rktp->rktp_leader)
347                 rd_kafka_broker_destroy(rktp->rktp_leader);
348 
349         rd_refcnt_destroy(&rktp->rktp_refcnt);
350 
351 	rd_free(rktp);
352 }
353 
354 
355 /**
356  * Set toppar fetching state.
357  *
358  * Locality: broker thread
359  * Locks: rd_kafka_toppar_lock() MUST be held.
360  */
rd_kafka_toppar_set_fetch_state(rd_kafka_toppar_t * rktp,int fetch_state)361 void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp,
362                                       int fetch_state) {
363 	rd_kafka_assert(NULL,
364 			thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));
365 
366         if ((int)rktp->rktp_fetch_state == fetch_state)
367                 return;
368 
369         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "PARTSTATE",
370                      "Partition %.*s [%"PRId32"] changed fetch state %s -> %s",
371                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
372                      rktp->rktp_partition,
373                      rd_kafka_fetch_states[rktp->rktp_fetch_state],
374                      rd_kafka_fetch_states[fetch_state]);
375 
376         rktp->rktp_fetch_state = fetch_state;
377 
378         if (fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE)
379                 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
380                              CONSUMER|RD_KAFKA_DBG_TOPIC,
381                              "FETCH",
382                              "Partition %.*s [%"PRId32"] start fetching "
383                              "at offset %s",
384                              RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
385                              rktp->rktp_partition,
386                              rd_kafka_offset2str(rktp->rktp_next_offset));
387 }
388 
389 
390 /**
391  * Returns the appropriate toppar for a given rkt and partition.
392  * The returned toppar has increased refcnt and must be unreffed by calling
393  *  rd_kafka_toppar_destroy().
394  * May return NULL.
395  *
396  * If 'ua_on_miss' is true the UA (unassigned) toppar is returned if
397  * 'partition' was not known locally, else NULL is returned.
398  *
399  * Locks: Caller must hold rd_kafka_topic_*lock()
400  */
rd_kafka_toppar_get0(const char * func,int line,const rd_kafka_topic_t * rkt,int32_t partition,int ua_on_miss)401 rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line,
402                                          const rd_kafka_topic_t *rkt,
403                                          int32_t partition,
404                                          int ua_on_miss) {
405         rd_kafka_toppar_t *rktp;
406 
407 	if (partition >= 0 && partition < rkt->rkt_partition_cnt)
408 		rktp = rkt->rkt_p[partition];
409 	else if (partition == RD_KAFKA_PARTITION_UA || ua_on_miss)
410 		rktp = rkt->rkt_ua;
411 	else
412 		return NULL;
413 
414 	if (rktp)
415                 return rd_kafka_toppar_keep_fl(func, line, rktp);
416 
417 	return NULL;
418 }
419 
420 
421 /**
422  * Same as rd_kafka_toppar_get() but no need for locking and
423  * looks up the topic first.
424  *
425  * Locality: any
426  * Locks: none
427  */
rd_kafka_toppar_get2(rd_kafka_t * rk,const char * topic,int32_t partition,int ua_on_miss,int create_on_miss)428 rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk,
429                                          const char *topic,
430                                          int32_t partition,
431                                          int ua_on_miss,
432                                          int create_on_miss) {
433         rd_kafka_topic_t *rkt;
434         rd_kafka_toppar_t *rktp;
435 
436         rd_kafka_wrlock(rk);
437 
438         /* Find or create topic */
439 	if (unlikely(!(rkt = rd_kafka_topic_find(rk, topic, 0/*no-lock*/)))) {
440                 if (!create_on_miss) {
441                         rd_kafka_wrunlock(rk);
442                         return NULL;
443                 }
444                 rkt = rd_kafka_topic_new0(rk, topic, NULL,
445 					    NULL, 0/*no-lock*/);
446                 if (!rkt) {
447                         rd_kafka_wrunlock(rk);
448                         rd_kafka_log(rk, LOG_ERR, "TOPIC",
449                                      "Failed to create local topic \"%s\": %s",
450                                      topic, rd_strerror(errno));
451                         return NULL;
452                 }
453         }
454 
455         rd_kafka_wrunlock(rk);
456 
457 	rd_kafka_topic_wrlock(rkt);
458 	rktp = rd_kafka_toppar_desired_add(rkt, partition);
459 	rd_kafka_topic_wrunlock(rkt);
460 
461         rd_kafka_topic_destroy0(rkt);
462 
463 	return rktp;
464 }
465 
466 
467 /**
468  * Returns a toppar if it is available in the cluster.
469  * '*errp' is set to the error-code if lookup fails.
470  *
471  * Locks: topic_*lock() MUST be held
472  */
473 rd_kafka_toppar_t *
rd_kafka_toppar_get_avail(const rd_kafka_topic_t * rkt,int32_t partition,int ua_on_miss,rd_kafka_resp_err_t * errp)474 rd_kafka_toppar_get_avail (const rd_kafka_topic_t *rkt,
475                            int32_t partition, int ua_on_miss,
476                            rd_kafka_resp_err_t *errp) {
477 	rd_kafka_toppar_t *rktp;
478 
479         switch (rkt->rkt_state)
480         {
481         case RD_KAFKA_TOPIC_S_UNKNOWN:
482                 /* No metadata received from cluster yet.
483                  * Put message in UA partition and re-run partitioner when
484                  * cluster comes up. */
485 		partition = RD_KAFKA_PARTITION_UA;
486                 break;
487 
488         case RD_KAFKA_TOPIC_S_NOTEXISTS:
489                 /* Topic not found in cluster.
490                  * Fail message immediately. */
491                 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
492                 return NULL;
493 
494         case RD_KAFKA_TOPIC_S_ERROR:
495                 /* Permanent topic error. */
496                 *errp = rkt->rkt_err;
497                 return NULL;
498 
499         case RD_KAFKA_TOPIC_S_EXISTS:
500                 /* Topic exists in cluster. */
501 
502                 /* Topic exists but has no partitions.
503                  * This is usually an transient state following the
504                  * auto-creation of a topic. */
505                 if (unlikely(rkt->rkt_partition_cnt == 0)) {
506                         partition = RD_KAFKA_PARTITION_UA;
507                         break;
508                 }
509 
510                 /* Check that partition exists. */
511                 if (partition >= rkt->rkt_partition_cnt) {
512                         *errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
513                         return NULL;
514                 }
515                 break;
516 
517         default:
518                 rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
519                 break;
520         }
521 
522 	/* Get new partition */
523 	rktp = rd_kafka_toppar_get(rkt, partition, 0);
524 
525 	if (unlikely(!rktp)) {
526 		/* Unknown topic or partition */
527 		if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
528 			*errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
529 		else
530 			*errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
531 
532 		return NULL;
533 	}
534 
535 	return rktp;
536 }
537 
538 
539 /**
540  * Looks for partition 'i' in topic 'rkt's desired list.
541  *
542  * The desired partition list is the list of partitions that are desired
543  * (e.g., by the consumer) but not yet seen on a broker.
544  * As soon as the partition is seen on a broker the toppar is moved from
545  * the desired list and onto the normal rkt_p array.
546  * When the partition on the broker goes away a desired partition is put
547  * back on the desired list.
548  *
549  * Locks: rd_kafka_topic_*lock() must be held.
550  * Note: 'rktp' refcount is increased.
551  */
552 
rd_kafka_toppar_desired_get(rd_kafka_topic_t * rkt,int32_t partition)553 rd_kafka_toppar_t *rd_kafka_toppar_desired_get (rd_kafka_topic_t *rkt,
554                                                 int32_t partition) {
555 	rd_kafka_toppar_t *rktp;
556         int i;
557 
558 	RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i) {
559 		if (rktp->rktp_partition == partition)
560 			return rd_kafka_toppar_keep(rktp);
561         }
562 
563 	return NULL;
564 }
565 
566 
567 /**
568  * Link toppar on desired list.
569  *
570  * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
571  */
rd_kafka_toppar_desired_link(rd_kafka_toppar_t * rktp)572 void rd_kafka_toppar_desired_link (rd_kafka_toppar_t *rktp) {
573 
574         if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_DESP)
575                 return; /* Already linked */
576 
577         rd_kafka_toppar_keep(rktp);
578         rd_list_add(&rktp->rktp_rkt->rkt_desp, rktp);
579         rd_interval_reset(&rktp->rktp_rkt->rkt_desp_refresh_intvl);
580         rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_ON_DESP;
581 }
582 
583 /**
584  * Unlink toppar from desired list.
585  *
586  * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
587  */
rd_kafka_toppar_desired_unlink(rd_kafka_toppar_t * rktp)588 void rd_kafka_toppar_desired_unlink (rd_kafka_toppar_t *rktp) {
589         if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_DESP))
590                 return; /* Not linked */
591 
592         rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_DESP;
593         rd_list_remove(&rktp->rktp_rkt->rkt_desp, rktp);
594         rd_interval_reset(&rktp->rktp_rkt->rkt_desp_refresh_intvl);
595         rd_kafka_toppar_destroy(rktp);
596 }
597 
598 
599 /**
600  * @brief If rktp is not already desired:
601  *  - mark as DESIRED|~REMOVE
602  *  - add to desired list if unknown
603  *
604  * @remark toppar_lock() MUST be held
605  */
rd_kafka_toppar_desired_add0(rd_kafka_toppar_t * rktp)606 void rd_kafka_toppar_desired_add0 (rd_kafka_toppar_t *rktp) {
607         if ((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
608                 return;
609 
610         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESIRED",
611                      "%s [%"PRId32"]: marking as DESIRED",
612                      rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
613 
614         /* If toppar was marked for removal this is no longer
615          * the case since the partition is now desired. */
616         rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_REMOVE;
617 
618         rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_DESIRED;
619 
620         if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN) {
621                 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESIRED",
622                      "%s [%"PRId32"]: adding to DESIRED list",
623                      rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
624                 rd_kafka_toppar_desired_link(rktp);
625         }
626 }
627 
628 
629 /**
630  * Adds 'partition' as a desired partition to topic 'rkt', or updates
631  * an existing partition to be desired.
632  *
633  * Locks: rd_kafka_topic_wrlock() must be held.
634  */
rd_kafka_toppar_desired_add(rd_kafka_topic_t * rkt,int32_t partition)635 rd_kafka_toppar_t *rd_kafka_toppar_desired_add (rd_kafka_topic_t *rkt,
636                                                 int32_t partition) {
637         rd_kafka_toppar_t *rktp;
638 
639         rktp = rd_kafka_toppar_get(rkt, partition, 0/*no_ua_on_miss*/);
640 
641         if (!rktp)
642                 rktp = rd_kafka_toppar_desired_get(rkt, partition);
643 
644         if (!rktp)
645                 rktp = rd_kafka_toppar_new(rkt, partition);
646 
647         rd_kafka_toppar_lock(rktp);
648         rd_kafka_toppar_desired_add0(rktp);
649         rd_kafka_toppar_unlock(rktp);
650 
651         return rktp; /* Callers refcount */
652 }
653 
654 
655 
656 
657 /**
658  * Unmarks an 'rktp' as desired.
659  *
660  * Locks: rd_kafka_topic_wrlock() and rd_kafka_toppar_lock() MUST be held.
661  */
rd_kafka_toppar_desired_del(rd_kafka_toppar_t * rktp)662 void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp) {
663 
664 	if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
665 		return;
666 
667 	rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_DESIRED;
668         rd_kafka_toppar_desired_unlink(rktp);
669 
670 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESP",
671 		     "Removing (un)desired topic %s [%"PRId32"]",
672 		     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
673 
674         if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN) {
675                 /* If this partition does not exist in the cluster
676                  * and is no longer desired, remove it. */
677                 rd_kafka_toppar_broker_leave_for_remove(rktp);
678         }
679 }
680 
681 
682 
683 /**
684  * Append message at tail of 'rktp' message queue.
685  */
rd_kafka_toppar_enq_msg(rd_kafka_toppar_t * rktp,rd_kafka_msg_t * rkm)686 void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
687         int queue_len;
688         rd_kafka_q_t *wakeup_q = NULL;
689 
690         rd_kafka_toppar_lock(rktp);
691 
692         if (!rkm->rkm_u.producer.msgid &&
693             rktp->rktp_partition != RD_KAFKA_PARTITION_UA)
694                 rkm->rkm_u.producer.msgid = ++rktp->rktp_msgid;
695 
696         if (rktp->rktp_partition == RD_KAFKA_PARTITION_UA ||
697             rktp->rktp_rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO) {
698                 /* No need for enq_sorted(), this is the oldest message. */
699                 queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
700         } else {
701                 queue_len = rd_kafka_msgq_enq_sorted(rktp->rktp_rkt,
702                                                      &rktp->rktp_msgq, rkm);
703         }
704 
705         if (unlikely(queue_len == 1 &&
706                      (wakeup_q = rktp->rktp_msgq_wakeup_q)))
707                 rd_kafka_q_keep(wakeup_q);
708 
709         rd_kafka_toppar_unlock(rktp);
710 
711         if (wakeup_q) {
712                 rd_kafka_q_yield(wakeup_q);
713                 rd_kafka_q_destroy(wakeup_q);
714         }
715 }
716 
717 
718 /**
719  * @brief Insert \p srcq before \p insert_before in \p destq.
720  *
721  * If \p srcq and \p destq overlaps only part of the \p srcq will be inserted.
722  *
723  * Upon return \p srcq will contain any remaining messages that require
724  * another insert position in \p destq.
725  */
726 static void
rd_kafka_msgq_insert_msgq_before(rd_kafka_msgq_t * destq,rd_kafka_msg_t * insert_before,rd_kafka_msgq_t * srcq,int (* cmp)(const void * a,const void * b))727 rd_kafka_msgq_insert_msgq_before (rd_kafka_msgq_t *destq,
728                                   rd_kafka_msg_t *insert_before,
729                                   rd_kafka_msgq_t *srcq,
730                                   int (*cmp) (const void *a, const void *b)) {
731         rd_kafka_msg_t *slast;
732         rd_kafka_msgq_t tmpq;
733 
734         if (!insert_before) {
735                 /* Append all of srcq to destq */
736                 rd_kafka_msgq_concat(destq, srcq);
737                 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
738                 return;
739         }
740 
741         slast = rd_kafka_msgq_last(srcq);
742         rd_dassert(slast);
743 
744         if (cmp(slast, insert_before) > 0) {
745                 rd_kafka_msg_t *new_sfirst;
746                 int cnt;
747                 int64_t bytes;
748 
749                 /* destq insert_before resides somewhere between
750                  * srcq.first and srcq.last, find the first message in
751                  * srcq that is > insert_before and split srcq into
752                  * a left part that contains the messages to insert before
753                  * insert_before, and a right part that will need another
754                  * insert position. */
755 
756                 new_sfirst = rd_kafka_msgq_find_pos(srcq, NULL,
757                                                     insert_before,
758                                                     cmp, &cnt, &bytes);
759                 rd_assert(new_sfirst);
760 
761                 /* split srcq into two parts using the divider message */
762                 rd_kafka_msgq_split(srcq, &tmpq, new_sfirst, cnt, bytes);
763 
764                 rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
765                 rd_kafka_msgq_verify_order(NULL, &tmpq, 0, rd_false);
766         } else {
767                 rd_kafka_msgq_init(&tmpq);
768         }
769 
770         /* srcq now contains messages up to the first message in destq,
771          * insert srcq at insert_before in destq. */
772         rd_dassert(!TAILQ_EMPTY(&destq->rkmq_msgs));
773         rd_dassert(!TAILQ_EMPTY(&srcq->rkmq_msgs));
774         TAILQ_INSERT_LIST_BEFORE(&destq->rkmq_msgs,
775                                  insert_before,
776                                  &srcq->rkmq_msgs,
777                                  rd_kafka_msgs_head_s,
778                                  rd_kafka_msg_t *,
779                                  rkm_link);
780         destq->rkmq_msg_cnt   += srcq->rkmq_msg_cnt;
781         destq->rkmq_msg_bytes += srcq->rkmq_msg_bytes;
782         srcq->rkmq_msg_cnt     = 0;
783         srcq->rkmq_msg_bytes   = 0;
784 
785         rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
786         rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
787 
788         /* tmpq contains the remaining messages in srcq, move it over. */
789         rd_kafka_msgq_move(srcq, &tmpq);
790 
791         rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
792 }
793 
794 
795 /**
796  * @brief Insert all messages from \p srcq into \p destq in their sorted
797  *        position (using \p cmp)
798  */
rd_kafka_msgq_insert_msgq(rd_kafka_msgq_t * destq,rd_kafka_msgq_t * srcq,int (* cmp)(const void * a,const void * b))799 void rd_kafka_msgq_insert_msgq (rd_kafka_msgq_t *destq,
800                                 rd_kafka_msgq_t *srcq,
801                                 int (*cmp) (const void *a, const void *b)) {
802         rd_kafka_msg_t *sfirst, *dlast, *start_pos = NULL;
803 
804         if (unlikely(RD_KAFKA_MSGQ_EMPTY(srcq))) {
805                 /* srcq is empty */
806                 return;
807         }
808 
809         if (unlikely(RD_KAFKA_MSGQ_EMPTY(destq))) {
810                 /* destq is empty, simply move the srcq. */
811                 rd_kafka_msgq_move(destq, srcq);
812                 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
813                 return;
814         }
815 
816         /* Optimize insertion by bulk-moving messages in place.
817          * We know that:
818          *  - destq is sorted but might not be continous (1,2,3,7)
819          *  - srcq is sorted but might not be continous (4,5,6,8)
820          *  - there migt be (multiple) overlaps between the two, e.g:
821          *     destq = (1,2,3,7), srcq = (4,5,6,8)
822          *  - there may be millions of messages.
823          */
824 
825         rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
826         rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
827 
828         dlast = rd_kafka_msgq_last(destq);
829         sfirst = rd_kafka_msgq_first(srcq);
830 
831         /* Most common case, all of srcq goes after destq */
832         if (likely(cmp(dlast, sfirst) < 0)) {
833                 rd_kafka_msgq_concat(destq, srcq);
834 
835                 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
836 
837                 rd_assert(RD_KAFKA_MSGQ_EMPTY(srcq));
838                 return;
839         }
840 
841         /* Insert messages from srcq into destq in non-overlapping
842          * chunks until srcq is exhausted. */
843         while (likely(sfirst != NULL)) {
844                 rd_kafka_msg_t *insert_before;
845 
846                 /* Get insert position in destq of first element in srcq */
847                 insert_before = rd_kafka_msgq_find_pos(destq, start_pos,
848                                                        sfirst, cmp,
849                                                        NULL, NULL);
850 
851                 /* Insert as much of srcq as possible at insert_before */
852                 rd_kafka_msgq_insert_msgq_before(destq, insert_before,
853                                                  srcq, cmp);
854 
855                 /* Remember the current destq position so the next find_pos()
856                  * does not have to re-scan destq and what was
857                  * added from srcq. */
858                 start_pos = insert_before;
859 
860                 /* For next iteration */
861                 sfirst = rd_kafka_msgq_first(srcq);
862 
863                 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
864                 rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
865         }
866 
867         rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
868 
869         rd_assert(RD_KAFKA_MSGQ_EMPTY(srcq));
870 }
871 
872 
873 /**
874  * @brief Inserts messages from \p srcq according to their sorted position
875  *        into \p destq, filtering out messages that can not be retried.
876  *
877  * @param incr_retry Increment retry count for messages.
878  * @param max_retries Maximum retries allowed per message.
879  * @param backoff Absolute retry backoff for retried messages.
880  *
881  * @returns 0 if all messages were retried, or 1 if some messages
882  *          could not be retried.
883  */
rd_kafka_retry_msgq(rd_kafka_msgq_t * destq,rd_kafka_msgq_t * srcq,int incr_retry,int max_retries,rd_ts_t backoff,rd_kafka_msg_status_t status,int (* cmp)(const void * a,const void * b))884 int rd_kafka_retry_msgq (rd_kafka_msgq_t *destq,
885                          rd_kafka_msgq_t *srcq,
886                          int incr_retry, int max_retries, rd_ts_t backoff,
887                          rd_kafka_msg_status_t status,
888                          int (*cmp) (const void *a, const void *b)) {
889         rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable);
890         rd_kafka_msg_t *rkm, *tmp;
891 
892         /* Scan through messages to see which ones are eligible for retry,
893          * move the retryable ones to temporary queue and
894          * set backoff time for first message and optionally
895          * increase retry count for each message.
896          * Sorted insert is not necessary since the original order
897          * srcq order is maintained. */
898         TAILQ_FOREACH_SAFE(rkm, &srcq->rkmq_msgs, rkm_link, tmp) {
899                 if (rkm->rkm_u.producer.retries + incr_retry > max_retries)
900                         continue;
901 
902                 rd_kafka_msgq_deq(srcq, rkm, 1);
903                 rd_kafka_msgq_enq(&retryable, rkm);
904 
905                 rkm->rkm_u.producer.ts_backoff = backoff;
906                 rkm->rkm_u.producer.retries  += incr_retry;
907 
908                 /* Don't downgrade a message from any form of PERSISTED
909                  * to NOT_PERSISTED, since the original cause of indicating
910                  * PERSISTED can't be changed.
911                  * E.g., a previous ack or in-flight timeout. */
912                 if (likely(!(status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED &&
913                              rkm->rkm_status !=
914                              RD_KAFKA_MSG_STATUS_NOT_PERSISTED)))
915                         rkm->rkm_status = status;
916         }
917 
918         /* No messages are retryable */
919         if (RD_KAFKA_MSGQ_EMPTY(&retryable))
920                 return 0;
921 
922         /* Insert retryable list at sorted position */
923         rd_kafka_msgq_insert_msgq(destq, &retryable, cmp);
924 
925         return 1;
926 }
927 
928 /**
929  * @brief Inserts messages from \p rkmq according to their sorted position
930  *        into the partition's message queue.
931  *
932  * @param incr_retry Increment retry count for messages.
933  * @param status Set status on each message.
934  *
935  * @returns 0 if all messages were retried, or 1 if some messages
936  *          could not be retried.
937  *
938  * @locality Broker thread (but not necessarily the leader broker thread)
939  */
940 
rd_kafka_toppar_retry_msgq(rd_kafka_toppar_t * rktp,rd_kafka_msgq_t * rkmq,int incr_retry,rd_kafka_msg_status_t status)941 int rd_kafka_toppar_retry_msgq (rd_kafka_toppar_t *rktp, rd_kafka_msgq_t *rkmq,
942                                 int incr_retry, rd_kafka_msg_status_t status) {
943         rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
944         rd_ts_t backoff = rd_clock() + (rk->rk_conf.retry_backoff_ms * 1000);
945         int r;
946 
947         if (rd_kafka_terminating(rk))
948                 return 1;
949 
950         rd_kafka_toppar_lock(rktp);
951         r = rd_kafka_retry_msgq(&rktp->rktp_msgq, rkmq,
952                                 incr_retry, rk->rk_conf.max_retries,
953                                 backoff, status,
954                                 rktp->rktp_rkt->rkt_conf.msg_order_cmp);
955         rd_kafka_toppar_unlock(rktp);
956 
957         return r;
958 }
959 
960 /**
961  * @brief Insert sorted message list \p rkmq at sorted position in \p rktp 's
962  *        message queue. The queues must not overlap.
963  * @remark \p rkmq will be cleared.
964  */
rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t * rktp,rd_kafka_msgq_t * rkmq)965 void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
966                                   rd_kafka_msgq_t *rkmq) {
967         rd_kafka_toppar_lock(rktp);
968         rd_kafka_msgq_insert_msgq(&rktp->rktp_msgq, rkmq,
969                                   rktp->rktp_rkt->rkt_conf.msg_order_cmp);
970         rd_kafka_toppar_unlock(rktp);
971 }
972 
973 
974 
975 /**
976  * Helper method for purging queues when removing a toppar.
977  * Locks: rd_kafka_toppar_lock() MUST be held
978  */
rd_kafka_toppar_purge_and_disable_queues(rd_kafka_toppar_t * rktp)979 void rd_kafka_toppar_purge_and_disable_queues (rd_kafka_toppar_t *rktp) {
980         rd_kafka_q_disable(rktp->rktp_fetchq);
981         rd_kafka_q_purge(rktp->rktp_fetchq);
982         rd_kafka_q_disable(rktp->rktp_ops);
983         rd_kafka_q_purge(rktp->rktp_ops);
984 }
985 
986 
987 /**
988  * @brief Migrate rktp from (optional) \p old_rkb to (optional) \p new_rkb,
989  *        but at least one is required to be non-NULL.
990  *
991  * This is an async operation.
992  *
993  * @locks rd_kafka_toppar_lock() MUST be held
994  */
rd_kafka_toppar_broker_migrate(rd_kafka_toppar_t * rktp,rd_kafka_broker_t * old_rkb,rd_kafka_broker_t * new_rkb)995 static void rd_kafka_toppar_broker_migrate (rd_kafka_toppar_t *rktp,
996                                             rd_kafka_broker_t *old_rkb,
997                                             rd_kafka_broker_t *new_rkb) {
998         rd_kafka_op_t *rko;
999         rd_kafka_broker_t *dest_rkb;
1000         int had_next_broker = rktp->rktp_next_broker ? 1 : 0;
1001 
1002         rd_assert(old_rkb || new_rkb);
1003 
1004         /* Update next broker */
1005         if (new_rkb)
1006                 rd_kafka_broker_keep(new_rkb);
1007         if (rktp->rktp_next_broker)
1008                 rd_kafka_broker_destroy(rktp->rktp_next_broker);
1009         rktp->rktp_next_broker = new_rkb;
1010 
1011         /* If next_broker is set it means there is already an async
1012          * migration op going on and we should not send a new one
1013          * but simply change the next_broker (which we did above). */
1014         if (had_next_broker)
1015                 return;
1016 
1017         /* Revert from offset-wait state back to offset-query
1018          * prior to leaving the broker to avoid stalling
1019          * on the new broker waiting for a offset reply from
1020          * this old broker (that might not come and thus need
1021          * to time out..slowly) */
1022         if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT)
1023                 rd_kafka_toppar_offset_retry(rktp, 500,
1024                                              "migrating to new broker");
1025 
1026         if (old_rkb) {
1027                 /* If there is an existing broker for this toppar we let it
1028                  * first handle its own leave and then trigger the join for
1029                  * the next broker, if any. */
1030                 rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
1031                 dest_rkb = old_rkb;
1032         } else {
1033                 /* No existing broker, send join op directly to new broker. */
1034                 rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_JOIN);
1035                 dest_rkb = new_rkb;
1036         }
1037 
1038         rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1039 
1040         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
1041                      "Migrating topic %.*s [%"PRId32"] %p from %s to %s "
1042 		     "(sending %s to %s)",
1043                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1044                      rktp->rktp_partition, rktp,
1045                      old_rkb ? rd_kafka_broker_name(old_rkb) : "(none)",
1046                      new_rkb ? rd_kafka_broker_name(new_rkb) : "(none)",
1047 		     rd_kafka_op2str(rko->rko_type),
1048 		     rd_kafka_broker_name(dest_rkb));
1049 
1050         rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
1051 }
1052 
1053 
1054 /**
1055  * Async toppar leave from broker.
1056  * Only use this when partitions are to be removed.
1057  *
1058  * Locks: rd_kafka_toppar_lock() MUST be held
1059  */
rd_kafka_toppar_broker_leave_for_remove(rd_kafka_toppar_t * rktp)1060 void rd_kafka_toppar_broker_leave_for_remove (rd_kafka_toppar_t *rktp) {
1061         rd_kafka_op_t *rko;
1062         rd_kafka_broker_t *dest_rkb;
1063 
1064         rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_REMOVE;
1065 
1066 	if (rktp->rktp_next_broker)
1067 		dest_rkb = rktp->rktp_next_broker;
1068 	else if (rktp->rktp_broker)
1069 		dest_rkb = rktp->rktp_broker;
1070 	else {
1071 		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARDEL",
1072 			     "%.*s [%"PRId32"] %p not handled by any broker: "
1073 			     "not sending LEAVE for remove",
1074 			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1075 			     rktp->rktp_partition, rktp);
1076 		return;
1077 	}
1078 
1079 
1080 	/* Revert from offset-wait state back to offset-query
1081 	 * prior to leaving the broker to avoid stalling
1082 	 * on the new broker waiting for a offset reply from
1083 	 * this old broker (that might not come and thus need
1084 	 * to time out..slowly) */
1085 	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT)
1086 		rd_kafka_toppar_set_fetch_state(
1087 			rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
1088 
1089 	rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
1090         rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1091 
1092         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
1093                      "%.*s [%"PRId32"] %p sending final LEAVE for removal by %s",
1094                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1095                      rktp->rktp_partition, rktp,
1096                      rd_kafka_broker_name(dest_rkb));
1097 
1098         rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
1099 }
1100 
1101 
1102 /**
1103  * @brief Delegates toppar 'rktp' to broker 'rkb'. 'rkb' may be NULL to
1104  *        undelegate broker.
1105  *
1106  * @locks Caller must have rd_kafka_toppar_lock(rktp) held.
1107  */
rd_kafka_toppar_broker_delegate(rd_kafka_toppar_t * rktp,rd_kafka_broker_t * rkb)1108 void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
1109 				      rd_kafka_broker_t *rkb) {
1110         rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1111         int internal_fallback = 0;
1112 
1113 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1114 		     "%s [%"PRId32"]: delegate to broker %s "
1115 		     "(rktp %p, term %d, ref %d)",
1116 		     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
1117 		     rkb ? rkb->rkb_name : "(none)",
1118 		     rktp, rd_kafka_terminating(rk),
1119 		     rd_refcnt_get(&rktp->rktp_refcnt));
1120 
1121         /* Undelegated toppars are delgated to the internal
1122          * broker for bookkeeping. */
1123         if (!rkb && !rd_kafka_terminating(rk)) {
1124                 rkb = rd_kafka_broker_internal(rk);
1125                 internal_fallback = 1;
1126         }
1127 
1128 	if (rktp->rktp_broker == rkb && !rktp->rktp_next_broker) {
1129                 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1130 			     "%.*s [%"PRId32"]: not updating broker: "
1131                              "already on correct broker %s",
1132 			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1133 			     rktp->rktp_partition,
1134                              rkb ? rd_kafka_broker_name(rkb) : "(none)");
1135 
1136                 if (internal_fallback)
1137                         rd_kafka_broker_destroy(rkb);
1138 		return;
1139         }
1140 
1141 	if (rktp->rktp_broker)
1142 		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1143 			     "%.*s [%"PRId32"]: no longer delegated to "
1144 			     "broker %s",
1145 			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1146 			     rktp->rktp_partition,
1147 			     rd_kafka_broker_name(rktp->rktp_broker));
1148 
1149 
1150 	if (rkb) {
1151 		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1152 			     "%.*s [%"PRId32"]: delegating to broker %s "
1153 			     "for partition with %i messages "
1154 			     "(%"PRIu64" bytes) queued",
1155 			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1156 			     rktp->rktp_partition,
1157 			     rd_kafka_broker_name(rkb),
1158                              rktp->rktp_msgq.rkmq_msg_cnt,
1159                              rktp->rktp_msgq.rkmq_msg_bytes);
1160 
1161 
1162 	} else {
1163 		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1164 			     "%.*s [%"PRId32"]: no broker delegated",
1165 			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1166 			     rktp->rktp_partition);
1167 	}
1168 
1169         if (rktp->rktp_broker || rkb)
1170                 rd_kafka_toppar_broker_migrate(rktp, rktp->rktp_broker, rkb);
1171 
1172         if (internal_fallback)
1173                 rd_kafka_broker_destroy(rkb);
1174 }
1175 
1176 
1177 
1178 
1179 
1180 void
rd_kafka_toppar_offset_commit_result(rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * offsets)1181 rd_kafka_toppar_offset_commit_result (rd_kafka_toppar_t *rktp,
1182                                       rd_kafka_resp_err_t err,
1183                                       rd_kafka_topic_partition_list_t *offsets){
1184         if (err)
1185                 rd_kafka_consumer_err(rktp->rktp_fetchq,
1186                                       /* FIXME: propagate broker_id */
1187                                       RD_KAFKA_NODEID_UA,
1188                                       err, 0 /* FIXME:VERSION*/,
1189                                       NULL, rktp, RD_KAFKA_OFFSET_INVALID,
1190                                       "Offset commit failed: %s",
1191                                       rd_kafka_err2str(err));
1192 
1193 	rd_kafka_toppar_lock(rktp);
1194         if (!err)
1195                 rktp->rktp_committed_offset = offsets->elems[0].offset;
1196 
1197 	/* When stopping toppars:
1198 	 * Final commit is now done (or failed), propagate. */
1199 	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING)
1200 		rd_kafka_toppar_fetch_stopped(rktp, err);
1201 
1202 	rd_kafka_toppar_unlock(rktp);
1203 }
1204 
1205 
1206 
1207 
1208 
1209 
1210 
1211 
1212 
1213 
1214 
1215 /**
1216  * Handle the next offset to consume for a toppar.
1217  * This is used during initial setup when trying to figure out what
1218  * offset to start consuming from.
1219  *
1220  * Locality: toppar handler thread.
1221  * Locks: toppar_lock(rktp) must be held
1222  */
rd_kafka_toppar_next_offset_handle(rd_kafka_toppar_t * rktp,int64_t Offset)1223 void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp,
1224                                          int64_t Offset) {
1225 
1226         if (RD_KAFKA_OFFSET_IS_LOGICAL(Offset)) {
1227                 /* Offset storage returned logical offset (e.g. "end"),
1228                  * look it up. */
1229 
1230                 /* Save next offset, even if logical, so that e.g.,
1231                  * assign(BEGINNING) survives a pause+resume, etc.
1232                  * See issue #2105. */
1233                 rktp->rktp_next_offset = Offset;
1234 
1235                 rd_kafka_offset_reset(rktp, Offset, RD_KAFKA_RESP_ERR_NO_ERROR,
1236                                       "update");
1237                 return;
1238         }
1239 
1240         /* Adjust by TAIL count if, if wanted */
1241         if (rktp->rktp_query_offset <=
1242             RD_KAFKA_OFFSET_TAIL_BASE) {
1243                 int64_t orig_Offset = Offset;
1244                 int64_t tail_cnt =
1245                         llabs(rktp->rktp_query_offset -
1246                               RD_KAFKA_OFFSET_TAIL_BASE);
1247 
1248                 if (tail_cnt > Offset)
1249                         Offset = 0;
1250                 else
1251                         Offset -= tail_cnt;
1252 
1253                 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1254                              "OffsetReply for topic %s [%"PRId32"]: "
1255                              "offset %"PRId64": adjusting for "
1256                              "OFFSET_TAIL(%"PRId64"): "
1257                              "effective offset %"PRId64,
1258                              rktp->rktp_rkt->rkt_topic->str,
1259                              rktp->rktp_partition,
1260                              orig_Offset, tail_cnt,
1261                              Offset);
1262         }
1263 
1264         rktp->rktp_next_offset = Offset;
1265 
1266         rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1267 
1268         /* Wake-up broker thread which might be idling on IO */
1269         if (rktp->rktp_broker)
1270                 rd_kafka_broker_wakeup(rktp->rktp_broker);
1271 
1272 }
1273 
1274 
1275 
1276 /**
1277  * Fetch committed offset for a single partition. (simple consumer)
1278  *
1279  * Locality: toppar thread
1280  */
rd_kafka_toppar_offset_fetch(rd_kafka_toppar_t * rktp,rd_kafka_replyq_t replyq)1281 void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
1282                                    rd_kafka_replyq_t replyq) {
1283         rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1284         rd_kafka_topic_partition_list_t *part;
1285         rd_kafka_op_t *rko;
1286 
1287         rd_kafka_dbg(rk, TOPIC, "OFFSETREQ",
1288                      "Partition %.*s [%"PRId32"]: querying cgrp for "
1289                      "committed offset (opv %d)",
1290                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1291                      rktp->rktp_partition, replyq.version);
1292 
1293         part = rd_kafka_topic_partition_list_new(1);
1294         rd_kafka_topic_partition_list_add0(__FUNCTION__,__LINE__,part,
1295                                            rktp->rktp_rkt->rkt_topic->str,
1296                                            rktp->rktp_partition,
1297 					   rktp);
1298 
1299         rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
1300 	rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1301 	rko->rko_replyq = replyq;
1302 
1303 	rko->rko_u.offset_fetch.partitions = part;
1304         rko->rko_u.offset_fetch.require_stable =
1305                 rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED;
1306 	rko->rko_u.offset_fetch.do_free = 1;
1307 
1308         rd_kafka_q_enq(rktp->rktp_cgrp->rkcg_ops, rko);
1309 }
1310 
1311 
1312 
1313 
1314 /**
1315  * Toppar based OffsetResponse handling.
1316  * This is used for finding the next offset to Fetch.
1317  *
1318  * Locality: toppar handler thread
1319  */
rd_kafka_toppar_handle_Offset(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)1320 static void rd_kafka_toppar_handle_Offset (rd_kafka_t *rk,
1321 					   rd_kafka_broker_t *rkb,
1322 					   rd_kafka_resp_err_t err,
1323 					   rd_kafka_buf_t *rkbuf,
1324 					   rd_kafka_buf_t *request,
1325 					   void *opaque) {
1326         rd_kafka_toppar_t *rktp = opaque;
1327         rd_kafka_topic_partition_list_t *offsets;
1328         rd_kafka_topic_partition_t *rktpar;
1329         int64_t Offset;
1330 
1331 	rd_kafka_toppar_lock(rktp);
1332 	/* Drop reply from previous partition leader */
1333 	if (err != RD_KAFKA_RESP_ERR__DESTROY && rktp->rktp_broker != rkb)
1334 		err = RD_KAFKA_RESP_ERR__OUTDATED;
1335 	rd_kafka_toppar_unlock(rktp);
1336 
1337         offsets = rd_kafka_topic_partition_list_new(1);
1338 
1339 	rd_rkb_dbg(rkb, TOPIC, "OFFSET",
1340 		   "Offset reply for "
1341 		   "topic %.*s [%"PRId32"] (v%d vs v%d)",
1342 		   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1343 		   rktp->rktp_partition, request->rkbuf_replyq.version,
1344 		   rktp->rktp_op_version);
1345 
1346 	rd_dassert(request->rkbuf_replyq.version > 0);
1347 	if (err != RD_KAFKA_RESP_ERR__DESTROY &&
1348             rd_kafka_buf_version_outdated(request, rktp->rktp_op_version)) {
1349 		/* Outdated request response, ignore. */
1350 		    err = RD_KAFKA_RESP_ERR__OUTDATED;
1351 	}
1352 
1353         if (err != RD_KAFKA_RESP_ERR__OUTDATED) {
1354                 /* Parse and return Offset */
1355                 err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
1356                                              rkbuf, request, offsets);
1357         }
1358 
1359         if (!err) {
1360                 if (!(rktpar = rd_kafka_topic_partition_list_find(
1361                               offsets,
1362                               rktp->rktp_rkt->rkt_topic->str,
1363                               rktp->rktp_partition)))
1364                         err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
1365                 else if (rktpar->err)
1366                         err = rktpar->err;
1367         }
1368 
1369         if (err) {
1370                 rd_rkb_dbg(rkb, TOPIC, "OFFSET",
1371                            "Offset reply error for "
1372                            "topic %.*s [%"PRId32"] (v%d): %s",
1373                            RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1374                            rktp->rktp_partition, request->rkbuf_replyq.version,
1375 			   rd_kafka_err2str(err));
1376 
1377                 rd_kafka_topic_partition_list_destroy(offsets);
1378 
1379                 if (err == RD_KAFKA_RESP_ERR__DESTROY ||
1380                     err == RD_KAFKA_RESP_ERR__OUTDATED) {
1381                         /* Termination or outdated, quick cleanup. */
1382 
1383                         if (err == RD_KAFKA_RESP_ERR__OUTDATED) {
1384                                 rd_kafka_toppar_lock(rktp);
1385                                 rd_kafka_toppar_offset_retry(
1386                                         rktp, 500, "outdated offset response");
1387                                 rd_kafka_toppar_unlock(rktp);
1388                         }
1389 
1390                         /* from request.opaque */
1391                         rd_kafka_toppar_destroy(rktp);
1392                         return;
1393 
1394 		} else if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
1395 			return; /* Retry in progress */
1396 
1397 
1398                 rd_kafka_toppar_lock(rktp);
1399                 rd_kafka_offset_reset(rktp, rktp->rktp_query_offset,
1400                                       err,
1401                                       "failed to query logical offset");
1402 
1403                 /* Signal error back to application,
1404                  * unless this is an intermittent problem
1405                  * (e.g.,connection lost) */
1406                 if (!(err == RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION ||
1407                       err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE ||
1408                       err == RD_KAFKA_RESP_ERR__TRANSPORT ||
1409                       err == RD_KAFKA_RESP_ERR__TIMED_OUT)) {
1410                         rd_kafka_consumer_err(
1411                                 rktp->rktp_fetchq, rkb->rkb_nodeid,
1412                                 err, 0, NULL, rktp,
1413                                 (rktp->rktp_query_offset <=
1414                                  RD_KAFKA_OFFSET_TAIL_BASE ?
1415                                  rktp->rktp_query_offset -
1416                                  RD_KAFKA_OFFSET_TAIL_BASE :
1417                                  rktp->rktp_query_offset),
1418                                 "Failed to query logical offset %s: %s",
1419                                 rd_kafka_offset2str(rktp->rktp_query_offset),
1420                                 rd_kafka_err2str(err));
1421                 }
1422                 rd_kafka_toppar_unlock(rktp);
1423 
1424                 rd_kafka_toppar_destroy(rktp); /* from request.opaque */
1425                 return;
1426         }
1427 
1428         Offset = rktpar->offset;
1429         rd_kafka_topic_partition_list_destroy(offsets);
1430 
1431 	rd_kafka_toppar_lock(rktp);
1432         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1433                      "Offset %s request for %.*s [%"PRId32"] "
1434                      "returned offset %s (%"PRId64")",
1435                      rd_kafka_offset2str(rktp->rktp_query_offset),
1436                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1437                      rktp->rktp_partition, rd_kafka_offset2str(Offset), Offset);
1438 
1439         rd_kafka_toppar_next_offset_handle(rktp, Offset);
1440 	rd_kafka_toppar_unlock(rktp);
1441 
1442         rd_kafka_toppar_destroy(rktp); /* from request.opaque */
1443 }
1444 
1445 
1446 /**
1447  * @brief An Offset fetch failed (for whatever reason) in
1448  *        the RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT state:
1449  *        set the state back to FETCH_OFFSET_QUERY and start the
1450  *        offset_query_tmr to trigger a new request eventually.
1451  *
1452  * @locality toppar handler thread
1453  * @locks toppar_lock() MUST be held
1454  */
rd_kafka_toppar_offset_retry(rd_kafka_toppar_t * rktp,int backoff_ms,const char * reason)1455 static void rd_kafka_toppar_offset_retry (rd_kafka_toppar_t *rktp,
1456                                           int backoff_ms,
1457                                           const char *reason) {
1458         rd_ts_t tmr_next;
1459         int restart_tmr;
1460 
1461         /* (Re)start timer if not started or the current timeout
1462          * is larger than \p backoff_ms. */
1463         tmr_next = rd_kafka_timer_next(&rktp->rktp_rkt->rkt_rk->rk_timers,
1464                                        &rktp->rktp_offset_query_tmr, 1);
1465 
1466         restart_tmr = (tmr_next == -1 ||
1467                        tmr_next > rd_clock() + (backoff_ms * 1000ll));
1468 
1469         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1470                      "%s [%"PRId32"]: %s: %s for offset %s",
1471                      rktp->rktp_rkt->rkt_topic->str,
1472                      rktp->rktp_partition,
1473                      reason,
1474                      restart_tmr ?
1475                      "(re)starting offset query timer" :
1476                      "offset query timer already scheduled",
1477                      rd_kafka_offset2str(rktp->rktp_query_offset));
1478 
1479         rd_kafka_toppar_set_fetch_state(rktp,
1480                                         RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
1481 
1482         if (restart_tmr)
1483                 rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
1484                                      &rktp->rktp_offset_query_tmr,
1485                                      backoff_ms*1000ll,
1486                                      rd_kafka_offset_query_tmr_cb, rktp);
1487 }
1488 
1489 
1490 
1491 /**
1492  * Send OffsetRequest for toppar.
1493  *
1494  * If \p backoff_ms is non-zero only the query timer is started,
1495  * otherwise a query is triggered directly.
1496  *
1497  * Locality: toppar handler thread
1498  * Locks: toppar_lock() must be held
1499  */
rd_kafka_toppar_offset_request(rd_kafka_toppar_t * rktp,int64_t query_offset,int backoff_ms)1500 void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
1501 				     int64_t query_offset, int backoff_ms) {
1502 	rd_kafka_broker_t *rkb;
1503 
1504 	rd_kafka_assert(NULL,
1505 			thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));
1506 
1507         rkb = rktp->rktp_broker;
1508 
1509         if (!backoff_ms && (!rkb || rkb->rkb_source == RD_KAFKA_INTERNAL))
1510                 backoff_ms = 500;
1511 
1512         if (backoff_ms) {
1513                 rd_kafka_toppar_offset_retry(rktp, backoff_ms,
1514                                              !rkb ?
1515                                              "no current leader for partition":
1516                                              "backoff");
1517                 return;
1518         }
1519 
1520 
1521         rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1522                             &rktp->rktp_offset_query_tmr, 1/*lock*/);
1523 
1524 
1525 	if (query_offset == RD_KAFKA_OFFSET_STORED &&
1526             rktp->rktp_rkt->rkt_conf.offset_store_method ==
1527             RD_KAFKA_OFFSET_METHOD_BROKER) {
1528                 /*
1529                  * Get stored offset from broker based storage:
1530                  * ask cgrp manager for offsets
1531                  */
1532                 rd_kafka_toppar_offset_fetch(
1533 			rktp,
1534 			RD_KAFKA_REPLYQ(rktp->rktp_ops,
1535 					rktp->rktp_op_version));
1536 
1537 	} else {
1538                 rd_kafka_topic_partition_list_t *offsets;
1539 
1540                 /*
1541                  * Look up logical offset (end,beginning,tail,..)
1542                  */
1543 
1544                 rd_rkb_dbg(rkb, TOPIC, "OFFREQ",
1545                            "Partition %.*s [%"PRId32"]: querying for logical "
1546                            "offset %s (opv %d)",
1547                            RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1548                            rktp->rktp_partition,
1549                            rd_kafka_offset2str(query_offset),
1550 			   rktp->rktp_op_version);
1551 
1552                 rd_kafka_toppar_keep(rktp); /* refcnt for OffsetRequest opaque*/
1553 
1554 		if (query_offset <= RD_KAFKA_OFFSET_TAIL_BASE)
1555 			query_offset = RD_KAFKA_OFFSET_END;
1556 
1557                 offsets = rd_kafka_topic_partition_list_new(1);
1558                 rd_kafka_topic_partition_list_add(
1559                         offsets,
1560                         rktp->rktp_rkt->rkt_topic->str,
1561                         rktp->rktp_partition)->offset = query_offset;
1562 
1563                 rd_kafka_OffsetRequest(rkb, offsets, 0,
1564                                        RD_KAFKA_REPLYQ(rktp->rktp_ops,
1565                                                        rktp->rktp_op_version),
1566                                        rd_kafka_toppar_handle_Offset,
1567                                        rktp);
1568 
1569                 rd_kafka_topic_partition_list_destroy(offsets);
1570         }
1571 
1572         rd_kafka_toppar_set_fetch_state(rktp,
1573 					RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT);
1574 }
1575 
1576 
1577 /**
1578  * Start fetching toppar.
1579  *
1580  * Locality: toppar handler thread
1581  * Locks: none
1582  */
rd_kafka_toppar_fetch_start(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_op_t * rko_orig)1583 static void rd_kafka_toppar_fetch_start (rd_kafka_toppar_t *rktp,
1584 					 int64_t offset,
1585 					 rd_kafka_op_t *rko_orig) {
1586         rd_kafka_cgrp_t *rkcg = rko_orig->rko_u.fetch_start.rkcg;
1587         rd_kafka_resp_err_t err = 0;
1588         int32_t version = rko_orig->rko_version;
1589 
1590 	rd_kafka_toppar_lock(rktp);
1591 
1592         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1593                      "Start fetch for %.*s [%"PRId32"] in "
1594                      "state %s at offset %s (v%"PRId32")",
1595                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1596                      rktp->rktp_partition,
1597                      rd_kafka_fetch_states[rktp->rktp_fetch_state],
1598                      rd_kafka_offset2str(offset), version);
1599 
1600         if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
1601                 err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
1602 		rd_kafka_toppar_unlock(rktp);
1603                 goto err_reply;
1604         }
1605 
1606         rd_kafka_toppar_op_version_bump(rktp, version);
1607 
1608         if (rkcg) {
1609                 rd_kafka_assert(rktp->rktp_rkt->rkt_rk, !rktp->rktp_cgrp);
1610                 /* Attach toppar to cgrp */
1611                 rktp->rktp_cgrp = rkcg;
1612                 rd_kafka_cgrp_op(rkcg, rktp, RD_KAFKA_NO_REPLYQ,
1613                                  RD_KAFKA_OP_PARTITION_JOIN, 0);
1614         }
1615 
1616 
1617         if (offset == RD_KAFKA_OFFSET_BEGINNING ||
1618 	    offset == RD_KAFKA_OFFSET_END ||
1619             offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
1620 		rd_kafka_toppar_next_offset_handle(rktp, offset);
1621 
1622 	} else if (offset == RD_KAFKA_OFFSET_STORED) {
1623                 rd_kafka_offset_store_init(rktp);
1624 
1625 	} else if (offset == RD_KAFKA_OFFSET_INVALID) {
1626 		rd_kafka_offset_reset(rktp, offset,
1627 				      RD_KAFKA_RESP_ERR__NO_OFFSET,
1628 				      "no previously committed offset "
1629 				      "available");
1630 
1631 	} else {
1632 		rktp->rktp_next_offset = offset;
1633                 rd_kafka_toppar_set_fetch_state(rktp,
1634 						RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1635 
1636                 /* Wake-up broker thread which might be idling on IO */
1637                 if (rktp->rktp_broker)
1638                         rd_kafka_broker_wakeup(rktp->rktp_broker);
1639 
1640 	}
1641 
1642         rktp->rktp_offsets_fin.eof_offset = RD_KAFKA_OFFSET_INVALID;
1643 
1644 	rd_kafka_toppar_unlock(rktp);
1645 
1646         /* Signal back to caller thread that start has commenced, or err */
1647 err_reply:
1648         if (rko_orig->rko_replyq.q) {
1649                 rd_kafka_op_t *rko;
1650 
1651                 rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_START);
1652 
1653                 rko->rko_err = err;
1654                 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1655 
1656                 rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
1657         }
1658 }
1659 
1660 
1661 
1662 
1663 /**
1664  * Mark toppar's fetch state as stopped (all decommissioning is done,
1665  * offsets are stored, etc).
1666  *
1667  * Locality: toppar handler thread
1668  * Locks: toppar_lock(rktp) MUST be held
1669  */
rd_kafka_toppar_fetch_stopped(rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err)1670 void rd_kafka_toppar_fetch_stopped (rd_kafka_toppar_t *rktp,
1671                                     rd_kafka_resp_err_t err) {
1672 
1673 
1674         rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPED);
1675 
1676         rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
1677 
1678         if (rktp->rktp_cgrp) {
1679                 /* Detach toppar from cgrp */
1680                 rd_kafka_cgrp_op(rktp->rktp_cgrp, rktp, RD_KAFKA_NO_REPLYQ,
1681                                  RD_KAFKA_OP_PARTITION_LEAVE, 0);
1682                 rktp->rktp_cgrp = NULL;
1683         }
1684 
1685         /* Signal back to application thread that stop is done. */
1686 	if (rktp->rktp_replyq.q) {
1687 		rd_kafka_op_t *rko;
1688 		rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_STOP|RD_KAFKA_OP_REPLY);
1689                 rko->rko_err = err;
1690 		rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1691 
1692 		rd_kafka_replyq_enq(&rktp->rktp_replyq, rko, 0);
1693 	}
1694 }
1695 
1696 
1697 /**
1698  * Stop toppar fetcher.
1699  * This is usually an async operation.
1700  *
1701  * Locality: toppar handler thread
1702  */
rd_kafka_toppar_fetch_stop(rd_kafka_toppar_t * rktp,rd_kafka_op_t * rko_orig)1703 void rd_kafka_toppar_fetch_stop (rd_kafka_toppar_t *rktp,
1704 				 rd_kafka_op_t *rko_orig) {
1705         int32_t version = rko_orig->rko_version;
1706 
1707 	rd_kafka_toppar_lock(rktp);
1708 
1709         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1710                      "Stopping fetch for %.*s [%"PRId32"] in state %s (v%d)",
1711                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1712                      rktp->rktp_partition,
1713                      rd_kafka_fetch_states[rktp->rktp_fetch_state], version);
1714 
1715         rd_kafka_toppar_op_version_bump(rktp, version);
1716 
1717 	/* Abort pending offset lookups. */
1718 	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
1719 		rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1720 				    &rktp->rktp_offset_query_tmr,
1721 				    1/*lock*/);
1722 
1723         /* Clear out the forwarding queue. */
1724         rd_kafka_q_fwd_set(rktp->rktp_fetchq, NULL);
1725 
1726         /* Assign the future replyq to propagate stop results. */
1727         rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_replyq.q == NULL);
1728         rktp->rktp_replyq = rko_orig->rko_replyq;
1729         rd_kafka_replyq_clear(&rko_orig->rko_replyq);
1730 
1731         rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPING);
1732 
1733         /* Stop offset store (possibly async).
1734          * NOTE: will call .._stopped() if store finishes immediately,
1735          *       so no more operations after this call! */
1736         rd_kafka_offset_store_stop(rktp);
1737 
1738 	rd_kafka_toppar_unlock(rktp);
1739 }
1740 
1741 
1742 /**
1743  * Update a toppars offset.
1744  * The toppar must have been previously FETCH_START:ed
1745  *
1746  * Locality: toppar handler thread
1747  */
rd_kafka_toppar_seek(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_op_t * rko_orig)1748 void rd_kafka_toppar_seek (rd_kafka_toppar_t *rktp,
1749 			   int64_t offset, rd_kafka_op_t *rko_orig) {
1750         rd_kafka_resp_err_t err = 0;
1751         int32_t version = rko_orig->rko_version;
1752 
1753 	rd_kafka_toppar_lock(rktp);
1754 
1755         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1756                      "Seek %.*s [%"PRId32"] to offset %s "
1757                      "in state %s (v%"PRId32")",
1758                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1759                      rktp->rktp_partition,
1760 		     rd_kafka_offset2str(offset),
1761                      rd_kafka_fetch_states[rktp->rktp_fetch_state], version);
1762 
1763 
1764         if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
1765                 err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
1766                 goto err_reply;
1767         } else if (!RD_KAFKA_TOPPAR_FETCH_IS_STARTED(rktp->rktp_fetch_state)) {
1768                 err = RD_KAFKA_RESP_ERR__STATE;
1769                 goto err_reply;
1770         } else if (offset == RD_KAFKA_OFFSET_STORED) {
1771 		err = RD_KAFKA_RESP_ERR__INVALID_ARG;
1772 		goto err_reply;
1773 	}
1774 
1775         rd_kafka_toppar_op_version_bump(rktp, version);
1776 
1777 	/* Abort pending offset lookups. */
1778 	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
1779 		rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1780 				    &rktp->rktp_offset_query_tmr,
1781 				    1/*lock*/);
1782 
1783 	if (RD_KAFKA_OFFSET_IS_LOGICAL(offset))
1784 		rd_kafka_toppar_next_offset_handle(rktp, offset);
1785 	else {
1786 		rktp->rktp_next_offset = offset;
1787                 rd_kafka_toppar_set_fetch_state(rktp,
1788 						RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1789 
1790                 /* Wake-up broker thread which might be idling on IO */
1791                 if (rktp->rktp_broker)
1792                         rd_kafka_broker_wakeup(rktp->rktp_broker);
1793 	}
1794 
1795         /* Signal back to caller thread that seek has commenced, or err */
1796 err_reply:
1797 	rd_kafka_toppar_unlock(rktp);
1798 
1799         if (rko_orig->rko_replyq.q) {
1800                 rd_kafka_op_t *rko;
1801 
1802                 rko = rd_kafka_op_new(RD_KAFKA_OP_SEEK|RD_KAFKA_OP_REPLY);
1803 
1804                 rko->rko_err = err;
1805 		rko->rko_u.fetch_start.offset =
1806 			rko_orig->rko_u.fetch_start.offset;
1807                 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1808 
1809                 rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
1810         }
1811 }
1812 
1813 
/**
 * @brief Pause/resume toppar.
 *
 * This is the internal handler of the pause/resume op.
 *
 * The op's \c rko_u.pause.pause field selects pause (non-zero) or
 * resume (zero) and \c rko_u.pause.flag holds the pause flag(s)
 * (RD_KAFKA_TOPPAR_F_APP_PAUSE or RD_KAFKA_TOPPAR_F_LIB_PAUSE) to set
 * or clear in \c rktp_flags.
 *
 * A resume request whose flag is not currently set on the partition is
 * logged and otherwise ignored.
 * After a pause on a consumer instance
 * rd_kafka_q_purge_toppar_version() is called on the partition's fetch
 * queue with the op's version.
 *
 * @param rktp      Partition to pause or resume.
 * @param rko_orig  The pause/resume op being served.
 *
 * @locality toppar's handler thread
 * @locks the toppar lock is acquired and released by this function.
 */
static void rd_kafka_toppar_pause_resume (rd_kafka_toppar_t *rktp,
                                          rd_kafka_op_t *rko_orig) {
	rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
	int pause = rko_orig->rko_u.pause.pause;
	int flag = rko_orig->rko_u.pause.flag;
        int32_t version = rko_orig->rko_version;

	rd_kafka_toppar_lock(rktp);

        rd_kafka_toppar_op_version_bump(rktp, version);

        /* A resume is only acted upon if all bits of the requested flag
         * are currently set on the partition. */
        if (!pause && (rktp->rktp_flags & flag) != flag) {
                rd_kafka_dbg(rk, TOPIC, "RESUME",
                             "Not resuming %s [%"PRId32"]: "
                             "partition is not paused by %s",
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition,
                             (flag & RD_KAFKA_TOPPAR_F_APP_PAUSE ?
                              "application" : "library"));
                rd_kafka_toppar_unlock(rktp);
                return;
        }

	if (pause) {
                /* Pause partition by setting either
                 * RD_KAFKA_TOPPAR_F_APP_PAUSE or
                 * RD_KAFKA_TOPPAR_F_LIB_PAUSE */
		rktp->rktp_flags |= flag;

		if (rk->rk_type == RD_KAFKA_CONSUMER) {
			/* Save offset of last consumed message+1 as the
			 * next message to fetch on resume. */
			if (rktp->rktp_app_offset != RD_KAFKA_OFFSET_INVALID) {
				rktp->rktp_next_offset = rktp->rktp_app_offset;
			}

			/* pause is non-zero in this branch, so the
			 * ternaries below always select "PAUSE"/"Pause". */
			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
				     "%s %s [%"PRId32"]: at offset %s "
				     "(state %s, v%d)",
				     pause ? "Pause":"Resume",
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_offset2str(
					     rktp->rktp_next_offset),
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);
		} else {
			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
				     "%s %s [%"PRId32"] (state %s, v%d)",
				     pause ? "Pause":"Resume",
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);
			}

	} else {
                /* Unset the RD_KAFKA_TOPPAR_F_APP_PAUSE or
                 * RD_KAFKA_TOPPAR_F_LIB_PAUSE flag */
		rktp->rktp_flags &= ~flag;

		if (rk->rk_type == RD_KAFKA_CONSUMER) {
			/* pause is zero in this branch, so the log
			 * facility below is always "RESUME". */
			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
				     "%s %s [%"PRId32"]: at offset %s "
				     "(state %s, v%d)",
				     rktp->rktp_fetch_state ==
				     RD_KAFKA_TOPPAR_FETCH_ACTIVE ?
				     "Resuming" : "Not resuming stopped",
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_offset2str(
					     rktp->rktp_next_offset),
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);

			/* If the resuming offset is logical we
			 * need to trigger a seek (that performs the
			 * logical->absolute lookup logic) to get
			 * things going.
			 * Typical case is when a partition is paused
			 * before anything has been consumed by app
			 * yet thus having rktp_app_offset=INVALID.
			 *
			 * NOTE(review): the condition below only matches
			 * RD_KAFKA_OFFSET_INVALID, not every logical
			 * offset - confirm this is intended. */
                        if (!RD_KAFKA_TOPPAR_IS_PAUSED(rktp) &&
                            (rktp->rktp_fetch_state ==
                             RD_KAFKA_TOPPAR_FETCH_ACTIVE ||
                             rktp->rktp_fetch_state ==
                             RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) &&
                            rktp->rktp_next_offset == RD_KAFKA_OFFSET_INVALID)
                        	rd_kafka_toppar_next_offset_handle(
                        		rktp, rktp->rktp_next_offset);

		} else
			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
				     "%s %s [%"PRId32"] (state %s, v%d)",
				     pause ? "Pause":"Resume",
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);
	}
	rd_kafka_toppar_unlock(rktp);

	/* The purge is performed after the toppar lock has been released. */
	if (pause && rk->rk_type == RD_KAFKA_CONSUMER) {
		/* Flush partition's fetch queue */
		rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
						rko_orig->rko_version);
	}
}
1933 
1934 
1935 
1936 
/**
 * @brief Decide whether this toppar should be on the fetch list or not.
 *
 * Also:
 *  - update toppar's op version (for broker thread's copy)
 *  - finalize statistics (move rktp_offsets to rktp_offsets_fin)
 *  - delegate the partition back to its leader when the preferred
 *    replica lease has expired.
 *
 * If the decision differs from the partition's current \c rktp_fetch
 * value the partition is added to, or removed from, \p rkb's active
 * toppar list.
 *
 * @param rktp          Partition to evaluate.
 * @param rkb           Broker whose fetch list and configuration
 *                      (queued_min_msgs, queued_max_msg_bytes) are used.
 * @param force_remove  If non-zero the partition is unconditionally
 *                      deemed not fetchable.
 *
 * @returns 0 if the partition is fetchable, the partition's
 *          \c rktp_ts_fetch_backoff timestamp if fetching is backed off,
 *          or RD_TS_MAX if the partition is not fetchable for any
 *          other reason.
 *
 * @locality broker thread
 * @locks none (the toppar lock is acquired and released internally)
 */
rd_ts_t rd_kafka_toppar_fetch_decide (rd_kafka_toppar_t *rktp,
                                      rd_kafka_broker_t *rkb,
                                      int force_remove) {
        int should_fetch = 1;
        const char *reason = "";
        int32_t version;
        rd_ts_t ts_backoff = 0;
        rd_bool_t lease_expired = rd_false;

        rd_kafka_toppar_lock(rktp);

        /* Check for preferred replica lease expiry:
         * only relevant when the partition's current broker is not
         * its leader. */
        lease_expired =
                rktp->rktp_leader_id != rktp->rktp_broker_id &&
                rd_interval(&rktp->rktp_lease_intvl,
                            5*60*1000*1000/*5 minutes*/, 0) > 0;
        if (lease_expired) {
                /* delegate_to_leader() requires no locks to be held */
                rd_kafka_toppar_unlock(rktp);
                rd_kafka_toppar_delegate_to_leader(rktp);
                rd_kafka_toppar_lock(rktp);

                reason = "preferred replica lease expired";
                should_fetch = 0;
                goto done;
        }

	/* Forced removal from fetch list */
	if (unlikely(force_remove)) {
		reason = "forced removal";
		should_fetch = 0;
		goto done;
	}

	if (unlikely((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) != 0)) {
		reason = "partition removed";
		should_fetch = 0;
		goto done;
	}

	/* Skip toppars not in active fetch state */
	if (rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_ACTIVE) {
                reason = "not in active fetch state";
		should_fetch = 0;
		goto done;
	}

        /* Update broker thread's fetch op version */
        version = rktp->rktp_op_version;
        if (version > rktp->rktp_fetch_version ||
            rktp->rktp_next_offset != rktp->rktp_last_next_offset ||
            rktp->rktp_offsets.fetch_offset == RD_KAFKA_OFFSET_INVALID) {
                /* New version barrier, something was modified from the
                 * control plane. Reset and start over.
		 * Alternatively only the next_offset changed but not the
		 * barrier, which is the case when automatically triggering
		 * offset.reset (such as on PARTITION_EOF or
                 * OFFSET_OUT_OF_RANGE). */

                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCHDEC",
                             "Topic %s [%"PRId32"]: fetch decide: "
                             "updating to version %d (was %d) at "
                             "offset %"PRId64" (was %"PRId64")",
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition,
                             version, rktp->rktp_fetch_version,
                             rktp->rktp_next_offset,
                             rktp->rktp_offsets.fetch_offset);

                rd_kafka_offset_stats_reset(&rktp->rktp_offsets);

                /* New start offset */
                rktp->rktp_offsets.fetch_offset = rktp->rktp_next_offset;
		rktp->rktp_last_next_offset = rktp->rktp_next_offset;

                rktp->rktp_fetch_version = version;

                /* Clear last error to propagate new fetch
                 * errors if encountered. */
                rktp->rktp_last_error = RD_KAFKA_RESP_ERR_NO_ERROR;

                rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
                                                version);
        }


	/* The first matching condition below determines the reason
	 * for not fetching. */
	if (RD_KAFKA_TOPPAR_IS_PAUSED(rktp)) {
		should_fetch = 0;
		reason = "paused";

	} else if (RD_KAFKA_OFFSET_IS_LOGICAL(rktp->rktp_next_offset)) {
                should_fetch = 0;
                reason = "no concrete offset";

        } else if (rd_kafka_q_len(rktp->rktp_fetchq) >=
		   rkb->rkb_rk->rk_conf.queued_min_msgs) {
		/* Skip toppars whose local message queue is already above
		 * the lower threshold. */
                reason = "queued.min.messages exceeded";
                should_fetch = 0;

        } else if ((int64_t)rd_kafka_q_size(rktp->rktp_fetchq) >=
            rkb->rkb_rk->rk_conf.queued_max_msg_bytes) {
                reason = "queued.max.messages.kbytes exceeded";
                should_fetch = 0;

        } else if (rktp->rktp_ts_fetch_backoff > rd_clock()) {
                /* The only case where an explicit backoff timestamp
                 * is returned to the caller. */
                reason = "fetch backed off";
                ts_backoff = rktp->rktp_ts_fetch_backoff;
                should_fetch = 0;
        }

 done:
        /* Copy offset stats to finalized place holder. */
        rktp->rktp_offsets_fin = rktp->rktp_offsets;

        /* Only act (and log) when the fetchability changed. */
        if (rktp->rktp_fetch != should_fetch) {
                rd_rkb_dbg(rkb, FETCH, "FETCH",
                           "Topic %s [%"PRId32"] in state %s at offset %s "
                           "(%d/%d msgs, %"PRId64"/%d kb queued, "
			   "opv %"PRId32") is %s%s",
                           rktp->rktp_rkt->rkt_topic->str,
                           rktp->rktp_partition,
			   rd_kafka_fetch_states[rktp->rktp_fetch_state],
                           rd_kafka_offset2str(rktp->rktp_next_offset),
                           rd_kafka_q_len(rktp->rktp_fetchq),
                           rkb->rkb_rk->rk_conf.queued_min_msgs,
                           rd_kafka_q_size(rktp->rktp_fetchq) / 1024,
                           rkb->rkb_rk->rk_conf.queued_max_msg_kbytes,
			   rktp->rktp_fetch_version,
                           should_fetch ? "fetchable" : "not fetchable: ",
                           reason);

                if (should_fetch) {
			rd_dassert(rktp->rktp_fetch_version > 0);
                        /* reason is always empty when fetchable, so
                         * "fetchable" is passed as the reason. */
                        rd_kafka_broker_active_toppar_add(rkb, rktp,
                                                          *reason ? reason :
                                                          "fetchable");
                } else {
                        rd_kafka_broker_active_toppar_del(rkb, rktp, reason);
                }
        }

        rd_kafka_toppar_unlock(rktp);

        /* Non-fetching partitions will have an
         * indefinite backoff, unless explicitly specified. */
        if (!should_fetch && !ts_backoff)
                ts_backoff = RD_TS_MAX;

        return ts_backoff;
}
2101 
2102 
2103 /**
2104  * @brief Serve a toppar in a consumer broker thread.
2105  *        This is considered the fast path and should be minimal,
2106  *        mostly focusing on fetch related mechanisms.
2107  *
2108  * @returns the partition's Fetch backoff timestamp, or 0 if no backoff.
2109  *
2110  * @locality broker thread
2111  * @locks none
2112  */
rd_kafka_broker_consumer_toppar_serve(rd_kafka_broker_t * rkb,rd_kafka_toppar_t * rktp)2113 rd_ts_t rd_kafka_broker_consumer_toppar_serve (rd_kafka_broker_t *rkb,
2114                                                rd_kafka_toppar_t *rktp) {
2115         return rd_kafka_toppar_fetch_decide(rktp, rkb, 0);
2116 }
2117 
2118 
2119 
/**
 * @brief Serve a toppar op
 *
 * The partition is taken from \c rko->rko_rktp when set. If the op's
 * version is outdated compared to the partition's op version the op is
 * replied to with RD_KAFKA_RESP_ERR__OUTDATED and not processed further.
 *
 * Handled op types: FETCH_START, FETCH_STOP, SEEK, PAUSE,
 * OFFSET_COMMIT|REPLY and OFFSET_FETCH|REPLY. Any other type triggers
 * an assert.
 *
 * NOTE(review): the previous comment stated that rktp may be NULL for
 * certain ops (OP_RECV_BUF); no such op is handled in the switch
 * below - confirm whether that remark is still relevant.
 *
 * Will send an empty reply op if the request rko has a replyq set,
 * providing synchronous operation.
 *
 * @param rk       Client instance, passed to the offset commit callback.
 * @param rkq      Not used by this function.
 * @param rko      The op to serve.
 * @param cb_type  Not used by this function.
 * @param opaque   Not used by this function.
 *
 * @returns RD_KAFKA_OP_RES_HANDLED in all cases.
 *
 * @locality toppar handler thread
 */
static rd_kafka_op_res_t
rd_kafka_toppar_op_serve (rd_kafka_t *rk,
                          rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                          rd_kafka_q_cb_type_t cb_type, void *opaque) {
	rd_kafka_toppar_t *rktp = NULL;
	int outdated = 0;

	if (rko->rko_rktp)
		rktp = rko->rko_rktp;

	if (rktp) {
		/* Drop ops older than the partition's current op version. */
		outdated = rd_kafka_op_version_outdated(rko,
							rktp->rktp_op_version);

		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OP",
			     "%.*s [%"PRId32"] received %sop %s "
			     "(v%"PRId32") in fetch-state %s (opv%d)",
			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
			     rktp->rktp_partition,
			     outdated ? "outdated ": "",
			     rd_kafka_op2str(rko->rko_type),
			     rko->rko_version,
			     rd_kafka_fetch_states[rktp->rktp_fetch_state],
			     rktp->rktp_op_version);

		if (outdated) {
#if ENABLE_DEVEL
			rd_kafka_op_print(stdout, "PART_OUTDATED", rko);
#endif
                        rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__OUTDATED);
			return RD_KAFKA_OP_RES_HANDLED;
		}
	}

	switch ((int)rko->rko_type)
	{
	case RD_KAFKA_OP_FETCH_START:
		rd_kafka_toppar_fetch_start(rktp,
					    rko->rko_u.fetch_start.offset, rko);
		break;

	case RD_KAFKA_OP_FETCH_STOP:
		rd_kafka_toppar_fetch_stop(rktp, rko);
		break;

	case RD_KAFKA_OP_SEEK:
		rd_kafka_toppar_seek(rktp, rko->rko_u.fetch_start.offset, rko);
		break;

	case RD_KAFKA_OP_PAUSE:
		rd_kafka_toppar_pause_resume(rktp, rko);
		break;

        case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
                /* Forward the commit result to the callback stored
                 * in the op. */
                rd_kafka_assert(NULL, rko->rko_u.offset_commit.cb);
                rko->rko_u.offset_commit.cb(
                        rk, rko->rko_err,
                        rko->rko_u.offset_commit.partitions,
                        rko->rko_u.offset_commit.opaque);
                break;

	case RD_KAFKA_OP_OFFSET_FETCH | RD_KAFKA_OP_REPLY:
        {
                /* OffsetFetch reply */
                rd_kafka_topic_partition_list_t *offsets =
			rko->rko_u.offset_fetch.partitions;
		int64_t offset = RD_KAFKA_OFFSET_INVALID;

                /* The partition is carried in the first list element's
                 * _private field rather than in rko_rktp.
                 * Presumably this holds a toppar reference, which is
                 * released by the toppar_destroy() calls below. */
                rktp = offsets->elems[0]._private;
                if (!rko->rko_err) {
                        /* Request succeeded but per-partition might have failed */
                        rko->rko_err = offsets->elems[0].err;
			offset       = offsets->elems[0].offset;
                }
                /* Clear _private before destroying the list; rktp is
                 * still used below. */
                offsets->elems[0]._private = NULL;
                rd_kafka_topic_partition_list_destroy(offsets);
		rko->rko_u.offset_fetch.partitions = NULL;

		rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
				    &rktp->rktp_offset_query_tmr,
				    1/*lock*/);

		rd_kafka_toppar_lock(rktp);

		if (rko->rko_err) {
			rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
				     TOPIC, "OFFSET",
				     "Failed to fetch offset for "
				     "%.*s [%"PRId32"]: %s",
				     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
				     rktp->rktp_partition,
				     rd_kafka_err2str(rko->rko_err));

                        /* Keep on querying until we succeed. */
                        rd_kafka_toppar_offset_retry(rktp, 500,
                                                     "failed to fetch offsets");
                        rd_kafka_toppar_unlock(rktp);


                        /* Propagate error to application,
                         * except for ERR__WAIT_COORD and
                         * UNSTABLE_OFFSET_COMMIT which are only retried. */
                        if (rko->rko_err != RD_KAFKA_RESP_ERR__WAIT_COORD &&
                            rko->rko_err !=
                            RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT)
                                rd_kafka_consumer_err(
                                        rktp->rktp_fetchq,
                                        RD_KAFKA_NODEID_UA,
                                        rko->rko_err, 0,
                                        NULL, rktp,
                                        RD_KAFKA_OFFSET_INVALID,
                                        "Failed to fetch "
                                        "offsets from brokers: %s",
                                        rd_kafka_err2str(rko->rko_err));

			rd_kafka_toppar_destroy(rktp);

			/* Leaves the switch: the final op reply below
			 * is still performed. */
			break;
		}

		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
			     TOPIC, "OFFSET",
			     "%.*s [%"PRId32"]: OffsetFetch returned "
			     "offset %s (%"PRId64")",
			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
			     rktp->rktp_partition,
			     rd_kafka_offset2str(offset), offset);

		if (offset > 0)
			rktp->rktp_committed_offset = offset;

		/* A non-negative offset is used as the next offset,
		 * a negative one triggers an offset reset. */
		if (offset >= 0)
			rd_kafka_toppar_next_offset_handle(rktp, offset);
		else
			rd_kafka_offset_reset(rktp, offset,
					      RD_KAFKA_RESP_ERR__NO_OFFSET,
					      "no previously committed offset "
					      "available");
		rd_kafka_toppar_unlock(rktp);

                rd_kafka_toppar_destroy(rktp);
        }
        break;

        default:
                rd_kafka_assert(NULL, !*"unknown type");
                break;
        }

        rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);

        return RD_KAFKA_OP_RES_HANDLED;
}
2281 
2282 
2283 
2284 
2285 
2286 /**
2287  * Send command op to toppar (handled by toppar's thread).
2288  *
2289  * Locality: any thread
2290  */
rd_kafka_toppar_op0(rd_kafka_toppar_t * rktp,rd_kafka_op_t * rko,rd_kafka_replyq_t replyq)2291 static void rd_kafka_toppar_op0 (rd_kafka_toppar_t *rktp, rd_kafka_op_t *rko,
2292 				 rd_kafka_replyq_t replyq) {
2293         rko->rko_rktp = rd_kafka_toppar_keep(rktp);
2294 	rko->rko_replyq = replyq;
2295 
2296         rd_kafka_q_enq(rktp->rktp_ops, rko);
2297 }
2298 
2299 
2300 /**
2301  * Send command op to toppar (handled by toppar's thread).
2302  *
2303  * Locality: any thread
2304  */
rd_kafka_toppar_op(rd_kafka_toppar_t * rktp,rd_kafka_op_type_t type,int32_t version,int64_t offset,rd_kafka_cgrp_t * rkcg,rd_kafka_replyq_t replyq)2305 static void rd_kafka_toppar_op (rd_kafka_toppar_t *rktp,
2306 				rd_kafka_op_type_t type, int32_t version,
2307 				int64_t offset, rd_kafka_cgrp_t *rkcg,
2308 				rd_kafka_replyq_t replyq) {
2309         rd_kafka_op_t *rko;
2310 
2311         rko = rd_kafka_op_new(type);
2312 	rko->rko_version = version;
2313         if (type == RD_KAFKA_OP_FETCH_START ||
2314 	    type == RD_KAFKA_OP_SEEK) {
2315 		if (rkcg)
2316 			rko->rko_u.fetch_start.rkcg = rkcg;
2317 		rko->rko_u.fetch_start.offset = offset;
2318 	}
2319 
2320 	rd_kafka_toppar_op0(rktp, rko, replyq);
2321 }
2322 
2323 
2324 
2325 /**
2326  * Start consuming partition (async operation).
2327  *  'offset' is the initial offset
2328  *  'fwdq' is an optional queue to forward messages to, if this is NULL
2329  *  then messages will be enqueued on rktp_fetchq.
2330  *  'replyq' is an optional queue for handling the consume_start ack.
2331  *
2332  * This is the thread-safe interface that can be called from any thread.
2333  */
rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_q_t * fwdq,rd_kafka_replyq_t replyq)2334 rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start (rd_kafka_toppar_t *rktp,
2335                                                     int64_t offset,
2336                                                     rd_kafka_q_t *fwdq,
2337                                                     rd_kafka_replyq_t replyq) {
2338 	int32_t version;
2339 
2340         rd_kafka_q_lock(rktp->rktp_fetchq);
2341         if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP))
2342                 rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq,
2343                                     0, /* no do_lock */
2344                                     0 /* no fwd_app */);
2345         rd_kafka_q_unlock(rktp->rktp_fetchq);
2346 
2347 	/* Bump version barrier. */
2348 	version = rd_kafka_toppar_version_new_barrier(rktp);
2349 
2350 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2351 		     "Start consuming %.*s [%"PRId32"] at "
2352 		     "offset %s (v%"PRId32")",
2353 		     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2354 		     rktp->rktp_partition, rd_kafka_offset2str(offset),
2355 		     version);
2356 
2357         rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_START, version,
2358                            offset, rktp->rktp_rkt->rkt_rk->rk_cgrp, replyq);
2359 
2360         return RD_KAFKA_RESP_ERR_NO_ERROR;
2361 }
2362 
2363 
2364 /**
2365  * Stop consuming partition (async operatoin)
2366  * This is thread-safe interface that can be called from any thread.
2367  *
2368  * Locality: any thread
2369  */
rd_kafka_toppar_op_fetch_stop(rd_kafka_toppar_t * rktp,rd_kafka_replyq_t replyq)2370 rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_stop (rd_kafka_toppar_t *rktp,
2371                                                    rd_kafka_replyq_t replyq) {
2372 	int32_t version;
2373 
2374 	/* Bump version barrier. */
2375         version = rd_kafka_toppar_version_new_barrier(rktp);
2376 
2377         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2378 		     "Stop consuming %.*s [%"PRId32"] (v%"PRId32")",
2379 		     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2380 		     rktp->rktp_partition, version);
2381 
2382         rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_STOP, version,
2383 			   0, NULL, replyq);
2384 
2385         return RD_KAFKA_RESP_ERR_NO_ERROR;
2386 }
2387 
2388 
2389 /**
2390  * Set/Seek offset of a consumed partition (async operation).
2391  *  'offset' is the target offset
2392  *  'replyq' is an optional queue for handling the ack.
2393  *
2394  * This is the thread-safe interface that can be called from any thread.
2395  */
rd_kafka_toppar_op_seek(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_replyq_t replyq)2396 rd_kafka_resp_err_t rd_kafka_toppar_op_seek (rd_kafka_toppar_t *rktp,
2397                                              int64_t offset,
2398                                              rd_kafka_replyq_t replyq) {
2399 	int32_t version;
2400 
2401 	/* Bump version barrier. */
2402 	version = rd_kafka_toppar_version_new_barrier(rktp);
2403 
2404 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2405 		     "Seek %.*s [%"PRId32"] to "
2406 		     "offset %s (v%"PRId32")",
2407 		     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2408 		     rktp->rktp_partition, rd_kafka_offset2str(offset),
2409 		     version);
2410 
2411         rd_kafka_toppar_op(rktp, RD_KAFKA_OP_SEEK, version,
2412 			   offset, NULL, replyq);
2413 
2414         return RD_KAFKA_RESP_ERR_NO_ERROR;
2415 }
2416 
2417 
2418 /**
2419  * @brief Pause/resume partition (async operation).
2420  *
2421  * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2422  *             depending on if the app paused or librdkafka.
2423  * @param pause is 1 for pausing or 0 for resuming.
2424  *
2425  * @locality any
2426  */
2427 rd_kafka_resp_err_t
rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t * rktp,int pause,int flag,rd_kafka_replyq_t replyq)2428 rd_kafka_toppar_op_pause_resume (rd_kafka_toppar_t *rktp, int pause, int flag,
2429                                  rd_kafka_replyq_t replyq) {
2430 	int32_t version;
2431 	rd_kafka_op_t *rko;
2432 
2433 	/* Bump version barrier. */
2434 	version = rd_kafka_toppar_version_new_barrier(rktp);
2435 
2436 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, pause ? "PAUSE":"RESUME",
2437 		     "%s %.*s [%"PRId32"] (v%"PRId32")",
2438 		     pause ? "Pause" : "Resume",
2439 		     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2440 		     rktp->rktp_partition, version);
2441 
2442 	rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE);
2443 	rko->rko_version = version;
2444 	rko->rko_u.pause.pause = pause;
2445 	rko->rko_u.pause.flag = flag;
2446 
2447         rd_kafka_toppar_op0(rktp, rko, replyq);
2448 
2449         return RD_KAFKA_RESP_ERR_NO_ERROR;
2450 }
2451 
2452 
2453 /**
2454  * @brief Pause a toppar (asynchronous).
2455  *
2456  * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2457  *             depending on if the app paused or librdkafka.
2458  *
2459  * @locality any
2460  * @locks none needed
2461  */
rd_kafka_toppar_pause(rd_kafka_toppar_t * rktp,int flag)2462 void rd_kafka_toppar_pause (rd_kafka_toppar_t *rktp, int flag) {
2463         rd_kafka_toppar_op_pause_resume(rktp, 1/*pause*/, flag,
2464                                         RD_KAFKA_NO_REPLYQ);
2465 }
2466 
2467 /**
2468  * @brief Resume a toppar (asynchronous).
2469  *
2470  * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2471  *             depending on if the app paused or librdkafka.
2472  *
2473  * @locality any
2474  * @locks none needed
2475  */
rd_kafka_toppar_resume(rd_kafka_toppar_t * rktp,int flag)2476 void rd_kafka_toppar_resume (rd_kafka_toppar_t *rktp, int flag) {
2477         rd_kafka_toppar_op_pause_resume(rktp, 1/*pause*/, flag,
2478                                         RD_KAFKA_NO_REPLYQ);
2479 }
2480 
2481 
2482 
2483 /**
2484  * @brief Pause or resume a list of partitions.
2485  *
2486  * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2487  *             depending on if the app paused or librdkafka.
2488  * @param pause true for pausing, false for resuming.
2489  * @param async RD_SYNC to wait for background thread to handle op,
2490  *              RD_ASYNC for asynchronous operation.
2491  *
2492  * @locality any
2493  *
2494  * @remark This is an asynchronous call, the actual pause/resume is performed
2495  *         by toppar_pause() in the toppar's handler thread.
2496  */
2497 rd_kafka_resp_err_t
rd_kafka_toppars_pause_resume(rd_kafka_t * rk,rd_bool_t pause,rd_async_t async,int flag,rd_kafka_topic_partition_list_t * partitions)2498 rd_kafka_toppars_pause_resume (rd_kafka_t *rk,
2499                                rd_bool_t pause, rd_async_t async, int flag,
2500                                rd_kafka_topic_partition_list_t *partitions) {
2501         int i;
2502         int waitcnt = 0;
2503         rd_kafka_q_t *tmpq = NULL;
2504 
2505         if (!async)
2506                 tmpq = rd_kafka_q_new(rk);
2507 
2508 	rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
2509 		     "%s %s %d partition(s)",
2510 		     flag & RD_KAFKA_TOPPAR_F_APP_PAUSE ? "Application" : "Library",
2511 		     pause ? "pausing" : "resuming", partitions->cnt);
2512 
2513 	for (i = 0 ; i < partitions->cnt ; i++) {
2514 		rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
2515 		rd_kafka_toppar_t *rktp;
2516 
2517                 rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar,
2518                                                            rd_false);
2519 		if (!rktp) {
2520 			rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
2521 				     "%s %s [%"PRId32"]: skipped: "
2522 				     "unknown partition",
2523 				     pause ? "Pause":"Resume",
2524 				     rktpar->topic, rktpar->partition);
2525 
2526 			rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
2527 			continue;
2528 		}
2529 
2530                 rd_kafka_toppar_op_pause_resume(rktp, pause, flag,
2531                                                 RD_KAFKA_REPLYQ(tmpq, 0));
2532 
2533                 if (!async)
2534                         waitcnt++;
2535 
2536 		rd_kafka_toppar_destroy(rktp);
2537 
2538 		rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
2539 	}
2540 
2541         if (!async) {
2542                 while (waitcnt-- > 0)
2543                         rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
2544 
2545                 rd_kafka_q_destroy_owner(tmpq);
2546         }
2547 
2548 	return RD_KAFKA_RESP_ERR_NO_ERROR;
2549 }
2550 
2551 
2552 
2553 
2554 
2555 /**
2556  * Propagate error for toppar
2557  */
rd_kafka_toppar_enq_error(rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err,const char * reason)2558 void rd_kafka_toppar_enq_error (rd_kafka_toppar_t *rktp,
2559                                 rd_kafka_resp_err_t err,
2560                                 const char *reason) {
2561         rd_kafka_op_t *rko;
2562         char buf[512];
2563 
2564         rko = rd_kafka_op_new(RD_KAFKA_OP_ERR);
2565         rko->rko_err  = err;
2566         rko->rko_rktp = rd_kafka_toppar_keep(rktp);
2567 
2568         rd_snprintf(buf, sizeof(buf), "%.*s [%"PRId32"]: %s (%s)",
2569                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2570                     rktp->rktp_partition, reason,
2571                     rd_kafka_err2str(err));
2572 
2573         rko->rko_u.err.errstr = rd_strdup(buf);
2574 
2575         rd_kafka_q_enq(rktp->rktp_fetchq, rko);
2576 }
2577 
2578 
2579 
2580 
2581 
2582 /**
2583  * Returns the currently delegated broker for this toppar.
2584  * If \p proper_broker is set NULL will be returned if current handler
2585  * is not a proper broker (INTERNAL broker).
2586  *
2587  * The returned broker has an increased refcount.
2588  *
2589  * Locks: none
2590  */
rd_kafka_toppar_broker(rd_kafka_toppar_t * rktp,int proper_broker)2591 rd_kafka_broker_t *rd_kafka_toppar_broker (rd_kafka_toppar_t *rktp,
2592                                            int proper_broker) {
2593         rd_kafka_broker_t *rkb;
2594         rd_kafka_toppar_lock(rktp);
2595         rkb = rktp->rktp_broker;
2596         if (rkb) {
2597                 if (proper_broker && rkb->rkb_source == RD_KAFKA_INTERNAL)
2598                         rkb = NULL;
2599                 else
2600                         rd_kafka_broker_keep(rkb);
2601         }
2602         rd_kafka_toppar_unlock(rktp);
2603 
2604         return rkb;
2605 }
2606 
2607 
2608 /**
2609  * @brief Take action when partition broker becomes unavailable.
2610  *        This should be called when requests fail with
2611  *        NOT_LEADER_FOR.. or similar error codes, e.g. ProduceRequest.
2612  *
2613  * @locks none
2614  * @locality any
2615  */
rd_kafka_toppar_leader_unavailable(rd_kafka_toppar_t * rktp,const char * reason,rd_kafka_resp_err_t err)2616 void rd_kafka_toppar_leader_unavailable (rd_kafka_toppar_t *rktp,
2617                                          const char *reason,
2618                                          rd_kafka_resp_err_t err) {
2619         rd_kafka_topic_t *rkt = rktp->rktp_rkt;
2620 
2621         rd_kafka_dbg(rkt->rkt_rk, TOPIC, "BROKERUA",
2622                      "%s [%"PRId32"]: broker unavailable: %s: %s",
2623                      rkt->rkt_topic->str, rktp->rktp_partition, reason,
2624                      rd_kafka_err2str(err));
2625 
2626         rd_kafka_topic_wrlock(rkt);
2627         rkt->rkt_flags |= RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;
2628         rd_kafka_topic_wrunlock(rkt);
2629 
2630         rd_kafka_topic_fast_leader_query(rkt->rkt_rk);
2631 }
2632 
2633 
2634 const char *
rd_kafka_topic_partition_topic(const rd_kafka_topic_partition_t * rktpar)2635 rd_kafka_topic_partition_topic (const rd_kafka_topic_partition_t *rktpar) {
2636         const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2637         return rktp->rktp_rkt->rkt_topic->str;
2638 }
2639 
2640 int32_t
rd_kafka_topic_partition_partition(const rd_kafka_topic_partition_t * rktpar)2641 rd_kafka_topic_partition_partition (const rd_kafka_topic_partition_t *rktpar) {
2642         const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2643         return rktp->rktp_partition;
2644 }
2645 
rd_kafka_topic_partition_get(const rd_kafka_topic_partition_t * rktpar,const char ** name,int32_t * partition)2646 void rd_kafka_topic_partition_get (const rd_kafka_topic_partition_t *rktpar,
2647                                    const char **name, int32_t *partition) {
2648         const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2649         *name = rktp->rktp_rkt->rkt_topic->str;
2650         *partition = rktp->rktp_partition;
2651 }
2652 
2653 
2654 
2655 
/**
 *
 * rd_kafka_topic_partition_t lists
 * Growable list of partitions for propagation to application.
 *
 */
2662 
2663 
2664 static void
rd_kafka_topic_partition_list_grow(rd_kafka_topic_partition_list_t * rktparlist,int add_size)2665 rd_kafka_topic_partition_list_grow (rd_kafka_topic_partition_list_t *rktparlist,
2666                                     int add_size) {
2667         if (add_size < rktparlist->size)
2668                 add_size = RD_MAX(rktparlist->size, 32);
2669 
2670         rktparlist->size += add_size;
2671         rktparlist->elems = rd_realloc(rktparlist->elems,
2672                                        sizeof(*rktparlist->elems) *
2673                                        rktparlist->size);
2674 
2675 }
2676 
2677 
2678 /**
2679  * @brief Initialize a list for fitting \p size partitions.
2680  */
rd_kafka_topic_partition_list_init(rd_kafka_topic_partition_list_t * rktparlist,int size)2681 void rd_kafka_topic_partition_list_init (
2682         rd_kafka_topic_partition_list_t *rktparlist, int size) {
2683         memset(rktparlist, 0, sizeof(*rktparlist));
2684 
2685         if (size > 0)
2686                 rd_kafka_topic_partition_list_grow(rktparlist, size);
2687 }
2688 
2689 
2690 /**
2691  * Create a list for fitting 'size' topic_partitions (rktp).
2692  */
rd_kafka_topic_partition_list_new(int size)2693 rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size) {
2694         rd_kafka_topic_partition_list_t *rktparlist;
2695 
2696         rktparlist = rd_calloc(1, sizeof(*rktparlist));
2697 
2698         if (size > 0)
2699                 rd_kafka_topic_partition_list_grow(rktparlist, size);
2700 
2701         return rktparlist;
2702 }
2703 
2704 
2705 
rd_kafka_topic_partition_new(const char * topic,int32_t partition)2706 rd_kafka_topic_partition_t *rd_kafka_topic_partition_new (const char *topic,
2707 							  int32_t partition) {
2708 	rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));
2709 
2710 	rktpar->topic = rd_strdup(topic);
2711 	rktpar->partition = partition;
2712 
2713 	return rktpar;
2714 }
2715 
2716 
2717 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_copy(const rd_kafka_topic_partition_t * src)2718 rd_kafka_topic_partition_copy (const rd_kafka_topic_partition_t *src) {
2719         return rd_kafka_topic_partition_new(src->topic, src->partition);
2720 }
2721 
2722 
2723 /** Same as above but with generic void* signature */
rd_kafka_topic_partition_copy_void(const void * src)2724 void *rd_kafka_topic_partition_copy_void (const void *src) {
2725         return rd_kafka_topic_partition_copy(src);
2726 }
2727 
2728 
2729 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_new_from_rktp(rd_kafka_toppar_t * rktp)2730 rd_kafka_topic_partition_new_from_rktp (rd_kafka_toppar_t *rktp) {
2731 	rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));
2732 
2733 	rktpar->topic = RD_KAFKAP_STR_DUP(rktp->rktp_rkt->rkt_topic);
2734 	rktpar->partition = rktp->rktp_partition;
2735 
2736 	return rktpar;
2737 }
2738 
2739 
2740 
2741 static void
rd_kafka_topic_partition_destroy0(rd_kafka_topic_partition_t * rktpar,int do_free)2742 rd_kafka_topic_partition_destroy0 (rd_kafka_topic_partition_t *rktpar, int do_free) {
2743 	if (rktpar->topic)
2744 		rd_free(rktpar->topic);
2745 	if (rktpar->metadata)
2746 		rd_free(rktpar->metadata);
2747 	if (rktpar->_private)
2748 		rd_kafka_toppar_destroy((rd_kafka_toppar_t *)rktpar->_private);
2749 
2750 	if (do_free)
2751 		rd_free(rktpar);
2752 }
2753 
2754 
2755 /**
2756  * @brief Destroy all partitions in list.
2757  *
2758  * @remark The allocated size of the list will not shrink.
2759  */
rd_kafka_topic_partition_list_clear(rd_kafka_topic_partition_list_t * rktparlist)2760 void rd_kafka_topic_partition_list_clear (
2761         rd_kafka_topic_partition_list_t *rktparlist) {
2762         int i;
2763 
2764         for (i = 0 ; i < rktparlist->cnt ; i++)
2765                 rd_kafka_topic_partition_destroy0(&rktparlist->elems[i], 0);
2766 
2767         rktparlist->cnt = 0;
2768 }
2769 
2770 
rd_kafka_topic_partition_destroy_free(void * ptr)2771 void rd_kafka_topic_partition_destroy_free (void *ptr) {
2772         rd_kafka_topic_partition_destroy0(ptr, rd_true/*do_free*/);
2773 }
2774 
/**
 * @brief Destroy \p rktpar's contents and free the object itself.
 */
void rd_kafka_topic_partition_destroy (rd_kafka_topic_partition_t *rktpar) {
        const int do_free = 1;

        rd_kafka_topic_partition_destroy0(rktpar, do_free);
}
2778 
2779 
2780 /**
2781  * Destroys a list previously created with .._list_new() and drops
2782  * any references to contained toppars.
2783  */
2784 void
rd_kafka_topic_partition_list_destroy(rd_kafka_topic_partition_list_t * rktparlist)2785 rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist) {
2786         int i;
2787 
2788         for (i = 0 ; i < rktparlist->cnt ; i++)
2789 		rd_kafka_topic_partition_destroy0(&rktparlist->elems[i], 0);
2790 
2791         if (rktparlist->elems)
2792                 rd_free(rktparlist->elems);
2793 
2794         rd_free(rktparlist);
2795 }
2796 
2797 
2798 /**
2799  * @brief Wrapper for rd_kafka_topic_partition_list_destroy() that
2800  *        matches the standard free(void *) signature, for callback use.
2801  */
rd_kafka_topic_partition_list_destroy_free(void * ptr)2802 void rd_kafka_topic_partition_list_destroy_free (void *ptr) {
2803         rd_kafka_topic_partition_list_destroy(
2804                 (rd_kafka_topic_partition_list_t *)ptr);
2805 }
2806 
2807 
2808 /**
2809  * Add a partition to an rktpar list.
2810  * The list must have enough room to fit it.
2811  *
2812  * '_private' must be NULL or a valid 'rd_kafka_toppar_t *'.
2813  *
2814  * Returns a pointer to the added element.
2815  */
2816 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add0(const char * func,int line,rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition,rd_kafka_toppar_t * _private)2817 rd_kafka_topic_partition_list_add0 (const char *func, int line,
2818                                     rd_kafka_topic_partition_list_t *rktparlist,
2819                                     const char *topic, int32_t partition,
2820 				    rd_kafka_toppar_t *_private) {
2821         rd_kafka_topic_partition_t *rktpar;
2822         if (rktparlist->cnt == rktparlist->size)
2823                 rd_kafka_topic_partition_list_grow(rktparlist, 1);
2824         rd_kafka_assert(NULL, rktparlist->cnt < rktparlist->size);
2825 
2826         rktpar = &rktparlist->elems[rktparlist->cnt++];
2827         memset(rktpar, 0, sizeof(*rktpar));
2828         rktpar->topic = rd_strdup(topic);
2829         rktpar->partition = partition;
2830 	rktpar->offset = RD_KAFKA_OFFSET_INVALID;
2831         rktpar->_private = _private;
2832         if (_private)
2833                 rd_kafka_toppar_keep_fl(func, line, _private);
2834 
2835         return rktpar;
2836 }
2837 
2838 
2839 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)2840 rd_kafka_topic_partition_list_add (rd_kafka_topic_partition_list_t *rktparlist,
2841                                    const char *topic, int32_t partition) {
2842         return rd_kafka_topic_partition_list_add0(__FUNCTION__,__LINE__,
2843                                                   rktparlist,
2844                                                   topic, partition, NULL);
2845 }
2846 
2847 
2848 /**
2849  * Adds a consecutive list of partitions to a list
2850  */
2851 void
rd_kafka_topic_partition_list_add_range(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t start,int32_t stop)2852 rd_kafka_topic_partition_list_add_range (rd_kafka_topic_partition_list_t
2853                                          *rktparlist,
2854                                          const char *topic,
2855                                          int32_t start, int32_t stop) {
2856 
2857         for (; start <= stop ; start++)
2858                 rd_kafka_topic_partition_list_add(rktparlist, topic, start);
2859 }
2860 
2861 
2862 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_upsert(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)2863 rd_kafka_topic_partition_list_upsert (
2864         rd_kafka_topic_partition_list_t *rktparlist,
2865         const char *topic, int32_t partition) {
2866         rd_kafka_topic_partition_t *rktpar;
2867 
2868         if ((rktpar = rd_kafka_topic_partition_list_find(rktparlist,
2869                                                          topic, partition)))
2870                 return rktpar;
2871 
2872         return rd_kafka_topic_partition_list_add(rktparlist, topic, partition);
2873 }
2874 
2875 
2876 /**
2877  * @brief Update \p dst with info from \p src.
2878  */
rd_kafka_topic_partition_update(rd_kafka_topic_partition_t * dst,const rd_kafka_topic_partition_t * src)2879 void rd_kafka_topic_partition_update (rd_kafka_topic_partition_t *dst,
2880                                       const rd_kafka_topic_partition_t *src) {
2881         rd_dassert(!strcmp(dst->topic, src->topic));
2882         rd_dassert(dst->partition == src->partition);
2883         rd_dassert(dst != src);
2884 
2885         dst->offset = src->offset;
2886         dst->opaque = src->opaque;
2887         dst->err = src->err;
2888 
2889         if (src->metadata_size > 0) {
2890                 dst->metadata = rd_malloc(src->metadata_size);
2891                 dst->metadata_size = src->metadata_size;;
2892                 memcpy(dst->metadata, src->metadata, dst->metadata_size);
2893         }
2894 }
2895 
2896 /**
2897  * @brief Creates a copy of \p rktpar and adds it to \p rktparlist
2898  */
rd_kafka_topic_partition_list_add_copy(rd_kafka_topic_partition_list_t * rktparlist,const rd_kafka_topic_partition_t * rktpar)2899 void rd_kafka_topic_partition_list_add_copy (
2900         rd_kafka_topic_partition_list_t *rktparlist,
2901         const rd_kafka_topic_partition_t *rktpar) {
2902         rd_kafka_topic_partition_t *dst;
2903 
2904         dst = rd_kafka_topic_partition_list_add0(
2905                 __FUNCTION__,__LINE__,
2906                 rktparlist,
2907                 rktpar->topic,
2908                 rktpar->partition,
2909                 rktpar->_private);
2910 
2911         rd_kafka_topic_partition_update(dst, rktpar);
2912 }
2913 
2914 
2915 
2916 /**
2917  * Create and return a copy of list 'src'
2918  */
2919 rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_copy(const rd_kafka_topic_partition_list_t * src)2920 rd_kafka_topic_partition_list_copy (const rd_kafka_topic_partition_list_t *src){
2921         rd_kafka_topic_partition_list_t *dst;
2922         int i;
2923 
2924         dst = rd_kafka_topic_partition_list_new(src->size);
2925 
2926         for (i = 0 ; i < src->cnt ; i++)
2927                 rd_kafka_topic_partition_list_add_copy(dst, &src->elems[i]);
2928         return dst;
2929 }
2930 
2931 /**
2932  * @brief Same as rd_kafka_topic_partition_list_copy() but suitable for
2933  *        rd_list_copy(). The \p opaque is ignored.
2934  */
2935 void *
rd_kafka_topic_partition_list_copy_opaque(const void * src,void * opaque)2936 rd_kafka_topic_partition_list_copy_opaque (const void *src, void *opaque) {
2937         return rd_kafka_topic_partition_list_copy(src);
2938 }
2939 
2940 /**
2941  * @brief Append copies of all elements in \p src to \p dst.
2942  *        No duplicate-checks are performed.
2943  */
rd_kafka_topic_partition_list_add_list(rd_kafka_topic_partition_list_t * dst,const rd_kafka_topic_partition_list_t * src)2944 void rd_kafka_topic_partition_list_add_list (
2945         rd_kafka_topic_partition_list_t *dst,
2946         const rd_kafka_topic_partition_list_t *src) {
2947         int i;
2948 
2949         if (src->cnt == 0)
2950                 return;
2951 
2952         if (dst->size < dst->cnt + src->cnt)
2953                 rd_kafka_topic_partition_list_grow(dst, src->cnt);
2954 
2955         for (i = 0 ; i < src->cnt ; i++)
2956                 rd_kafka_topic_partition_list_add_copy(dst, &src->elems[i]);
2957 }
2958 
2959 
2960 /**
2961  * @brief Compare two partition lists using partition comparator \p cmp.
2962  *
2963  * @warning This is an O(Na*Nb) operation.
2964  */
2965 int
rd_kafka_topic_partition_list_cmp(const void * _a,const void * _b,int (* cmp)(const void *,const void *))2966 rd_kafka_topic_partition_list_cmp (const void *_a, const void *_b,
2967                                    int (*cmp) (const void *, const void *)) {
2968         const rd_kafka_topic_partition_list_t *a = _a, *b = _b;
2969         int r;
2970         int i;
2971 
2972         r = a->cnt - b->cnt;
2973         if (r || a->cnt == 0)
2974                 return r;
2975 
2976         /* Since the lists may not be sorted we need to scan all of B
2977          * for each element in A.
2978          * FIXME: If the list sizes are larger than X we could create a
2979          *        temporary hash map instead. */
2980         for (i = 0 ; i < a->cnt ; i++) {
2981                 int j;
2982 
2983                 for (j = 0 ; j < b->cnt ; j++) {
2984                         r = cmp(&a->elems[i], &b->elems[j]);
2985                         if (!r)
2986                                 break;
2987                 }
2988 
2989                 if (j == b->cnt)
2990                         return 1;
2991         }
2992 
2993         return 0;
2994 }
2995 
2996 
2997 /**
2998  * @brief Ensures the \p rktpar has a toppar set in _private.
2999  *
3000  * @returns the toppar object (or possibly NULL if \p create_on_miss is true)
3001  *          WITHOUT refcnt increased.
3002  */
3003 rd_kafka_toppar_t *
rd_kafka_topic_partition_ensure_toppar(rd_kafka_t * rk,rd_kafka_topic_partition_t * rktpar,rd_bool_t create_on_miss)3004 rd_kafka_topic_partition_ensure_toppar (rd_kafka_t *rk,
3005                                         rd_kafka_topic_partition_t *rktpar,
3006                                         rd_bool_t create_on_miss) {
3007         if (!rktpar->_private)
3008                 rktpar->_private =
3009                         rd_kafka_toppar_get2(rk,
3010                                              rktpar->topic,
3011                                              rktpar->partition, 0,
3012                                              create_on_miss);
3013         return rktpar->_private;
3014 }
3015 
3016 
3017 /**
3018  * @returns (and sets if necessary) the \p rktpar's _private / toppar.
3019  * @remark a new reference is returned.
3020  */
3021 rd_kafka_toppar_t *
rd_kafka_topic_partition_get_toppar(rd_kafka_t * rk,rd_kafka_topic_partition_t * rktpar,rd_bool_t create_on_miss)3022 rd_kafka_topic_partition_get_toppar (rd_kafka_t *rk,
3023                                      rd_kafka_topic_partition_t *rktpar,
3024                                      rd_bool_t create_on_miss) {
3025         rd_kafka_toppar_t *rktp;
3026 
3027         rktp = rd_kafka_topic_partition_ensure_toppar(rk, rktpar,
3028                                                       create_on_miss);
3029 
3030         if (rktp)
3031                 rd_kafka_toppar_keep(rktp);
3032 
3033         return rktp;
3034 }
3035 
3036 
rd_kafka_topic_partition_cmp(const void * _a,const void * _b)3037 int rd_kafka_topic_partition_cmp (const void *_a, const void *_b) {
3038         const rd_kafka_topic_partition_t *a = _a;
3039         const rd_kafka_topic_partition_t *b = _b;
3040         int r = strcmp(a->topic, b->topic);
3041         if (r)
3042                 return r;
3043         else
3044                 return RD_CMP(a->partition, b->partition);
3045 }
3046 
3047 /** @brief Compare only the topic */
rd_kafka_topic_partition_cmp_topic(const void * _a,const void * _b)3048 int rd_kafka_topic_partition_cmp_topic (const void *_a, const void *_b) {
3049         const rd_kafka_topic_partition_t *a = _a;
3050         const rd_kafka_topic_partition_t *b = _b;
3051         return strcmp(a->topic, b->topic);
3052 }
3053 
/**
 * @brief Same as rd_kafka_topic_partition_cmp() but with an additional
 *        \p opaque argument, which is ignored.
 */
static int rd_kafka_topic_partition_cmp_opaque (const void *_a, const void *_b,
                                                void *opaque) {
        const int diff = rd_kafka_topic_partition_cmp(_a, _b);

        return diff;
}
3058 
3059 /** @returns a hash of the topic and partition */
rd_kafka_topic_partition_hash(const void * _a)3060 unsigned int rd_kafka_topic_partition_hash (const void *_a) {
3061         const rd_kafka_topic_partition_t *a = _a;
3062         int r = 31 * 17 + a->partition;
3063         return 31 * r + rd_string_hash(a->topic, -1);
3064 }
3065 
3066 
3067 
3068 /**
3069  * @brief Search 'rktparlist' for 'topic' and 'partition'.
3070  * @returns the elems[] index or -1 on miss.
3071  */
3072 static int
rd_kafka_topic_partition_list_find0(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition,int (* cmp)(const void *,const void *))3073 rd_kafka_topic_partition_list_find0 (
3074         const rd_kafka_topic_partition_list_t *rktparlist,
3075         const char *topic, int32_t partition,
3076         int (*cmp) (const void *, const void *)) {
3077         rd_kafka_topic_partition_t skel;
3078         int i;
3079 
3080         skel.topic = (char *)topic;
3081         skel.partition = partition;
3082 
3083         for (i = 0 ; i < rktparlist->cnt ; i++) {
3084                 if (!cmp(&skel, &rktparlist->elems[i]))
3085                         return i;
3086         }
3087 
3088         return -1;
3089 }
3090 
3091 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_find(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)3092 rd_kafka_topic_partition_list_find (
3093         const rd_kafka_topic_partition_list_t *rktparlist,
3094         const char *topic, int32_t partition) {
3095         int i = rd_kafka_topic_partition_list_find0(
3096                 rktparlist, topic, partition, rd_kafka_topic_partition_cmp);
3097         if (i == -1)
3098                 return NULL;
3099         else
3100                 return &rktparlist->elems[i];
3101 }
3102 
3103 
3104 int
rd_kafka_topic_partition_list_find_idx(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)3105 rd_kafka_topic_partition_list_find_idx (
3106         const rd_kafka_topic_partition_list_t *rktparlist,
3107         const char *topic, int32_t partition) {
3108         return rd_kafka_topic_partition_list_find0(
3109                 rktparlist, topic, partition, rd_kafka_topic_partition_cmp);
3110 }
3111 
3112 
3113 /**
3114  * @returns the first element that matches \p topic, regardless of partition.
3115  */
3116 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_find_topic(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic)3117 rd_kafka_topic_partition_list_find_topic (
3118         const rd_kafka_topic_partition_list_t *rktparlist, const char *topic) {
3119         int i = rd_kafka_topic_partition_list_find0(
3120                 rktparlist, topic, RD_KAFKA_PARTITION_UA,
3121                 rd_kafka_topic_partition_cmp_topic);
3122         if (i == -1)
3123                 return NULL;
3124         else
3125                 return &rktparlist->elems[i];
3126 }
3127 
3128 
3129 int
rd_kafka_topic_partition_list_del_by_idx(rd_kafka_topic_partition_list_t * rktparlist,int idx)3130 rd_kafka_topic_partition_list_del_by_idx (rd_kafka_topic_partition_list_t *rktparlist,
3131 					  int idx) {
3132 	if (unlikely(idx < 0 || idx >= rktparlist->cnt))
3133 		return 0;
3134 
3135 	rd_kafka_topic_partition_destroy0(&rktparlist->elems[idx], 0);
3136 	memmove(&rktparlist->elems[idx], &rktparlist->elems[idx+1],
3137 		(rktparlist->cnt - idx - 1) * sizeof(rktparlist->elems[idx]));
3138 	rktparlist->cnt--;
3139 
3140 	return 1;
3141 }
3142 
3143 
3144 int
rd_kafka_topic_partition_list_del(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)3145 rd_kafka_topic_partition_list_del (rd_kafka_topic_partition_list_t *rktparlist,
3146 				   const char *topic, int32_t partition) {
3147         int i = rd_kafka_topic_partition_list_find0(
3148                 rktparlist, topic, partition, rd_kafka_topic_partition_cmp);
3149 	if (i == -1)
3150 		return 0;
3151 
3152 	return rd_kafka_topic_partition_list_del_by_idx(rktparlist, i);
3153 }
3154 
3155 
3156 
3157 /**
3158  * Returns true if 'topic' matches the 'rktpar', else false.
3159  * On match, if rktpar is a regex pattern then 'matched_by_regex' is set to 1.
3160  */
rd_kafka_topic_partition_match(rd_kafka_t * rk,const rd_kafka_group_member_t * rkgm,const rd_kafka_topic_partition_t * rktpar,const char * topic,int * matched_by_regex)3161 int rd_kafka_topic_partition_match (rd_kafka_t *rk,
3162 				    const rd_kafka_group_member_t *rkgm,
3163 				    const rd_kafka_topic_partition_t *rktpar,
3164 				    const char *topic, int *matched_by_regex) {
3165 	int ret = 0;
3166 
3167 	if (*rktpar->topic == '^') {
3168 		char errstr[128];
3169 
3170 		ret = rd_regex_match(rktpar->topic, topic,
3171 				     errstr, sizeof(errstr));
3172 		if (ret == -1) {
3173 			rd_kafka_dbg(rk, CGRP,
3174 				     "SUBMATCH",
3175 				     "Invalid regex for member "
3176 				     "\"%.*s\" subscription \"%s\": %s",
3177 				     RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
3178 				     rktpar->topic, errstr);
3179 			return 0;
3180 		}
3181 
3182 		if (ret && matched_by_regex)
3183 			*matched_by_regex = 1;
3184 
3185 	} else if (!strcmp(rktpar->topic, topic)) {
3186 
3187 		if (matched_by_regex)
3188 			*matched_by_regex = 0;
3189 
3190 		ret = 1;
3191 	}
3192 
3193 	return ret;
3194 }
3195 
3196 
3197 
/**
 * @brief Sort the elements of \p rktparlist in place using rd_qsort_r().
 *
 * @param cmp Comparator, or NULL to use
 *            rd_kafka_topic_partition_cmp_opaque().
 * @param opaque Passed through as-is to the comparator.
 */
void rd_kafka_topic_partition_list_sort (
        rd_kafka_topic_partition_list_t *rktparlist,
        int (*cmp) (const void *, const void *, void *),
        void *opaque) {
        int (*cmp_fn) (const void *, const void *, void *) =
                cmp ? cmp : rd_kafka_topic_partition_cmp_opaque;

        rd_qsort_r(rktparlist->elems, rktparlist->cnt,
                   sizeof(*rktparlist->elems),
                   cmp_fn, opaque);
}
3210 
3211 
/**
 * @brief Sort \p rktparlist in place using
 *        rd_kafka_topic_partition_cmp_opaque() as comparator,
 *        with no opaque.
 */
void rd_kafka_topic_partition_list_sort_by_topic (
        rd_kafka_topic_partition_list_t *rktparlist) {
        void *no_opaque = NULL;

        rd_kafka_topic_partition_list_sort(
                rktparlist, rd_kafka_topic_partition_cmp_opaque, no_opaque);
}
3218 
rd_kafka_topic_partition_list_set_offset(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition,int64_t offset)3219 rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset (
3220 	rd_kafka_topic_partition_list_t *rktparlist,
3221 	const char *topic, int32_t partition, int64_t offset) {
3222 	rd_kafka_topic_partition_t *rktpar;
3223 
3224 	if (!(rktpar = rd_kafka_topic_partition_list_find(rktparlist,
3225 							  topic, partition)))
3226 		return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3227 
3228 	rktpar->offset = offset;
3229 
3230 	return RD_KAFKA_RESP_ERR_NO_ERROR;
3231 }
3232 
3233 
3234 /**
3235  * @brief Reset all offsets to the provided value.
3236  */
3237 void
rd_kafka_topic_partition_list_reset_offsets(rd_kafka_topic_partition_list_t * rktparlist,int64_t offset)3238 rd_kafka_topic_partition_list_reset_offsets (rd_kafka_topic_partition_list_t *rktparlist,
3239 					     int64_t offset) {
3240 
3241         int i;
3242         for (i = 0 ; i < rktparlist->cnt ; i++)
3243 		rktparlist->elems[i].offset = offset;
3244 }
3245 
3246 
3247 /**
3248  * Set offset values in partition list based on toppar's last stored offset.
3249  *
3250  *  from_rktp - true: set rktp's last stored offset, false: set def_value
3251  *  unless a concrete offset is set.
3252  *  is_commit: indicates that set offset is to be committed (for debug log)
3253  *
3254  * Returns the number of valid non-logical offsets (>=0).
3255  */
rd_kafka_topic_partition_list_set_offsets(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,int from_rktp,int64_t def_value,int is_commit)3256 int rd_kafka_topic_partition_list_set_offsets (
3257 	rd_kafka_t *rk,
3258         rd_kafka_topic_partition_list_t *rktparlist,
3259         int from_rktp, int64_t def_value, int is_commit) {
3260         int i;
3261 	int valid_cnt = 0;
3262 
3263         for (i = 0 ; i < rktparlist->cnt ; i++) {
3264                 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3265 		const char *verb = "setting";
3266                 char preamble[80];
3267 
3268                 *preamble = '\0'; /* Avoid warning */
3269 
3270                 if (from_rktp) {
3271                         rd_kafka_toppar_t *rktp = rktpar->_private;
3272                         rd_kafka_toppar_lock(rktp);
3273 
3274                         if (rk->rk_conf.debug & (RD_KAFKA_DBG_CGRP |
3275                                                  RD_KAFKA_DBG_TOPIC))
3276                                 rd_snprintf(preamble, sizeof(preamble),
3277                                             "stored offset %"PRId64
3278                                             ", committed offset %"PRId64": ",
3279                                             rktp->rktp_stored_offset,
3280                                             rktp->rktp_committed_offset);
3281 
3282 			if (rktp->rktp_stored_offset >
3283 			    rktp->rktp_committed_offset) {
3284 				verb = "setting stored";
3285 				rktpar->offset = rktp->rktp_stored_offset;
3286 			} else {
3287 				rktpar->offset = RD_KAFKA_OFFSET_INVALID;
3288 			}
3289                         rd_kafka_toppar_unlock(rktp);
3290                 } else {
3291 			if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset)) {
3292 				verb = "setting default";
3293 				rktpar->offset = def_value;
3294 			} else
3295 				verb = "keeping";
3296                 }
3297 
3298                 if (is_commit && rktpar->offset == RD_KAFKA_OFFSET_INVALID)
3299                         rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
3300                                      "Topic %s [%"PRId32"]: "
3301                                      "%snot including in commit",
3302                                      rktpar->topic, rktpar->partition,
3303                                      preamble);
3304                 else
3305                         rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
3306                                      "Topic %s [%"PRId32"]: "
3307                                      "%s%s offset %s%s",
3308                                      rktpar->topic, rktpar->partition,
3309                                      preamble,
3310                                      verb,
3311                                      rd_kafka_offset2str(rktpar->offset),
3312                                      is_commit ? " for commit" : "");
3313 
3314 		if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset))
3315 			valid_cnt++;
3316         }
3317 
3318 	return valid_cnt;
3319 }
3320 
3321 
3322 /**
3323  * @returns the number of partitions with absolute (non-logical) offsets set.
3324  */
rd_kafka_topic_partition_list_count_abs_offsets(const rd_kafka_topic_partition_list_t * rktparlist)3325 int rd_kafka_topic_partition_list_count_abs_offsets (
3326 	const rd_kafka_topic_partition_list_t *rktparlist) {
3327 	int i;
3328 	int valid_cnt = 0;
3329 
3330         for (i = 0 ; i < rktparlist->cnt ; i++)
3331 		if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktparlist->elems[i].offset))
3332 			valid_cnt++;
3333 
3334 	return valid_cnt;
3335 }
3336 
3337 
3338 /**
3339  * @brief Update _private (toppar) field to point to valid rktp
3340  *        for each parition.
3341  *
3342  * @param create_on_miss Create partition (and topic_t object) if necessary.
3343  */
3344 void
rd_kafka_topic_partition_list_update_toppars(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_bool_t create_on_miss)3345 rd_kafka_topic_partition_list_update_toppars (rd_kafka_t *rk,
3346                                               rd_kafka_topic_partition_list_t
3347                                               *rktparlist,
3348                                               rd_bool_t create_on_miss) {
3349         int i;
3350         for (i = 0 ; i < rktparlist->cnt ; i++) {
3351                 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3352 
3353                 if (!rktpar->_private)
3354                         rktpar->_private =
3355                                 rd_kafka_toppar_get2(rk,
3356                                                      rktpar->topic,
3357                                                      rktpar->partition,
3358                                                      0/*not ua-on-miss*/,
3359                                                      create_on_miss);
3360 
3361         }
3362 }
3363 
3364 
3365 /**
3366  * @brief Populate \p leaders with the leaders+partitions for the partitions in
3367  *        \p rktparlist. Duplicates are suppressed.
3368  *
3369  *        If no leader is found for a partition that element's \c .err will
3370  *        be set to RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE.
3371  *
3372  *        If the partition does not exist \c .err will be set to
3373  *        RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION.
3374  *
3375  * @param rktparlist The partitions to look up leaders for, the .err field
3376  *                   will be set according to outcome, e.g., ERR_NO_ERROR,
3377  *                   ERR_UNKNOWN_TOPIC_OR_PART, etc.
3378  * @param leaders rd_list_t of allocated (struct rd_kafka_partition_leader *)
3379  * @param query_topics (optional) rd_list of strdupped (char *)
3380  * @param query_unknown Add unknown topics to \p query_topics.
3381  * @param eonce (optional) For triggering asynchronously on cache change
3382  *              in case not all leaders are known now.
3383  *
3384  * @remark This is based on the current topic_t and partition state
3385  *         which may lag behind the last metadata update due to internal
3386  *         threading and also the fact that no topic_t may have been created.
3387  *
3388  * @param leaders rd_list_t of type (struct rd_kafka_partition_leader *)
3389  *
3390  * @returns true if all partitions have leaders, else false.
3391  *
3392  * @sa rd_kafka_topic_partition_list_get_leaders_by_metadata
3393  *
3394  * @locks rd_kafka_*lock() MUST NOT be held
3395  */
3396 static rd_bool_t
rd_kafka_topic_partition_list_get_leaders(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * leaders,rd_list_t * query_topics,rd_bool_t query_unknown,rd_kafka_enq_once_t * eonce)3397 rd_kafka_topic_partition_list_get_leaders (
3398         rd_kafka_t *rk,
3399         rd_kafka_topic_partition_list_t *rktparlist,
3400         rd_list_t *leaders,
3401         rd_list_t *query_topics,
3402         rd_bool_t query_unknown,
3403         rd_kafka_enq_once_t *eonce) {
3404         rd_bool_t complete;
3405         int cnt = 0;
3406         int i;
3407 
3408         if (eonce)
3409                 rd_kafka_wrlock(rk);
3410         else
3411                 rd_kafka_rdlock(rk);
3412 
3413         for (i = 0 ; i < rktparlist->cnt ; i++) {
3414                 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3415                 rd_kafka_topic_partition_t *rktpar2;
3416                 rd_kafka_broker_t *rkb = NULL;
3417                 struct rd_kafka_partition_leader leader_skel;
3418                 struct rd_kafka_partition_leader *leader;
3419                 const rd_kafka_metadata_topic_t *mtopic;
3420                 const rd_kafka_metadata_partition_t *mpart;
3421                 rd_bool_t topic_wait_cache;
3422 
3423                 rd_kafka_metadata_cache_topic_partition_get(
3424                         rk, &mtopic, &mpart,
3425                         rktpar->topic, rktpar->partition,
3426                         0/*negative entries too*/);
3427 
3428                 topic_wait_cache =
3429                         !mtopic ||
3430                         RD_KAFKA_METADATA_CACHE_ERR_IS_TEMPORARY(mtopic->err);
3431 
3432                 if (!topic_wait_cache &&
3433                     mtopic &&
3434                     mtopic->err != RD_KAFKA_RESP_ERR_NO_ERROR &&
3435                     mtopic->err != RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE) {
3436                         /* Topic permanently errored */
3437                         rktpar->err = mtopic->err;
3438                         continue;
3439                 }
3440 
3441                 if (mtopic && !mpart && mtopic->partition_cnt > 0) {
3442                         /* Topic exists but partition doesnt.
3443                          * This is a permanent error. */
3444                         rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3445                         continue;
3446                 }
3447 
3448                 if (mpart &&
3449                     (mpart->leader == -1 ||
3450                      !(rkb = rd_kafka_broker_find_by_nodeid0(
3451                                rk, mpart->leader, -1/*any state*/,
3452                                rd_false)))) {
3453                         /* Partition has no (valid) leader.
3454                          * This is a permanent error. */
3455                         rktpar->err =
3456                                 mtopic->err ? mtopic->err :
3457                                 RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE;
3458                         continue;
3459                 }
3460 
3461                 if (topic_wait_cache || !rkb) {
3462                         /* Topic unknown or no current leader for partition,
3463                          * add topic to query list. */
3464                         rktpar->err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
3465                         if (query_topics &&
3466                             !rd_list_find(query_topics, rktpar->topic,
3467                                           (void *)strcmp))
3468                                 rd_list_add(query_topics,
3469                                             rd_strdup(rktpar->topic));
3470                         continue;
3471                 }
3472 
3473                 /* Leader exists, add to leader list. */
3474 
3475                 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
3476 
3477                 memset(&leader_skel, 0, sizeof(leader_skel));
3478                 leader_skel.rkb = rkb;
3479 
3480                 leader = rd_list_find(leaders, &leader_skel,
3481                                       rd_kafka_partition_leader_cmp);
3482 
3483                 if (!leader) {
3484                         leader = rd_kafka_partition_leader_new(rkb);
3485                         rd_list_add(leaders, leader);
3486                 }
3487 
3488                 rktpar2 = rd_kafka_topic_partition_list_find(leader->partitions,
3489                                                              rktpar->topic,
3490                                                              rktpar->partition);
3491                 if (rktpar2) {
3492                         /* Already exists in partitions list, just update. */
3493                         rd_kafka_topic_partition_update(rktpar2, rktpar);
3494                 } else {
3495                         /* Make a copy of rktpar and add to partitions list */
3496                         rd_kafka_topic_partition_list_add_copy(
3497                                 leader->partitions, rktpar);
3498                 }
3499 
3500                 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
3501 
3502                 rd_kafka_broker_destroy(rkb);    /* loose refcount */
3503                 cnt++;
3504         }
3505 
3506         complete = cnt == rktparlist->cnt;
3507 
3508         if (!complete && eonce)
3509                 /* Add eonce to cache observers */
3510                 rd_kafka_metadata_cache_wait_state_change_async(rk, eonce);
3511 
3512         if (eonce)
3513                 rd_kafka_wrunlock(rk);
3514         else
3515                 rd_kafka_rdunlock(rk);
3516 
3517         return complete;
3518 }
3519 
3520 
3521 /**
3522  * @brief Timer timeout callback for query_leaders_async rko's eonce object.
3523  */
3524 static void
rd_kafka_partition_leader_query_eonce_timeout_cb(rd_kafka_timers_t * rkts,void * arg)3525 rd_kafka_partition_leader_query_eonce_timeout_cb (rd_kafka_timers_t *rkts,
3526                                                   void *arg) {
3527         rd_kafka_enq_once_t *eonce = arg;
3528         rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__TIMED_OUT,
3529                                   "timeout timer");
3530 }
3531 
3532 
3533 /**
3534  * @brief Query timer callback for query_leaders_async rko's eonce object.
3535  */
3536 static void
rd_kafka_partition_leader_query_eonce_timer_cb(rd_kafka_timers_t * rkts,void * arg)3537 rd_kafka_partition_leader_query_eonce_timer_cb (rd_kafka_timers_t *rkts,
3538                                                 void *arg) {
3539         rd_kafka_enq_once_t *eonce = arg;
3540         rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR_NO_ERROR,
3541                                   "query timer");
3542 }
3543 
3544 
3545 /**
3546  * @brief Query metadata cache for partition leaders, or trigger metadata
3547  *        refresh if leaders not known.
3548  *
3549  * @locks_required none
3550  * @locality any
3551  */
3552 static rd_kafka_op_res_t
rd_kafka_topic_partition_list_query_leaders_async_worker(rd_kafka_op_t * rko)3553 rd_kafka_topic_partition_list_query_leaders_async_worker (rd_kafka_op_t *rko) {
3554         rd_kafka_t *rk = rko->rko_rk;
3555         rd_list_t query_topics, *leaders = NULL;
3556         rd_kafka_op_t *reply;
3557 
3558         RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_LEADERS);
3559 
3560         if (rko->rko_err)
3561                 goto reply; /* Timeout or ERR__DESTROY */
3562 
3563         /* Since we're iterating over get_leaders() until all partition leaders
3564          * are known we need to re-enable the eonce to be triggered again (which
3565          * is not necessary the first time we get here, but there
3566          * is no harm doing it then either). */
3567         rd_kafka_enq_once_reenable(rko->rko_u.leaders.eonce,
3568                                    rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
3569 
3570         /* Look up the leaders in the metadata cache, if not all leaders
3571          * are known the eonce is registered for metadata cache changes
3572          * which will cause our function to be called
3573          * again on (any) metadata cache change.
3574          *
3575          * When we are called again we perform the cache lookup again and
3576          * hopefully get all leaders, otherwise defer a new async wait.
3577          * Repeat until success or timeout. */
3578 
3579         rd_list_init(&query_topics, 4 + rko->rko_u.leaders.partitions->cnt/2,
3580                      rd_free);
3581 
3582         leaders = rd_list_new(1 + rko->rko_u.leaders.partitions->cnt / 2,
3583                               rd_kafka_partition_leader_destroy_free);
3584 
3585         if (rd_kafka_topic_partition_list_get_leaders(
3586                     rk, rko->rko_u.leaders.partitions,
3587                     leaders,
3588                     &query_topics,
3589                     /* Add unknown topics to query_topics only on the
3590                      * first query, after that we consider them permanently
3591                      * non-existent */
3592                     rko->rko_u.leaders.query_cnt == 0,
3593                     rko->rko_u.leaders.eonce)) {
3594                 /* All leaders now known (or failed), reply to caller */
3595                 rd_list_destroy(&query_topics);
3596                 goto reply;
3597         }
3598 
3599         if (rd_list_empty(&query_topics)) {
3600                 /* Not all leaders known but no topics left to query,
3601                  * reply to caller. */
3602                 rd_list_destroy(&query_topics);
3603                 goto reply;
3604         }
3605 
3606         /* Need to refresh topic metadata, but at most every interval. */
3607         if (!rd_kafka_timer_is_started(&rk->rk_timers,
3608                                        &rko->rko_u.leaders.query_tmr)) {
3609 
3610                 rko->rko_u.leaders.query_cnt++;
3611 
3612                 /* Add query interval timer. */
3613                 rd_kafka_enq_once_add_source(rko->rko_u.leaders.eonce,
3614                                              "query timer");
3615                 rd_kafka_timer_start_oneshot(
3616                         &rk->rk_timers,
3617                         &rko->rko_u.leaders.query_tmr,
3618                         rd_true,
3619                         3*1000*1000 /* 3s */,
3620                         rd_kafka_partition_leader_query_eonce_timer_cb,
3621                         rko->rko_u.leaders.eonce);
3622 
3623                 /* Request metadata refresh */
3624                 rd_kafka_metadata_refresh_topics(
3625                         rk, NULL, &query_topics,
3626                         rd_true/*force*/,
3627                         rd_false/*!allow_auto_create*/,
3628                         rd_false/*!cgrp_update*/,
3629                         "query partition leaders");
3630 
3631         }
3632 
3633         rd_list_destroy(leaders);
3634         rd_list_destroy(&query_topics);
3635 
3636         /* Wait for next eonce trigger */
3637         return RD_KAFKA_OP_RES_KEEP; /* rko is still used */
3638 
3639  reply:
3640         /* Decommission worker state and reply to caller */
3641 
3642         if (rd_kafka_timer_stop(&rk->rk_timers,
3643                                 &rko->rko_u.leaders.query_tmr,
3644                                 RD_DO_LOCK))
3645                 rd_kafka_enq_once_del_source(rko->rko_u.leaders.eonce,
3646                                              "query timer");
3647         if (rd_kafka_timer_stop(&rk->rk_timers,
3648                                 &rko->rko_u.leaders.timeout_tmr,
3649                                 RD_DO_LOCK))
3650                 rd_kafka_enq_once_del_source(rko->rko_u.leaders.eonce,
3651                                              "timeout timer");
3652 
3653         if (rko->rko_u.leaders.eonce) {
3654                 rd_kafka_enq_once_disable(rko->rko_u.leaders.eonce);
3655                 rko->rko_u.leaders.eonce = NULL;
3656         }
3657 
3658         /* No leaders found, set a request-level error */
3659         if (leaders && rd_list_cnt(leaders) == 0) {
3660                 if (!rko->rko_err)
3661                         rko->rko_err = RD_KAFKA_RESP_ERR__NOENT;
3662                 rd_list_destroy(leaders);
3663                 leaders = NULL;
3664         }
3665 
3666         /* Create and enqueue reply rko */
3667         if (rko->rko_u.leaders.replyq.q) {
3668                 reply = rd_kafka_op_new_cb(rk, RD_KAFKA_OP_LEADERS,
3669                                            rko->rko_u.leaders.cb);
3670                 rd_kafka_op_get_reply_version(reply, rko);
3671                 reply->rko_err = rko->rko_err;
3672                 reply->rko_u.leaders.partitions =
3673                         rko->rko_u.leaders.partitions; /* Transfer ownership for
3674                                                         * partition list that
3675                                                         * now contains
3676                                                         * per-partition errors*/
3677                 rko->rko_u.leaders.partitions = NULL;
3678                 reply->rko_u.leaders.leaders = leaders; /* Possibly NULL */
3679                 reply->rko_u.leaders.opaque = rko->rko_u.leaders.opaque;
3680 
3681                 rd_kafka_replyq_enq(&rko->rko_u.leaders.replyq, reply, 0);
3682         }
3683 
3684         return RD_KAFKA_OP_RES_HANDLED;
3685 }
3686 
3687 
3688 static rd_kafka_op_res_t
rd_kafka_topic_partition_list_query_leaders_async_worker_op_cb(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)3689 rd_kafka_topic_partition_list_query_leaders_async_worker_op_cb (
3690         rd_kafka_t *rk,
3691         rd_kafka_q_t *rkq,
3692         rd_kafka_op_t *rko) {
3693         return rd_kafka_topic_partition_list_query_leaders_async_worker(rko);
3694 }
3695 
3696 /**
3697  * @brief Async variant of rd_kafka_topic_partition_list_query_leaders().
3698  *
3699  * The reply rko op will contain:
3700  * - .leaders which is a list of leaders and their partitions, this may be
3701  *    NULL for overall errors (such as no leaders are found), or a
3702  *    partial or complete list of leaders.
3703  * - .partitions which is a copy of the input list of partitions with the
3704  *   .err field set to the outcome of the leader query, typically ERR_NO_ERROR
3705  *   or ERR_UNKNOWN_TOPIC_OR_PART.
3706  *
3707  * @locks_acquired rd_kafka_*lock()
3708  *
3709  * @remark rd_kafka_*lock() MUST NOT be held
3710  */
3711 void
rd_kafka_topic_partition_list_query_leaders_async(rd_kafka_t * rk,const rd_kafka_topic_partition_list_t * rktparlist,int timeout_ms,rd_kafka_replyq_t replyq,rd_kafka_op_cb_t * cb,void * opaque)3712 rd_kafka_topic_partition_list_query_leaders_async (
3713         rd_kafka_t *rk,
3714         const rd_kafka_topic_partition_list_t *rktparlist,
3715         int timeout_ms,
3716         rd_kafka_replyq_t replyq,
3717         rd_kafka_op_cb_t *cb,
3718         void *opaque) {
3719         rd_kafka_op_t *rko;
3720 
3721         rd_assert(rktparlist && rktparlist->cnt > 0);
3722         rd_assert(replyq.q);
3723 
3724         rko = rd_kafka_op_new_cb(
3725                 rk,
3726                 RD_KAFKA_OP_LEADERS,
3727                 rd_kafka_topic_partition_list_query_leaders_async_worker_op_cb);
3728         rko->rko_u.leaders.replyq = replyq;
3729         rko->rko_u.leaders.partitions =
3730                 rd_kafka_topic_partition_list_copy(rktparlist);
3731         rko->rko_u.leaders.ts_timeout = rd_timeout_init(timeout_ms);
3732         rko->rko_u.leaders.cb = cb;
3733         rko->rko_u.leaders.opaque = opaque;
3734 
3735         /* Create an eonce to be triggered either by metadata cache update
3736          * (from refresh_topics()), query interval, or timeout. */
3737         rko->rko_u.leaders.eonce = rd_kafka_enq_once_new(
3738                 rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
3739 
3740         rd_kafka_enq_once_add_source(rko->rko_u.leaders.eonce, "timeout timer");
3741         rd_kafka_timer_start_oneshot(
3742                 &rk->rk_timers,
3743                 &rko->rko_u.leaders.timeout_tmr,
3744                 rd_true,
3745                 rd_timeout_remains_us(rko->rko_u.leaders.ts_timeout),
3746                 rd_kafka_partition_leader_query_eonce_timeout_cb,
3747                 rko->rko_u.leaders.eonce);
3748 
3749         if (rd_kafka_topic_partition_list_query_leaders_async_worker(rko) ==
3750             RD_KAFKA_OP_RES_HANDLED)
3751                 rd_kafka_op_destroy(rko); /* Reply queue already disabled */
3752 }
3753 
3754 
3755 /**
3756  * @brief Get leaders for all partitions in \p rktparlist, querying metadata
3757  *        if needed.
3758  *
3759  * @param leaders is a pre-initialized (empty) list which will be populated
3760  *        with the leader brokers and their partitions
3761  *        (struct rd_kafka_partition_leader *)
3762  *
3763  * @remark Will not trigger topic auto creation (unless configured).
3764  *
3765  * @returns an error code on error.
3766  *
3767  * @locks rd_kafka_*lock() MUST NOT be held
3768  */
3769 rd_kafka_resp_err_t
rd_kafka_topic_partition_list_query_leaders(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * leaders,int timeout_ms)3770 rd_kafka_topic_partition_list_query_leaders (
3771         rd_kafka_t *rk,
3772         rd_kafka_topic_partition_list_t *rktparlist,
3773         rd_list_t *leaders, int timeout_ms) {
3774         rd_ts_t ts_end = rd_timeout_init(timeout_ms);
3775         rd_ts_t ts_query = 0;
3776         rd_ts_t now;
3777         int query_cnt = 0;
3778         int i = 0;
3779 
3780         /* Get all the partition leaders, try multiple times:
3781          * if there are no leaders after the first run fire off a leader
3782          * query and wait for broker state update before trying again,
3783          * keep trying and re-querying at increasing intervals until
3784          * success or timeout. */
3785         do {
3786                 rd_list_t query_topics;
3787                 int query_intvl;
3788 
3789                 rd_list_init(&query_topics, rktparlist->cnt, rd_free);
3790 
3791                 rd_kafka_topic_partition_list_get_leaders(
3792                         rk, rktparlist, leaders, &query_topics,
3793                         /* Add unknown topics to query_topics only on the
3794                          * first query, after that we consider them
3795                          * permanently non-existent */
3796                         query_cnt == 0,
3797                         NULL);
3798 
3799                 if (rd_list_empty(&query_topics)) {
3800                         /* No remaining topics to query: leader-list complete.*/
3801                         rd_list_destroy(&query_topics);
3802 
3803                         /* No leader(s) for partitions means all partitions
3804                          * are unknown. */
3805                         if (rd_list_empty(leaders))
3806                                 return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3807 
3808                         return RD_KAFKA_RESP_ERR_NO_ERROR;
3809                 }
3810 
3811                 now = rd_clock();
3812 
3813                 /*
3814                  * Missing leader for some partitions
3815                  */
3816                 query_intvl = (i+1) * 100; /* add 100ms per iteration */
3817                 if (query_intvl > 2*1000)
3818                         query_intvl = 2*1000; /* Cap to 2s */
3819 
3820                 if (now >= ts_query + (query_intvl*1000)) {
3821                         /* Query metadata for missing leaders,
3822                          * possibly creating the topic. */
3823                         rd_kafka_metadata_refresh_topics(
3824                                 rk, NULL, &query_topics,
3825                                 rd_true/*force*/,
3826                                 rd_false/*!allow_auto_create*/,
3827                                 rd_false/*!cgrp_update*/,
3828                                 "query partition leaders");
3829                         ts_query = now;
3830                         query_cnt++;
3831 
3832                 } else {
3833                         /* Wait for broker ids to be updated from
3834                          * metadata refresh above. */
3835                         int wait_ms = rd_timeout_remains_limit(ts_end,
3836                                                                query_intvl);
3837                         rd_kafka_metadata_cache_wait_change(rk, wait_ms);
3838                 }
3839 
3840                 rd_list_destroy(&query_topics);
3841 
3842                 i++;
3843         } while (ts_end == RD_POLL_INFINITE ||
3844                  now < ts_end); /* now is deliberately outdated here
3845                                  * since wait_change() will block.
3846                                  * This gives us one more chance to spin thru*/
3847 
3848         if (rd_atomic32_get(&rk->rk_broker_up_cnt) == 0)
3849                 return RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN;
3850 
3851         return RD_KAFKA_RESP_ERR__TIMED_OUT;
3852 }
3853 
3854 
3855 /**
3856  * @brief Populate \p rkts with the rd_kafka_topic_t objects for the
3857  *        partitions in. Duplicates are suppressed.
3858  *
3859  * @returns the number of topics added.
3860  */
3861 int
rd_kafka_topic_partition_list_get_topics(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * rkts)3862 rd_kafka_topic_partition_list_get_topics (
3863         rd_kafka_t *rk,
3864         rd_kafka_topic_partition_list_t *rktparlist,
3865         rd_list_t *rkts) {
3866         int cnt = 0;
3867 
3868         int i;
3869         for (i = 0 ; i < rktparlist->cnt ; i++) {
3870                 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3871                 rd_kafka_toppar_t *rktp;
3872 
3873                 rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar,
3874                                                            rd_false);
3875                 if (!rktp) {
3876                         rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3877                         continue;
3878                 }
3879 
3880                 if (!rd_list_find(rkts, rktp->rktp_rkt,
3881                                   rd_kafka_topic_cmp_rkt)) {
3882                         rd_list_add(rkts, rd_kafka_topic_keep(rktp->rktp_rkt));
3883                         cnt++;
3884                 }
3885 
3886                 rd_kafka_toppar_destroy(rktp);
3887         }
3888 
3889         return cnt;
3890 }
3891 
3892 
3893 /**
3894  * @brief Populate \p topics with the strdupped topic names in \p rktparlist.
3895  *        Duplicates are suppressed.
3896  *
3897  * @param include_regex: include regex topics
3898  *
3899  * @returns the number of topics added.
3900  */
3901 int
rd_kafka_topic_partition_list_get_topic_names(const rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * topics,int include_regex)3902 rd_kafka_topic_partition_list_get_topic_names (
3903         const rd_kafka_topic_partition_list_t *rktparlist,
3904         rd_list_t *topics, int include_regex) {
3905         int cnt = 0;
3906         int i;
3907 
3908         for (i = 0 ; i < rktparlist->cnt ; i++) {
3909                 const rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3910 
3911                 if (!include_regex && *rktpar->topic == '^')
3912                         continue;
3913 
3914                 if (!rd_list_find(topics, rktpar->topic, (void *)strcmp)) {
3915                         rd_list_add(topics, rd_strdup(rktpar->topic));
3916                         cnt++;
3917                 }
3918         }
3919 
3920         return cnt;
3921 }
3922 
3923 
3924 /**
3925  * @brief Create a copy of \p rktparlist only containing the partitions
3926  *        matched by \p match function.
3927  *
3928  * \p match shall return 1 for match, else 0.
3929  *
3930  * @returns a new list
3931  */
rd_kafka_topic_partition_list_match(const rd_kafka_topic_partition_list_t * rktparlist,int (* match)(const void * elem,const void * opaque),void * opaque)3932 rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_match (
3933         const rd_kafka_topic_partition_list_t *rktparlist,
3934         int (*match) (const void *elem, const void *opaque),
3935         void *opaque) {
3936         rd_kafka_topic_partition_list_t *newlist;
3937         int i;
3938 
3939         newlist = rd_kafka_topic_partition_list_new(0);
3940 
3941         for (i = 0 ; i < rktparlist->cnt ; i++) {
3942                 const rd_kafka_topic_partition_t *rktpar =
3943                         &rktparlist->elems[i];
3944 
3945                 if (!match(rktpar, opaque))
3946                         continue;
3947 
3948                 rd_kafka_topic_partition_list_add_copy(newlist, rktpar);
3949         }
3950 
3951         return newlist;
3952 }
3953 
3954 void
rd_kafka_topic_partition_list_log(rd_kafka_t * rk,const char * fac,int dbg,const rd_kafka_topic_partition_list_t * rktparlist)3955 rd_kafka_topic_partition_list_log (rd_kafka_t *rk, const char *fac, int dbg,
3956 				   const rd_kafka_topic_partition_list_t *rktparlist) {
3957         int i;
3958 
3959 	rd_kafka_dbg(rk, NONE|dbg, fac, "List with %d partition(s):",
3960 		     rktparlist->cnt);
3961         for (i = 0 ; i < rktparlist->cnt ; i++) {
3962 		const rd_kafka_topic_partition_t *rktpar =
3963 			&rktparlist->elems[i];
3964 		rd_kafka_dbg(rk, NONE|dbg, fac, " %s [%"PRId32"] offset %s%s%s",
3965 			     rktpar->topic, rktpar->partition,
3966 			     rd_kafka_offset2str(rktpar->offset),
3967 			     rktpar->err ? ": error: " : "",
3968 			     rktpar->err ? rd_kafka_err2str(rktpar->err) : "");
3969 	}
3970 }
3971 
3972 /**
3973  * @returns a comma-separated list of partitions.
3974  */
3975 const char *
rd_kafka_topic_partition_list_str(const rd_kafka_topic_partition_list_t * rktparlist,char * dest,size_t dest_size,int fmt_flags)3976 rd_kafka_topic_partition_list_str (const rd_kafka_topic_partition_list_t *rktparlist,
3977                                    char *dest, size_t dest_size,
3978                                    int fmt_flags) {
3979         int i;
3980         size_t of = 0;
3981 
3982         for (i = 0 ; i < rktparlist->cnt ; i++) {
3983                 const rd_kafka_topic_partition_t *rktpar =
3984                         &rktparlist->elems[i];
3985                 char errstr[128];
3986                 char offsetstr[32];
3987                 int r;
3988 
3989                 if (!rktpar->err && (fmt_flags & RD_KAFKA_FMT_F_ONLY_ERR))
3990                         continue;
3991 
3992                 if (rktpar->err && !(fmt_flags & RD_KAFKA_FMT_F_NO_ERR))
3993                         rd_snprintf(errstr, sizeof(errstr),
3994                                     "(%s)", rd_kafka_err2str(rktpar->err));
3995                 else
3996                         errstr[0] = '\0';
3997 
3998                 if (rktpar->offset != RD_KAFKA_OFFSET_INVALID)
3999                         rd_snprintf(offsetstr, sizeof(offsetstr),
4000                                     "@%"PRId64, rktpar->offset);
4001                 else
4002                         offsetstr[0] = '\0';
4003 
4004                 r = rd_snprintf(&dest[of], dest_size-of,
4005                                 "%s"
4006                                 "%s[%"PRId32"]"
4007                                 "%s"
4008                                 "%s",
4009                                 of == 0 ? "" : ", ",
4010                                 rktpar->topic, rktpar->partition,
4011                                 offsetstr,
4012                                 errstr);
4013 
4014                 if ((size_t)r >= dest_size-of) {
4015                         rd_snprintf(&dest[dest_size-4], 4, "...");
4016                         break;
4017                 }
4018 
4019                 of += r;
4020         }
4021 
4022         return dest;
4023 }
4024 
4025 
4026 
4027 /**
4028  * @brief Update \p dst with info from \p src.
4029  *
4030  * Fields updated:
4031  *  - metadata
4032  *  - metadata_size
4033  *  - offset
4034  *  - err
4035  *
4036  * Will only update partitions that are in both dst and src, other partitions will
4037  * remain unchanged.
4038  */
4039 void
rd_kafka_topic_partition_list_update(rd_kafka_topic_partition_list_t * dst,const rd_kafka_topic_partition_list_t * src)4040 rd_kafka_topic_partition_list_update (rd_kafka_topic_partition_list_t *dst,
4041                                       const rd_kafka_topic_partition_list_t *src){
4042         int i;
4043 
4044         for (i = 0 ; i < dst->cnt ; i++) {
4045                 rd_kafka_topic_partition_t *d = &dst->elems[i];
4046                 rd_kafka_topic_partition_t *s;
4047 
4048                 if (!(s = rd_kafka_topic_partition_list_find(
4049                               (rd_kafka_topic_partition_list_t *)src,
4050                               d->topic, d->partition)))
4051                         continue;
4052 
4053                 d->offset = s->offset;
4054                 d->err    = s->err;
4055                 if (d->metadata) {
4056                         rd_free(d->metadata);
4057                         d->metadata = NULL;
4058                         d->metadata_size = 0;
4059                 }
4060                 if (s->metadata_size > 0) {
4061                         d->metadata =
4062                                 rd_malloc(s->metadata_size);
4063                         d->metadata_size = s->metadata_size;
4064                         memcpy((void *)d->metadata, s->metadata,
4065                                 s->metadata_size);
4066                 }
4067         }
4068 }
4069 
4070 
4071 /**
4072  * @returns the sum of \p cb called for each element.
4073  */
4074 size_t
rd_kafka_topic_partition_list_sum(const rd_kafka_topic_partition_list_t * rktparlist,size_t (* cb)(const rd_kafka_topic_partition_t * rktpar,void * opaque),void * opaque)4075 rd_kafka_topic_partition_list_sum (
4076         const rd_kafka_topic_partition_list_t *rktparlist,
4077         size_t (*cb) (const rd_kafka_topic_partition_t *rktpar, void *opaque),
4078         void *opaque) {
4079         int i;
4080         size_t sum = 0;
4081 
4082         for (i = 0 ; i < rktparlist->cnt ; i++) {
4083                 const rd_kafka_topic_partition_t *rktpar =
4084                         &rktparlist->elems[i];
4085                 sum += cb(rktpar, opaque);
4086         }
4087 
4088         return sum;
4089 }
4090 
4091 
4092 /**
4093  * @returns rd_true if there are duplicate topic/partitions in the list,
4094  *          rd_false if not.
4095  *
4096  * @remarks sorts the elements of the list.
4097  */
4098 rd_bool_t
rd_kafka_topic_partition_list_has_duplicates(rd_kafka_topic_partition_list_t * rktparlist,rd_bool_t ignore_partition)4099 rd_kafka_topic_partition_list_has_duplicates (
4100                 rd_kafka_topic_partition_list_t *rktparlist,
4101                 rd_bool_t ignore_partition) {
4102 
4103         int i;
4104 
4105         if (rktparlist->cnt <= 1)
4106                 return rd_false;
4107 
4108         rd_kafka_topic_partition_list_sort_by_topic(rktparlist);
4109 
4110         for (i=1; i<rktparlist->cnt; i++) {
4111                 const rd_kafka_topic_partition_t *p1 = &rktparlist->elems[i-1];
4112                 const rd_kafka_topic_partition_t *p2 = &rktparlist->elems[i];
4113 
4114                 if (((p1->partition == p2->partition) || ignore_partition) &&
4115                     !strcmp(p1->topic, p2->topic)) {
4116                         return rd_true;
4117                 }
4118         }
4119 
4120         return rd_false;
4121 }
4122 
4123 
4124 /**
4125  * @brief Set \c .err field \p err on all partitions in list.
4126  */
rd_kafka_topic_partition_list_set_err(rd_kafka_topic_partition_list_t * rktparlist,rd_kafka_resp_err_t err)4127 void rd_kafka_topic_partition_list_set_err (
4128         rd_kafka_topic_partition_list_t *rktparlist,
4129         rd_kafka_resp_err_t err) {
4130         int i;
4131 
4132         for (i = 0 ; i < rktparlist->cnt ; i++)
4133                 rktparlist->elems[i].err = err;
4134 }
4135 
4136 /**
4137  * @brief Get the first set error in the partition list.
4138  */
rd_kafka_topic_partition_list_get_err(const rd_kafka_topic_partition_list_t * rktparlist)4139 rd_kafka_resp_err_t rd_kafka_topic_partition_list_get_err (
4140         const rd_kafka_topic_partition_list_t *rktparlist) {
4141         int i;
4142 
4143         for (i = 0 ; i < rktparlist->cnt ; i++)
4144                 if (rktparlist->elems[i].err)
4145                         return rktparlist->elems[i].err;
4146 
4147         return RD_KAFKA_RESP_ERR_NO_ERROR;
4148 }
4149 
4150 
4151 /**
4152  * @returns the number of wildcard/regex topics
4153  */
rd_kafka_topic_partition_list_regex_cnt(const rd_kafka_topic_partition_list_t * rktparlist)4154 int rd_kafka_topic_partition_list_regex_cnt (
4155         const rd_kafka_topic_partition_list_t *rktparlist) {
4156         int i;
4157         int cnt = 0;
4158 
4159         for (i = 0 ; i < rktparlist->cnt ; i++) {
4160                 const rd_kafka_topic_partition_t *rktpar =
4161                         &rktparlist->elems[i];
4162                 cnt += *rktpar->topic == '^';
4163         }
4164         return cnt;
4165 }
4166 
4167 
4168 /**
4169  * @brief Reset base sequence for this toppar.
4170  *
4171  * See rd_kafka_toppar_pid_change() below.
4172  *
4173  * @warning Toppar must be completely drained.
4174  *
4175  * @locality toppar handler thread
4176  * @locks toppar_lock MUST be held.
4177  */
rd_kafka_toppar_reset_base_msgid(rd_kafka_toppar_t * rktp,uint64_t new_base_msgid)4178 static void rd_kafka_toppar_reset_base_msgid (rd_kafka_toppar_t *rktp,
4179                                               uint64_t new_base_msgid) {
4180         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
4181                      TOPIC|RD_KAFKA_DBG_EOS, "RESETSEQ",
4182                      "%.*s [%"PRId32"] "
4183                      "resetting epoch base seq from %"PRIu64" to %"PRIu64,
4184                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4185                      rktp->rktp_partition,
4186                      rktp->rktp_eos.epoch_base_msgid, new_base_msgid);
4187 
4188         rktp->rktp_eos.next_ack_seq = 0;
4189         rktp->rktp_eos.next_err_seq = 0;
4190         rktp->rktp_eos.epoch_base_msgid = new_base_msgid;
4191 }
4192 
4193 
4194 /**
4195  * @brief Update/change the Producer ID for this toppar.
4196  *
4197  * Must only be called when pid is different from the current toppar pid.
4198  *
4199  * The epoch base sequence will be set to \p base_msgid, which must be the
4200  * first message in the partition
4201  * queue. However, if there are outstanding messages in-flight to the broker
4202  * we will need to wait for these ProduceRequests to finish (most likely
4203  * with failure) and have their messages re-enqueued to maintain original order.
4204  * In this case the pid will not be updated and this function should be
4205  * called again when there are no outstanding messages.
4206  *
4207  * @remark This function must only be called when rktp_xmitq is non-empty.
4208  *
4209  * @returns 1 if a new pid was set, else 0.
4210  *
4211  * @locality toppar handler thread
4212  * @locks none
4213  */
rd_kafka_toppar_pid_change(rd_kafka_toppar_t * rktp,rd_kafka_pid_t pid,uint64_t base_msgid)4214 int rd_kafka_toppar_pid_change (rd_kafka_toppar_t *rktp, rd_kafka_pid_t pid,
4215                                 uint64_t base_msgid) {
4216         int inflight = rd_atomic32_get(&rktp->rktp_msgs_inflight);
4217 
4218         if (unlikely(inflight > 0)) {
4219                 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
4220                              TOPIC|RD_KAFKA_DBG_EOS, "NEWPID",
4221                              "%.*s [%"PRId32"] will not change %s -> %s yet: "
4222                              "%d message(s) still in-flight from current "
4223                              "epoch",
4224                              RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4225                              rktp->rktp_partition,
4226                              rd_kafka_pid2str(rktp->rktp_eos.pid),
4227                              rd_kafka_pid2str(pid),
4228                              inflight);
4229                 return 0;
4230         }
4231 
4232         rd_assert(base_msgid != 0 &&
4233                   *"BUG: pid_change() must only be called with "
4234                   "non-empty xmitq");
4235 
4236         rd_kafka_toppar_lock(rktp);
4237         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
4238                      TOPIC|RD_KAFKA_DBG_EOS, "NEWPID",
4239                      "%.*s [%"PRId32"] changed %s -> %s "
4240                      "with base MsgId %"PRIu64,
4241                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4242                      rktp->rktp_partition,
4243                      rd_kafka_pid2str(rktp->rktp_eos.pid),
4244                      rd_kafka_pid2str(pid),
4245                      base_msgid);
4246 
4247         rktp->rktp_eos.pid = pid;
4248         rd_kafka_toppar_reset_base_msgid(rktp, base_msgid);
4249 
4250         rd_kafka_toppar_unlock(rktp);
4251 
4252         return 1;
4253 }
4254 
4255 
4256 /**
4257  * @brief Purge messages in partition queues.
4258  *        Delivery reports will be enqueued for all purged messages, the error
4259  *        code is set to RD_KAFKA_RESP_ERR__PURGE_QUEUE.
4260  *
4261  * @param include_xmit_msgq If executing from the rktp's current broker handler
4262  *                          thread, also include the xmit message queue.
4263  *
4264  * @warning Only to be used with the producer.
4265  *
4266  * @returns the number of messages purged
4267  *
4268  * @locality any thread.
4269  * @locks_acquired rd_kafka_toppar_lock()
4270  * @locks_required none
4271  */
rd_kafka_toppar_purge_queues(rd_kafka_toppar_t * rktp,int purge_flags,rd_bool_t include_xmit_msgq)4272 int rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp,
4273                                   int purge_flags,
4274                                   rd_bool_t include_xmit_msgq) {
4275         rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
4276         rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
4277         int cnt;
4278 
4279         rd_assert(rk->rk_type == RD_KAFKA_PRODUCER);
4280 
4281         rd_kafka_dbg(rk, TOPIC, "PURGE",
4282                      "%s [%"PRId32"]: purging queues "
4283                      "(purge_flags 0x%x, %s xmit_msgq)",
4284                      rktp->rktp_rkt->rkt_topic->str,
4285                      rktp->rktp_partition,
4286                      purge_flags,
4287                      include_xmit_msgq ? "include" : "exclude");
4288 
4289         if (!(purge_flags & RD_KAFKA_PURGE_F_QUEUE))
4290                 return 0;
4291 
4292         if (include_xmit_msgq) {
4293                 /* xmit_msgq is owned by the toppar handler thread
4294                  * (broker thread) and requires no locking. */
4295                 rd_assert(rktp->rktp_broker);
4296                 rd_assert(thrd_is_current(rktp->rktp_broker->rkb_thread));
4297                 rd_kafka_msgq_concat(&rkmq, &rktp->rktp_xmit_msgq);
4298         }
4299 
4300         rd_kafka_toppar_lock(rktp);
4301         rd_kafka_msgq_concat(&rkmq, &rktp->rktp_msgq);
4302         cnt = rd_kafka_msgq_len(&rkmq);
4303 
4304         if (cnt > 0 && purge_flags & RD_KAFKA_PURGE_F_ABORT_TXN) {
4305                 /* All messages in-queue are purged
4306                  * on abort_transaction(). Since these messages
4307                  * will not be produced (retried) we need to adjust the
4308                  * idempotence epoch's base msgid to skip the messages. */
4309                 rktp->rktp_eos.epoch_base_msgid += cnt;
4310                 rd_kafka_dbg(rk,
4311                              TOPIC|RD_KAFKA_DBG_EOS, "ADVBASE",
4312                              "%.*s [%"PRId32"] "
4313                              "advancing epoch base msgid to %"PRIu64
4314                              " due to %d message(s) in aborted transaction",
4315                              RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4316                              rktp->rktp_partition,
4317                              rktp->rktp_eos.epoch_base_msgid, cnt);
4318         }
4319         rd_kafka_toppar_unlock(rktp);
4320 
4321         rd_kafka_dr_msgq(rktp->rktp_rkt, &rkmq, RD_KAFKA_RESP_ERR__PURGE_QUEUE);
4322 
4323         return cnt;
4324 }
4325 
4326 
4327 /**
4328  * @brief Purge queues for the unassigned toppars of all known topics.
4329  *
4330  * @locality application thread
4331  * @locks none
4332  */
rd_kafka_purge_ua_toppar_queues(rd_kafka_t * rk)4333 void rd_kafka_purge_ua_toppar_queues (rd_kafka_t *rk) {
4334         rd_kafka_topic_t *rkt;
4335         int msg_cnt = 0, part_cnt = 0;
4336 
4337         rd_kafka_rdlock(rk);
4338         TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
4339                 rd_kafka_toppar_t *rktp;
4340                 int r;
4341 
4342                 rd_kafka_topic_rdlock(rkt);
4343                 rktp = rkt->rkt_ua;
4344                 if (rktp)
4345                         rd_kafka_toppar_keep(rktp);
4346                 rd_kafka_topic_rdunlock(rkt);
4347 
4348                 if (unlikely(!rktp))
4349                         continue;
4350 
4351 
4352                 rd_kafka_toppar_lock(rktp);
4353 
4354                 r = rd_kafka_msgq_len(&rktp->rktp_msgq);
4355                 rd_kafka_dr_msgq(rkt, &rktp->rktp_msgq,
4356                                  RD_KAFKA_RESP_ERR__PURGE_QUEUE);
4357                 rd_kafka_toppar_unlock(rktp);
4358                 rd_kafka_toppar_destroy(rktp);
4359 
4360                 if (r > 0) {
4361                         msg_cnt += r;
4362                         part_cnt++;
4363                 }
4364         }
4365         rd_kafka_rdunlock(rk);
4366 
4367         rd_kafka_dbg(rk, QUEUE|RD_KAFKA_DBG_TOPIC, "PURGEQ",
4368                      "Purged %i message(s) from %d UA-partition(s)",
4369                      msg_cnt, part_cnt);
4370 }
4371 
4372 
rd_kafka_partition_leader_destroy_free(void * ptr)4373 void rd_kafka_partition_leader_destroy_free (void *ptr) {
4374         struct rd_kafka_partition_leader *leader = ptr;
4375         rd_kafka_partition_leader_destroy(leader);
4376 }
4377