1 /*
2  * librdkafka - The Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2015 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 #include "rdkafka_int.h"
29 #include "rdkafka_topic.h"
30 #include "rdkafka_broker.h"
31 #include "rdkafka_request.h"
32 #include "rdkafka_offset.h"
33 #include "rdkafka_partition.h"
34 #include "rdregex.h"
35 #include "rdports.h"  /* rd_qsort_r() */
36 
37 #include "rdunittest.h"
38 
/**
 * Human-readable names of the toppar fetch states, indexed by the
 * numeric fetch state value (see rd_kafka_toppar_set_fetch_state(),
 * which uses this table for debug logging).
 */
const char *rd_kafka_fetch_states[] = {
        "none",
        "stopping",
        "stopped",
        "offset-query",
        "offset-wait",
        "active"
};
47 
48 
/* Forward declarations for static functions defined later in this file. */

/* Serve callback for a toppar's ops queue: installed as
 * rktp_ops->rkq_serve (with rktp as rkq_opaque) in rd_kafka_toppar_new0(). */
static rd_kafka_op_res_t
rd_kafka_toppar_op_serve (rd_kafka_t *rk,
                          rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                          rd_kafka_q_cb_type_t cb_type, void *opaque);

/* NOTE(review): presumably re-schedules the partition's offset query after
 * backoff_ms, logging reason; the definition is outside this view. */
static void rd_kafka_toppar_offset_retry (rd_kafka_toppar_t *rktp,
                                          int backoff_ms,
                                          const char *reason);
57 
58 
59 static RD_INLINE int32_t
rd_kafka_toppar_version_new_barrier0(rd_kafka_toppar_t * rktp,const char * func,int line)60 rd_kafka_toppar_version_new_barrier0 (rd_kafka_toppar_t *rktp,
61 				     const char *func, int line) {
62 	int32_t version = rd_atomic32_add(&rktp->rktp_version, 1);
63 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BARRIER",
64 		     "%s [%"PRId32"]: %s:%d: new version barrier v%"PRId32,
65 		     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
66 		     func, line, version);
67 	return version;
68 }
69 
70 #define rd_kafka_toppar_version_new_barrier(rktp) \
71 	rd_kafka_toppar_version_new_barrier0(rktp, __FUNCTION__, __LINE__)
72 
73 
74 /**
75  * Toppar based OffsetResponse handling.
76  * This is used for updating the low water mark for consumer lag.
77  */
rd_kafka_toppar_lag_handle_Offset(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)78 static void rd_kafka_toppar_lag_handle_Offset (rd_kafka_t *rk,
79 					       rd_kafka_broker_t *rkb,
80 					       rd_kafka_resp_err_t err,
81 					       rd_kafka_buf_t *rkbuf,
82 					       rd_kafka_buf_t *request,
83 					       void *opaque) {
84         rd_kafka_toppar_t *rktp = opaque;
85         rd_kafka_topic_partition_list_t *offsets;
86         rd_kafka_topic_partition_t *rktpar;
87 
88         offsets = rd_kafka_topic_partition_list_new(1);
89 
90         /* Parse and return Offset */
91         err = rd_kafka_handle_ListOffsets(rk, rkb, err,
92                                           rkbuf, request, offsets, NULL);
93 
94         if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
95                 rd_kafka_topic_partition_list_destroy(offsets);
96                 return; /* Retrying */
97         }
98 
99         if (!err && !(rktpar = rd_kafka_topic_partition_list_find(
100                               offsets,
101                               rktp->rktp_rkt->rkt_topic->str,
102                               rktp->rktp_partition)))
103                 err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
104 
105         if (!err && !rktpar->err) {
106                 rd_kafka_toppar_lock(rktp);
107                 rktp->rktp_lo_offset = rktpar->offset;
108                 rd_kafka_toppar_unlock(rktp);
109         }
110 
111         rd_kafka_topic_partition_list_destroy(offsets);
112 
113         rktp->rktp_wait_consumer_lag_resp = 0;
114 
115         rd_kafka_toppar_destroy(rktp); /* from request.opaque */
116 }
117 
118 
119 
120 /**
121  * Request information from broker to keep track of consumer lag.
122  *
123  * @locality toppar handle thread
124  * @locks none
125  */
rd_kafka_toppar_consumer_lag_req(rd_kafka_toppar_t * rktp)126 static void rd_kafka_toppar_consumer_lag_req (rd_kafka_toppar_t *rktp) {
127         rd_kafka_topic_partition_list_t *partitions;
128 
129         if (rktp->rktp_wait_consumer_lag_resp)
130                 return; /* Previous request not finished yet */
131 
132         rd_kafka_toppar_lock(rktp);
133 
134         /* Offset requests can only be sent to the leader replica.
135          *
136          * Note: If rktp is delegated to a preferred replica, it is
137          * certain that FETCH >= v5 and so rktp_lo_offset will be
138          * updated via LogStartOffset in the FETCH response.
139          */
140         if (!rktp->rktp_leader || (rktp->rktp_leader != rktp->rktp_broker)) {
141                 rd_kafka_toppar_unlock(rktp);
142 		return;
143         }
144 
145         /* Also don't send a timed log start offset request if leader
146          * broker supports FETCH >= v5, since this will be set when
147          * doing fetch requests.
148          */
149         if (rd_kafka_broker_ApiVersion_supported(rktp->rktp_broker,
150                                                  RD_KAFKAP_Fetch, 0,
151                                                  5, NULL) == 5) {
152                 rd_kafka_toppar_unlock(rktp);
153                 return;
154         }
155 
156         rktp->rktp_wait_consumer_lag_resp = 1;
157 
158         partitions = rd_kafka_topic_partition_list_new(1);
159         rd_kafka_topic_partition_list_add(partitions,
160                                           rktp->rktp_rkt->rkt_topic->str,
161                                           rktp->rktp_partition)->offset =
162                                           RD_KAFKA_OFFSET_BEGINNING;
163 
164         /* Ask for oldest offset. The newest offset is automatically
165          * propagated in FetchResponse.HighwaterMark. */
166         rd_kafka_ListOffsetsRequest(rktp->rktp_broker, partitions,
167                                     RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
168                                     rd_kafka_toppar_lag_handle_Offset,
169                                     rd_kafka_toppar_keep(rktp));
170 
171         rd_kafka_toppar_unlock(rktp);
172 
173         rd_kafka_topic_partition_list_destroy(partitions);
174 }
175 
176 
177 
178 /**
179  * Request earliest offset for a partition
180  *
181  * Locality: toppar handler thread
182  */
rd_kafka_toppar_consumer_lag_tmr_cb(rd_kafka_timers_t * rkts,void * arg)183 static void rd_kafka_toppar_consumer_lag_tmr_cb (rd_kafka_timers_t *rkts,
184 						 void *arg) {
185 	rd_kafka_toppar_t *rktp = arg;
186 	rd_kafka_toppar_consumer_lag_req(rktp);
187 }
188 
189 /**
190  * @brief Update rktp_op_version.
191  *        Enqueue an RD_KAFKA_OP_BARRIER type of operation
192  *        when the op_version is updated.
193  *
194  * @locks_required rd_kafka_toppar_lock() must be held.
195  * @locality Toppar handler thread
196  */
rd_kafka_toppar_op_version_bump(rd_kafka_toppar_t * rktp,int32_t version)197 void rd_kafka_toppar_op_version_bump (rd_kafka_toppar_t *rktp,
198                                       int32_t version) {
199         rd_kafka_op_t *rko;
200 
201         rktp->rktp_op_version = version;
202         rko = rd_kafka_op_new(RD_KAFKA_OP_BARRIER);
203         rko->rko_version = version;
204         rd_kafka_q_enq(rktp->rktp_fetchq, rko);
205 }
206 
207 
208 /**
209  * Add new partition to topic.
210  *
211  * Locks: rd_kafka_topic_wrlock() must be held.
212  * Locks: rd_kafka_wrlock() must be held.
213  */
rd_kafka_toppar_new0(rd_kafka_topic_t * rkt,int32_t partition,const char * func,int line)214 rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_topic_t *rkt,
215                                          int32_t partition,
216                                          const char *func, int line) {
217 	rd_kafka_toppar_t *rktp;
218 
219 	rktp = rd_calloc(1, sizeof(*rktp));
220 
221 	rktp->rktp_partition = partition;
222 	rktp->rktp_rkt = rkt;
223         rktp->rktp_leader_id = -1;
224         rktp->rktp_broker_id = -1;
225         rd_interval_init(&rktp->rktp_lease_intvl);
226         rd_interval_init(&rktp->rktp_new_lease_intvl);
227         rd_interval_init(&rktp->rktp_new_lease_log_intvl);
228         rd_interval_init(&rktp->rktp_metadata_intvl);
229         /* Mark partition as unknown (does not exist) until we see the
230          * partition in topic metadata. */
231         if (partition != RD_KAFKA_PARTITION_UA)
232                 rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
233 	rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
234         rktp->rktp_fetch_msg_max_bytes
235             = rkt->rkt_rk->rk_conf.fetch_msg_max_bytes;
236 	rktp->rktp_offset_fp = NULL;
237         rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
238         rd_kafka_offset_stats_reset(&rktp->rktp_offsets_fin);
239         rktp->rktp_ls_offset = RD_KAFKA_OFFSET_INVALID;
240         rktp->rktp_hi_offset = RD_KAFKA_OFFSET_INVALID;
241 	rktp->rktp_lo_offset = RD_KAFKA_OFFSET_INVALID;
242         rktp->rktp_query_offset = RD_KAFKA_OFFSET_INVALID;
243         rktp->rktp_next_offset = RD_KAFKA_OFFSET_INVALID;
244         rktp->rktp_last_next_offset = RD_KAFKA_OFFSET_INVALID;
245 	rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
246         rktp->rktp_stored_offset = RD_KAFKA_OFFSET_INVALID;
247         rktp->rktp_committing_offset = RD_KAFKA_OFFSET_INVALID;
248         rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
249 	rd_kafka_msgq_init(&rktp->rktp_msgq);
250 	rd_kafka_msgq_init(&rktp->rktp_xmit_msgq);
251 	mtx_init(&rktp->rktp_lock, mtx_plain);
252 
253         rd_refcnt_init(&rktp->rktp_refcnt, 0);
254 	rktp->rktp_fetchq = rd_kafka_q_new(rkt->rkt_rk);
255         rktp->rktp_ops    = rd_kafka_q_new(rkt->rkt_rk);
256         rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve;
257         rktp->rktp_ops->rkq_opaque = rktp;
258         rd_atomic32_init(&rktp->rktp_version, 1);
259 	rktp->rktp_op_version = rd_atomic32_get(&rktp->rktp_version);
260 
261         rd_atomic32_init(&rktp->rktp_msgs_inflight, 0);
262         rd_kafka_pid_reset(&rktp->rktp_eos.pid);
263 
264         /* Consumer: If statistics is available we query the log start offset
265          * of each partition.
266          * Since the oldest offset only moves on log retention, we cap this
267          * value on the low end to a reasonable value to avoid flooding
268          * the brokers with OffsetRequests when our statistics interval is low.
269          * FIXME: Use a global timer to collect offsets for all partitions
270          * FIXME: This timer is superfulous for FETCH >= v5 because the log
271          *        start offset is included in fetch responses.
272          * */
273         if (rktp->rktp_rkt->rkt_rk->rk_conf.stats_interval_ms > 0 &&
274             rkt->rkt_rk->rk_type == RD_KAFKA_CONSUMER &&
275             rktp->rktp_partition != RD_KAFKA_PARTITION_UA) {
276                 int intvl = rkt->rkt_rk->rk_conf.stats_interval_ms;
277                 if (intvl < 10 * 1000 /* 10s */)
278                         intvl = 10 * 1000;
279 		rd_kafka_timer_start(&rkt->rkt_rk->rk_timers,
280 				     &rktp->rktp_consumer_lag_tmr,
281                                      intvl * 1000ll,
282 				     rd_kafka_toppar_consumer_lag_tmr_cb,
283 				     rktp);
284         }
285 
286         rktp->rktp_rkt = rd_kafka_topic_keep(rkt);
287 
288 	rd_kafka_q_fwd_set(rktp->rktp_ops, rkt->rkt_rk->rk_ops);
289         rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARNEW",
290                      "NEW %s [%"PRId32"] %p refcnt %p (at %s:%d)",
291                      rkt->rkt_topic->str, rktp->rktp_partition, rktp,
292                      &rktp->rktp_refcnt,
293                      func, line);
294 
295 	return rd_kafka_toppar_keep(rktp);
296 }
297 
298 
299 
300 /**
301  * Removes a toppar from its duties, global lists, etc.
302  *
303  * Locks: rd_kafka_toppar_lock() MUST be held
304  */
rd_kafka_toppar_remove(rd_kafka_toppar_t * rktp)305 static void rd_kafka_toppar_remove (rd_kafka_toppar_t *rktp) {
306         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARREMOVE",
307                      "Removing toppar %s [%"PRId32"] %p",
308                      rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
309 		     rktp);
310 
311 	rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
312 			    &rktp->rktp_offset_query_tmr, 1/*lock*/);
313 	rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
314 			    &rktp->rktp_consumer_lag_tmr, 1/*lock*/);
315 
316 	rd_kafka_q_fwd_set(rktp->rktp_ops, NULL);
317 }
318 
319 
320 /**
321  * Final destructor for partition.
322  */
rd_kafka_toppar_destroy_final(rd_kafka_toppar_t * rktp)323 void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp) {
324 
325         rd_kafka_toppar_remove(rktp);
326 
327 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESTROY",
328 		     "%s [%"PRId32"]: %p DESTROY_FINAL",
329 		     rktp->rktp_rkt->rkt_topic->str,
330                      rktp->rktp_partition, rktp);
331 
332 	/* Clear queues */
333 	rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
334 			rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) == 0);
335 	rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
336 			 RD_KAFKA_RESP_ERR__DESTROY);
337 	rd_kafka_q_destroy_owner(rktp->rktp_fetchq);
338         rd_kafka_q_destroy_owner(rktp->rktp_ops);
339 
340 	rd_kafka_replyq_destroy(&rktp->rktp_replyq);
341 
342 	rd_kafka_topic_destroy0(rktp->rktp_rkt);
343 
344 	mtx_destroy(&rktp->rktp_lock);
345 
346         if (rktp->rktp_leader)
347                 rd_kafka_broker_destroy(rktp->rktp_leader);
348 
349         rd_refcnt_destroy(&rktp->rktp_refcnt);
350 
351 	rd_free(rktp);
352 }
353 
354 
355 /**
356  * Set toppar fetching state.
357  *
358  * Locality: broker thread
359  * Locks: rd_kafka_toppar_lock() MUST be held.
360  */
rd_kafka_toppar_set_fetch_state(rd_kafka_toppar_t * rktp,int fetch_state)361 void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp,
362                                       int fetch_state) {
363 	rd_kafka_assert(NULL,
364 			thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));
365 
366         if ((int)rktp->rktp_fetch_state == fetch_state)
367                 return;
368 
369         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "PARTSTATE",
370                      "Partition %.*s [%"PRId32"] changed fetch state %s -> %s",
371                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
372                      rktp->rktp_partition,
373                      rd_kafka_fetch_states[rktp->rktp_fetch_state],
374                      rd_kafka_fetch_states[fetch_state]);
375 
376         rktp->rktp_fetch_state = fetch_state;
377 
378         if (fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE)
379                 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
380                              CONSUMER|RD_KAFKA_DBG_TOPIC,
381                              "FETCH",
382                              "Partition %.*s [%"PRId32"] start fetching "
383                              "at offset %s",
384                              RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
385                              rktp->rktp_partition,
386                              rd_kafka_offset2str(rktp->rktp_next_offset));
387 }
388 
389 
390 /**
391  * Returns the appropriate toppar for a given rkt and partition.
392  * The returned toppar has increased refcnt and must be unreffed by calling
393  *  rd_kafka_toppar_destroy().
394  * May return NULL.
395  *
396  * If 'ua_on_miss' is true the UA (unassigned) toppar is returned if
397  * 'partition' was not known locally, else NULL is returned.
398  *
399  * Locks: Caller must hold rd_kafka_topic_*lock()
400  */
rd_kafka_toppar_get0(const char * func,int line,const rd_kafka_topic_t * rkt,int32_t partition,int ua_on_miss)401 rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line,
402                                          const rd_kafka_topic_t *rkt,
403                                          int32_t partition,
404                                          int ua_on_miss) {
405         rd_kafka_toppar_t *rktp;
406 
407 	if (partition >= 0 && partition < rkt->rkt_partition_cnt)
408 		rktp = rkt->rkt_p[partition];
409 	else if (partition == RD_KAFKA_PARTITION_UA || ua_on_miss)
410 		rktp = rkt->rkt_ua;
411 	else
412 		return NULL;
413 
414 	if (rktp)
415                 return rd_kafka_toppar_keep_fl(func, line, rktp);
416 
417 	return NULL;
418 }
419 
420 
421 /**
422  * Same as rd_kafka_toppar_get() but no need for locking and
423  * looks up the topic first.
424  *
425  * Locality: any
426  * Locks: none
427  */
rd_kafka_toppar_get2(rd_kafka_t * rk,const char * topic,int32_t partition,int ua_on_miss,int create_on_miss)428 rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk,
429                                          const char *topic,
430                                          int32_t partition,
431                                          int ua_on_miss,
432                                          int create_on_miss) {
433         rd_kafka_topic_t *rkt;
434         rd_kafka_toppar_t *rktp;
435 
436         rd_kafka_wrlock(rk);
437 
438         /* Find or create topic */
439 	if (unlikely(!(rkt = rd_kafka_topic_find(rk, topic, 0/*no-lock*/)))) {
440                 if (!create_on_miss) {
441                         rd_kafka_wrunlock(rk);
442                         return NULL;
443                 }
444                 rkt = rd_kafka_topic_new0(rk, topic, NULL,
445 					    NULL, 0/*no-lock*/);
446                 if (!rkt) {
447                         rd_kafka_wrunlock(rk);
448                         rd_kafka_log(rk, LOG_ERR, "TOPIC",
449                                      "Failed to create local topic \"%s\": %s",
450                                      topic, rd_strerror(errno));
451                         return NULL;
452                 }
453         }
454 
455         rd_kafka_wrunlock(rk);
456 
457 	rd_kafka_topic_wrlock(rkt);
458 	rktp = rd_kafka_toppar_desired_add(rkt, partition);
459 	rd_kafka_topic_wrunlock(rkt);
460 
461         rd_kafka_topic_destroy0(rkt);
462 
463 	return rktp;
464 }
465 
466 
467 /**
468  * Returns a toppar if it is available in the cluster.
469  * '*errp' is set to the error-code if lookup fails.
470  *
471  * Locks: topic_*lock() MUST be held
472  */
473 rd_kafka_toppar_t *
rd_kafka_toppar_get_avail(const rd_kafka_topic_t * rkt,int32_t partition,int ua_on_miss,rd_kafka_resp_err_t * errp)474 rd_kafka_toppar_get_avail (const rd_kafka_topic_t *rkt,
475                            int32_t partition, int ua_on_miss,
476                            rd_kafka_resp_err_t *errp) {
477 	rd_kafka_toppar_t *rktp;
478 
479         switch (rkt->rkt_state)
480         {
481         case RD_KAFKA_TOPIC_S_UNKNOWN:
482                 /* No metadata received from cluster yet.
483                  * Put message in UA partition and re-run partitioner when
484                  * cluster comes up. */
485 		partition = RD_KAFKA_PARTITION_UA;
486                 break;
487 
488         case RD_KAFKA_TOPIC_S_NOTEXISTS:
489                 /* Topic not found in cluster.
490                  * Fail message immediately. */
491                 *errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
492                 return NULL;
493 
494         case RD_KAFKA_TOPIC_S_ERROR:
495                 /* Permanent topic error. */
496                 *errp = rkt->rkt_err;
497                 return NULL;
498 
499         case RD_KAFKA_TOPIC_S_EXISTS:
500                 /* Topic exists in cluster. */
501 
502                 /* Topic exists but has no partitions.
503                  * This is usually an transient state following the
504                  * auto-creation of a topic. */
505                 if (unlikely(rkt->rkt_partition_cnt == 0)) {
506                         partition = RD_KAFKA_PARTITION_UA;
507                         break;
508                 }
509 
510                 /* Check that partition exists. */
511                 if (partition >= rkt->rkt_partition_cnt) {
512                         *errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
513                         return NULL;
514                 }
515                 break;
516 
517         default:
518                 rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
519                 break;
520         }
521 
522 	/* Get new partition */
523 	rktp = rd_kafka_toppar_get(rkt, partition, 0);
524 
525 	if (unlikely(!rktp)) {
526 		/* Unknown topic or partition */
527 		if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
528 			*errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
529 		else
530 			*errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
531 
532 		return NULL;
533 	}
534 
535 	return rktp;
536 }
537 
538 
539 /**
540  * Looks for partition 'i' in topic 'rkt's desired list.
541  *
542  * The desired partition list is the list of partitions that are desired
543  * (e.g., by the consumer) but not yet seen on a broker.
544  * As soon as the partition is seen on a broker the toppar is moved from
545  * the desired list and onto the normal rkt_p array.
546  * When the partition on the broker goes away a desired partition is put
547  * back on the desired list.
548  *
549  * Locks: rd_kafka_topic_*lock() must be held.
550  * Note: 'rktp' refcount is increased.
551  */
552 
rd_kafka_toppar_desired_get(rd_kafka_topic_t * rkt,int32_t partition)553 rd_kafka_toppar_t *rd_kafka_toppar_desired_get (rd_kafka_topic_t *rkt,
554                                                 int32_t partition) {
555 	rd_kafka_toppar_t *rktp;
556         int i;
557 
558 	RD_LIST_FOREACH(rktp, &rkt->rkt_desp, i) {
559 		if (rktp->rktp_partition == partition)
560 			return rd_kafka_toppar_keep(rktp);
561         }
562 
563 	return NULL;
564 }
565 
566 
567 /**
568  * Link toppar on desired list.
569  *
570  * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
571  */
rd_kafka_toppar_desired_link(rd_kafka_toppar_t * rktp)572 void rd_kafka_toppar_desired_link (rd_kafka_toppar_t *rktp) {
573 
574         if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_DESP)
575                 return; /* Already linked */
576 
577         rd_kafka_toppar_keep(rktp);
578         rd_list_add(&rktp->rktp_rkt->rkt_desp, rktp);
579         rd_interval_reset(&rktp->rktp_rkt->rkt_desp_refresh_intvl);
580         rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_ON_DESP;
581 }
582 
583 /**
584  * Unlink toppar from desired list.
585  *
586  * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
587  */
rd_kafka_toppar_desired_unlink(rd_kafka_toppar_t * rktp)588 void rd_kafka_toppar_desired_unlink (rd_kafka_toppar_t *rktp) {
589         if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_DESP))
590                 return; /* Not linked */
591 
592         rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_DESP;
593         rd_list_remove(&rktp->rktp_rkt->rkt_desp, rktp);
594         rd_interval_reset(&rktp->rktp_rkt->rkt_desp_refresh_intvl);
595         rd_kafka_toppar_destroy(rktp);
596 }
597 
598 
599 /**
600  * @brief If rktp is not already desired:
601  *  - mark as DESIRED|~REMOVE
602  *  - add to desired list if unknown
603  *
604  * @remark toppar_lock() MUST be held
605  */
rd_kafka_toppar_desired_add0(rd_kafka_toppar_t * rktp)606 void rd_kafka_toppar_desired_add0 (rd_kafka_toppar_t *rktp) {
607         if ((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
608                 return;
609 
610         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESIRED",
611                      "%s [%"PRId32"]: marking as DESIRED",
612                      rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
613 
614         /* If toppar was marked for removal this is no longer
615          * the case since the partition is now desired. */
616         rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_REMOVE;
617 
618         rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_DESIRED;
619 
620         if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN) {
621                 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESIRED",
622                      "%s [%"PRId32"]: adding to DESIRED list",
623                      rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
624                 rd_kafka_toppar_desired_link(rktp);
625         }
626 }
627 
628 
629 /**
630  * Adds 'partition' as a desired partition to topic 'rkt', or updates
631  * an existing partition to be desired.
632  *
633  * Locks: rd_kafka_topic_wrlock() must be held.
634  */
rd_kafka_toppar_desired_add(rd_kafka_topic_t * rkt,int32_t partition)635 rd_kafka_toppar_t *rd_kafka_toppar_desired_add (rd_kafka_topic_t *rkt,
636                                                 int32_t partition) {
637         rd_kafka_toppar_t *rktp;
638 
639         rktp = rd_kafka_toppar_get(rkt, partition, 0/*no_ua_on_miss*/);
640 
641         if (!rktp)
642                 rktp = rd_kafka_toppar_desired_get(rkt, partition);
643 
644         if (!rktp)
645                 rktp = rd_kafka_toppar_new(rkt, partition);
646 
647         rd_kafka_toppar_lock(rktp);
648         rd_kafka_toppar_desired_add0(rktp);
649         rd_kafka_toppar_unlock(rktp);
650 
651         return rktp; /* Callers refcount */
652 }
653 
654 
655 
656 
657 /**
658  * Unmarks an 'rktp' as desired.
659  *
660  * Locks: rd_kafka_topic_wrlock() and rd_kafka_toppar_lock() MUST be held.
661  */
rd_kafka_toppar_desired_del(rd_kafka_toppar_t * rktp)662 void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp) {
663 
664 	if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
665 		return;
666 
667 	rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_DESIRED;
668         rd_kafka_toppar_desired_unlink(rktp);
669 
670 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESP",
671 		     "Removing (un)desired topic %s [%"PRId32"]",
672 		     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
673 
674         if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN) {
675                 /* If this partition does not exist in the cluster
676                  * and is no longer desired, remove it. */
677                 rd_kafka_toppar_broker_leave_for_remove(rktp);
678         }
679 }
680 
681 
682 
683 /**
684  * Append message at tail of 'rktp' message queue.
685  */
rd_kafka_toppar_enq_msg(rd_kafka_toppar_t * rktp,rd_kafka_msg_t * rkm)686 void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
687         int queue_len;
688         rd_kafka_q_t *wakeup_q = NULL;
689 
690         rd_kafka_toppar_lock(rktp);
691 
692         if (!rkm->rkm_u.producer.msgid &&
693             rktp->rktp_partition != RD_KAFKA_PARTITION_UA)
694                 rkm->rkm_u.producer.msgid = ++rktp->rktp_msgid;
695 
696         if (rktp->rktp_partition == RD_KAFKA_PARTITION_UA ||
697             rktp->rktp_rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO) {
698                 /* No need for enq_sorted(), this is the oldest message. */
699                 queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
700         } else {
701                 queue_len = rd_kafka_msgq_enq_sorted(rktp->rktp_rkt,
702                                                      &rktp->rktp_msgq, rkm);
703         }
704 
705         if (unlikely(queue_len == 1 &&
706                      (wakeup_q = rktp->rktp_msgq_wakeup_q)))
707                 rd_kafka_q_keep(wakeup_q);
708 
709         rd_kafka_toppar_unlock(rktp);
710 
711         if (wakeup_q) {
712                 rd_kafka_q_yield(wakeup_q);
713                 rd_kafka_q_destroy(wakeup_q);
714         }
715 }
716 
717 
/**
 * @brief Insert \p srcq before \p insert_before in \p destq.
 *
 * If \p insert_before is NULL, all of \p srcq is appended to \p destq.
 *
 * Otherwise, if the last message of \p srcq compares greater than
 * \p insert_before (per \p cmp), only part of \p srcq is inserted: its
 * leading messages up to, but excluding, the first one that compares
 * greater than \p insert_before.
 *
 * Upon return \p srcq will contain any remaining messages that require
 * another insert position in \p destq.
 *
 * @param cmp Message comparator; presumably the ordering both queues are
 *            already sorted by — confirm with callers.
 *
 * NOTE(review): when \p insert_before is non-NULL, \p srcq is expected to
 *               be non-empty (rd_dassert on its last message).
 */
static void
rd_kafka_msgq_insert_msgq_before (rd_kafka_msgq_t *destq,
                                  rd_kafka_msg_t *insert_before,
                                  rd_kafka_msgq_t *srcq,
                                  int (*cmp) (const void *a, const void *b)) {
        rd_kafka_msg_t *slast;
        /* Receives the right-hand part of srcq that needs another
         * insert position; empty if all of srcq fits before insert_before. */
        rd_kafka_msgq_t tmpq;

        if (!insert_before) {
                /* Append all of srcq to destq */
                rd_kafka_msgq_concat(destq, srcq);
                rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
                return;
        }

        slast = rd_kafka_msgq_last(srcq);
        rd_dassert(slast);

        if (cmp(slast, insert_before) > 0) {
                rd_kafka_msg_t *new_sfirst;
                int cnt;
                int64_t bytes;

                /* destq insert_before resides somewhere between
                 * srcq.first and srcq.last, find the first message in
                 * srcq that is > insert_before and split srcq into
                 * a left part that contains the messages to insert before
                 * insert_before, and a right part that will need another
                 * insert position. */

                new_sfirst = rd_kafka_msgq_find_pos(srcq, NULL,
                                                    insert_before,
                                                    cmp, &cnt, &bytes);
                rd_assert(new_sfirst);

                /* split srcq into two parts using the divider message */
                rd_kafka_msgq_split(srcq, &tmpq, new_sfirst, cnt, bytes);

                rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
                rd_kafka_msgq_verify_order(NULL, &tmpq, 0, rd_false);
        } else {
                /* All of srcq sorts before insert_before: nothing remains */
                rd_kafka_msgq_init(&tmpq);
        }

        /* srcq now contains messages up to the first message in destq,
         * insert srcq at insert_before in destq. */
        rd_dassert(!TAILQ_EMPTY(&destq->rkmq_msgs));
        rd_dassert(!TAILQ_EMPTY(&srcq->rkmq_msgs));
        TAILQ_INSERT_LIST_BEFORE(&destq->rkmq_msgs,
                                 insert_before,
                                 &srcq->rkmq_msgs,
                                 rd_kafka_msgs_head_s,
                                 rd_kafka_msg_t *,
                                 rkm_link);
        /* The list splice does not touch the counters: move them over
         * by hand. */
        destq->rkmq_msg_cnt   += srcq->rkmq_msg_cnt;
        destq->rkmq_msg_bytes += srcq->rkmq_msg_bytes;
        srcq->rkmq_msg_cnt     = 0;
        srcq->rkmq_msg_bytes   = 0;

        rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
        rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);

        /* tmpq contains the remaining messages in srcq, move it over. */
        rd_kafka_msgq_move(srcq, &tmpq);

        rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
}
793 
794 
795 /**
796  * @brief Insert all messages from \p srcq into \p destq in their sorted
797  *        position (using \p cmp)
798  */
rd_kafka_msgq_insert_msgq(rd_kafka_msgq_t * destq,rd_kafka_msgq_t * srcq,int (* cmp)(const void * a,const void * b))799 void rd_kafka_msgq_insert_msgq (rd_kafka_msgq_t *destq,
800                                 rd_kafka_msgq_t *srcq,
801                                 int (*cmp) (const void *a, const void *b)) {
802         rd_kafka_msg_t *sfirst, *dlast, *start_pos = NULL;
803 
804         if (unlikely(RD_KAFKA_MSGQ_EMPTY(srcq))) {
805                 /* srcq is empty */
806                 return;
807         }
808 
809         if (unlikely(RD_KAFKA_MSGQ_EMPTY(destq))) {
810                 /* destq is empty, simply move the srcq. */
811                 rd_kafka_msgq_move(destq, srcq);
812                 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
813                 return;
814         }
815 
816         /* Optimize insertion by bulk-moving messages in place.
817          * We know that:
818          *  - destq is sorted but might not be continous (1,2,3,7)
819          *  - srcq is sorted but might not be continous (4,5,6,8)
820          *  - there migt be (multiple) overlaps between the two, e.g:
821          *     destq = (1,2,3,7), srcq = (4,5,6,8)
822          *  - there may be millions of messages.
823          */
824 
825         rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
826         rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
827 
828         dlast = rd_kafka_msgq_last(destq);
829         sfirst = rd_kafka_msgq_first(srcq);
830 
831         /* Most common case, all of srcq goes after destq */
832         if (likely(cmp(dlast, sfirst) < 0)) {
833                 rd_kafka_msgq_concat(destq, srcq);
834 
835                 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
836 
837                 rd_assert(RD_KAFKA_MSGQ_EMPTY(srcq));
838                 return;
839         }
840 
841         /* Insert messages from srcq into destq in non-overlapping
842          * chunks until srcq is exhausted. */
843         while (likely(sfirst != NULL)) {
844                 rd_kafka_msg_t *insert_before;
845 
846                 /* Get insert position in destq of first element in srcq */
847                 insert_before = rd_kafka_msgq_find_pos(destq, start_pos,
848                                                        sfirst, cmp,
849                                                        NULL, NULL);
850 
851                 /* Insert as much of srcq as possible at insert_before */
852                 rd_kafka_msgq_insert_msgq_before(destq, insert_before,
853                                                  srcq, cmp);
854 
855                 /* Remember the current destq position so the next find_pos()
856                  * does not have to re-scan destq and what was
857                  * added from srcq. */
858                 start_pos = insert_before;
859 
860                 /* For next iteration */
861                 sfirst = rd_kafka_msgq_first(srcq);
862 
863                 rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
864                 rd_kafka_msgq_verify_order(NULL, srcq, 0, rd_false);
865         }
866 
867         rd_kafka_msgq_verify_order(NULL, destq, 0, rd_false);
868 
869         rd_assert(RD_KAFKA_MSGQ_EMPTY(srcq));
870 }
871 
872 
873 /**
874  * @brief Inserts messages from \p srcq according to their sorted position
875  *        into \p destq, filtering out messages that can not be retried.
876  *
877  * @param incr_retry Increment retry count for messages.
878  * @param max_retries Maximum retries allowed per message.
879  * @param backoff Absolute retry backoff for retried messages.
880  *
881  * @returns 0 if all messages were retried, or 1 if some messages
882  *          could not be retried.
883  */
rd_kafka_retry_msgq(rd_kafka_msgq_t * destq,rd_kafka_msgq_t * srcq,int incr_retry,int max_retries,rd_ts_t backoff,rd_kafka_msg_status_t status,int (* cmp)(const void * a,const void * b))884 int rd_kafka_retry_msgq (rd_kafka_msgq_t *destq,
885                          rd_kafka_msgq_t *srcq,
886                          int incr_retry, int max_retries, rd_ts_t backoff,
887                          rd_kafka_msg_status_t status,
888                          int (*cmp) (const void *a, const void *b)) {
889         rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable);
890         rd_kafka_msg_t *rkm, *tmp;
891 
892         /* Scan through messages to see which ones are eligible for retry,
893          * move the retryable ones to temporary queue and
894          * set backoff time for first message and optionally
895          * increase retry count for each message.
896          * Sorted insert is not necessary since the original order
897          * srcq order is maintained. */
898         TAILQ_FOREACH_SAFE(rkm, &srcq->rkmq_msgs, rkm_link, tmp) {
899                 if (rkm->rkm_u.producer.retries + incr_retry > max_retries)
900                         continue;
901 
902                 rd_kafka_msgq_deq(srcq, rkm, 1);
903                 rd_kafka_msgq_enq(&retryable, rkm);
904 
905                 rkm->rkm_u.producer.ts_backoff = backoff;
906                 rkm->rkm_u.producer.retries  += incr_retry;
907 
908                 /* Don't downgrade a message from any form of PERSISTED
909                  * to NOT_PERSISTED, since the original cause of indicating
910                  * PERSISTED can't be changed.
911                  * E.g., a previous ack or in-flight timeout. */
912                 if (likely(!(status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED &&
913                              rkm->rkm_status !=
914                              RD_KAFKA_MSG_STATUS_NOT_PERSISTED)))
915                         rkm->rkm_status = status;
916         }
917 
918         /* No messages are retryable */
919         if (RD_KAFKA_MSGQ_EMPTY(&retryable))
920                 return 0;
921 
922         /* Insert retryable list at sorted position */
923         rd_kafka_msgq_insert_msgq(destq, &retryable, cmp);
924 
925         return 1;
926 }
927 
928 /**
929  * @brief Inserts messages from \p rkmq according to their sorted position
930  *        into the partition's message queue.
931  *
932  * @param incr_retry Increment retry count for messages.
933  * @param status Set status on each message.
934  *
935  * @returns 0 if all messages were retried, or 1 if some messages
936  *          could not be retried.
937  *
938  * @locality Broker thread (but not necessarily the leader broker thread)
939  */
940 
rd_kafka_toppar_retry_msgq(rd_kafka_toppar_t * rktp,rd_kafka_msgq_t * rkmq,int incr_retry,rd_kafka_msg_status_t status)941 int rd_kafka_toppar_retry_msgq (rd_kafka_toppar_t *rktp, rd_kafka_msgq_t *rkmq,
942                                 int incr_retry, rd_kafka_msg_status_t status) {
943         rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
944         rd_ts_t backoff = rd_clock() + (rk->rk_conf.retry_backoff_ms * 1000);
945         int r;
946 
947         if (rd_kafka_terminating(rk))
948                 return 1;
949 
950         rd_kafka_toppar_lock(rktp);
951         r = rd_kafka_retry_msgq(&rktp->rktp_msgq, rkmq,
952                                 incr_retry, rk->rk_conf.max_retries,
953                                 backoff, status,
954                                 rktp->rktp_rkt->rkt_conf.msg_order_cmp);
955         rd_kafka_toppar_unlock(rktp);
956 
957         return r;
958 }
959 
960 /**
961  * @brief Insert sorted message list \p rkmq at sorted position in \p rktp 's
962  *        message queue. The queues must not overlap.
963  * @remark \p rkmq will be cleared.
964  */
rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t * rktp,rd_kafka_msgq_t * rkmq)965 void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
966                                   rd_kafka_msgq_t *rkmq) {
967         rd_kafka_toppar_lock(rktp);
968         rd_kafka_msgq_insert_msgq(&rktp->rktp_msgq, rkmq,
969                                   rktp->rktp_rkt->rkt_conf.msg_order_cmp);
970         rd_kafka_toppar_unlock(rktp);
971 }
972 
973 
974 
975 /**
976  * Helper method for purging queues when removing a toppar.
977  * Locks: rd_kafka_toppar_lock() MUST be held
978  */
rd_kafka_toppar_purge_and_disable_queues(rd_kafka_toppar_t * rktp)979 void rd_kafka_toppar_purge_and_disable_queues (rd_kafka_toppar_t *rktp) {
980         rd_kafka_q_disable(rktp->rktp_fetchq);
981         rd_kafka_q_purge(rktp->rktp_fetchq);
982         rd_kafka_q_disable(rktp->rktp_ops);
983         rd_kafka_q_purge(rktp->rktp_ops);
984 }
985 
986 
987 /**
988  * @brief Migrate rktp from (optional) \p old_rkb to (optional) \p new_rkb,
989  *        but at least one is required to be non-NULL.
990  *
991  * This is an async operation.
992  *
993  * @locks rd_kafka_toppar_lock() MUST be held
994  */
rd_kafka_toppar_broker_migrate(rd_kafka_toppar_t * rktp,rd_kafka_broker_t * old_rkb,rd_kafka_broker_t * new_rkb)995 static void rd_kafka_toppar_broker_migrate (rd_kafka_toppar_t *rktp,
996                                             rd_kafka_broker_t *old_rkb,
997                                             rd_kafka_broker_t *new_rkb) {
998         rd_kafka_op_t *rko;
999         rd_kafka_broker_t *dest_rkb;
1000         int had_next_broker = rktp->rktp_next_broker ? 1 : 0;
1001 
1002         rd_assert(old_rkb || new_rkb);
1003 
1004         /* Update next broker */
1005         if (new_rkb)
1006                 rd_kafka_broker_keep(new_rkb);
1007         if (rktp->rktp_next_broker)
1008                 rd_kafka_broker_destroy(rktp->rktp_next_broker);
1009         rktp->rktp_next_broker = new_rkb;
1010 
1011         /* If next_broker is set it means there is already an async
1012          * migration op going on and we should not send a new one
1013          * but simply change the next_broker (which we did above). */
1014         if (had_next_broker)
1015                 return;
1016 
1017         /* Revert from offset-wait state back to offset-query
1018          * prior to leaving the broker to avoid stalling
1019          * on the new broker waiting for a offset reply from
1020          * this old broker (that might not come and thus need
1021          * to time out..slowly) */
1022         if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT)
1023                 rd_kafka_toppar_offset_retry(rktp, 500,
1024                                              "migrating to new broker");
1025 
1026         if (old_rkb) {
1027                 /* If there is an existing broker for this toppar we let it
1028                  * first handle its own leave and then trigger the join for
1029                  * the next broker, if any. */
1030                 rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
1031                 dest_rkb = old_rkb;
1032         } else {
1033                 /* No existing broker, send join op directly to new broker. */
1034                 rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_JOIN);
1035                 dest_rkb = new_rkb;
1036         }
1037 
1038         rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1039 
1040         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
1041                      "Migrating topic %.*s [%"PRId32"] %p from %s to %s "
1042 		     "(sending %s to %s)",
1043                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1044                      rktp->rktp_partition, rktp,
1045                      old_rkb ? rd_kafka_broker_name(old_rkb) : "(none)",
1046                      new_rkb ? rd_kafka_broker_name(new_rkb) : "(none)",
1047 		     rd_kafka_op2str(rko->rko_type),
1048 		     rd_kafka_broker_name(dest_rkb));
1049 
1050         rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
1051 }
1052 
1053 
1054 /**
1055  * Async toppar leave from broker.
1056  * Only use this when partitions are to be removed.
1057  *
1058  * Locks: rd_kafka_toppar_lock() MUST be held
1059  */
rd_kafka_toppar_broker_leave_for_remove(rd_kafka_toppar_t * rktp)1060 void rd_kafka_toppar_broker_leave_for_remove (rd_kafka_toppar_t *rktp) {
1061         rd_kafka_op_t *rko;
1062         rd_kafka_broker_t *dest_rkb;
1063 
1064         rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_REMOVE;
1065 
1066 	if (rktp->rktp_next_broker)
1067 		dest_rkb = rktp->rktp_next_broker;
1068 	else if (rktp->rktp_broker)
1069 		dest_rkb = rktp->rktp_broker;
1070 	else {
1071 		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARDEL",
1072 			     "%.*s [%"PRId32"] %p not handled by any broker: "
1073 			     "not sending LEAVE for remove",
1074 			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1075 			     rktp->rktp_partition, rktp);
1076 		return;
1077 	}
1078 
1079 
1080 	/* Revert from offset-wait state back to offset-query
1081 	 * prior to leaving the broker to avoid stalling
1082 	 * on the new broker waiting for a offset reply from
1083 	 * this old broker (that might not come and thus need
1084 	 * to time out..slowly) */
1085 	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT)
1086 		rd_kafka_toppar_set_fetch_state(
1087 			rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
1088 
1089 	rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
1090         rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1091 
1092         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
1093                      "%.*s [%"PRId32"] %p sending final LEAVE for removal by %s",
1094                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1095                      rktp->rktp_partition, rktp,
1096                      rd_kafka_broker_name(dest_rkb));
1097 
1098         rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
1099 }
1100 
1101 
1102 /**
1103  * @brief Delegates toppar 'rktp' to broker 'rkb'. 'rkb' may be NULL to
1104  *        undelegate broker.
1105  *
1106  * @locks Caller must have rd_kafka_toppar_lock(rktp) held.
1107  */
rd_kafka_toppar_broker_delegate(rd_kafka_toppar_t * rktp,rd_kafka_broker_t * rkb)1108 void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
1109 				      rd_kafka_broker_t *rkb) {
1110         rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1111         int internal_fallback = 0;
1112 
1113 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1114 		     "%s [%"PRId32"]: delegate to broker %s "
1115 		     "(rktp %p, term %d, ref %d)",
1116 		     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
1117 		     rkb ? rkb->rkb_name : "(none)",
1118 		     rktp, rd_kafka_terminating(rk),
1119 		     rd_refcnt_get(&rktp->rktp_refcnt));
1120 
1121         /* Undelegated toppars are delgated to the internal
1122          * broker for bookkeeping. */
1123         if (!rkb && !rd_kafka_terminating(rk)) {
1124                 rkb = rd_kafka_broker_internal(rk);
1125                 internal_fallback = 1;
1126         }
1127 
1128 	if (rktp->rktp_broker == rkb && !rktp->rktp_next_broker) {
1129                 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1130 			     "%.*s [%"PRId32"]: not updating broker: "
1131                              "already on correct broker %s",
1132 			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1133 			     rktp->rktp_partition,
1134                              rkb ? rd_kafka_broker_name(rkb) : "(none)");
1135 
1136                 if (internal_fallback)
1137                         rd_kafka_broker_destroy(rkb);
1138 		return;
1139         }
1140 
1141 	if (rktp->rktp_broker)
1142 		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1143 			     "%.*s [%"PRId32"]: no longer delegated to "
1144 			     "broker %s",
1145 			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1146 			     rktp->rktp_partition,
1147 			     rd_kafka_broker_name(rktp->rktp_broker));
1148 
1149 
1150 	if (rkb) {
1151 		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1152 			     "%.*s [%"PRId32"]: delegating to broker %s "
1153 			     "for partition with %i messages "
1154 			     "(%"PRIu64" bytes) queued",
1155 			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1156 			     rktp->rktp_partition,
1157 			     rd_kafka_broker_name(rkb),
1158                              rktp->rktp_msgq.rkmq_msg_cnt,
1159                              rktp->rktp_msgq.rkmq_msg_bytes);
1160 
1161 
1162 	} else {
1163 		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
1164 			     "%.*s [%"PRId32"]: no broker delegated",
1165 			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1166 			     rktp->rktp_partition);
1167 	}
1168 
1169         if (rktp->rktp_broker || rkb)
1170                 rd_kafka_toppar_broker_migrate(rktp, rktp->rktp_broker, rkb);
1171 
1172         if (internal_fallback)
1173                 rd_kafka_broker_destroy(rkb);
1174 }
1175 
1176 
1177 
1178 
1179 
1180 void
rd_kafka_toppar_offset_commit_result(rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * offsets)1181 rd_kafka_toppar_offset_commit_result (rd_kafka_toppar_t *rktp,
1182                                       rd_kafka_resp_err_t err,
1183                                       rd_kafka_topic_partition_list_t *offsets){
1184         if (err)
1185                 rd_kafka_consumer_err(rktp->rktp_fetchq,
1186                                       /* FIXME: propagate broker_id */
1187                                       RD_KAFKA_NODEID_UA,
1188                                       err, 0 /* FIXME:VERSION*/,
1189                                       NULL, rktp, RD_KAFKA_OFFSET_INVALID,
1190                                       "Offset commit failed: %s",
1191                                       rd_kafka_err2str(err));
1192 
1193 	rd_kafka_toppar_lock(rktp);
1194         if (!err)
1195                 rktp->rktp_committed_offset = offsets->elems[0].offset;
1196 
1197 	/* When stopping toppars:
1198 	 * Final commit is now done (or failed), propagate. */
1199 	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING)
1200 		rd_kafka_toppar_fetch_stopped(rktp, err);
1201 
1202 	rd_kafka_toppar_unlock(rktp);
1203 }
1204 
1205 
1206 
1207 
1208 
1209 
1210 
1211 
1212 
1213 
1214 
1215 /**
1216  * Handle the next offset to consume for a toppar.
1217  * This is used during initial setup when trying to figure out what
1218  * offset to start consuming from.
1219  *
1220  * Locality: toppar handler thread.
1221  * Locks: toppar_lock(rktp) must be held
1222  */
rd_kafka_toppar_next_offset_handle(rd_kafka_toppar_t * rktp,int64_t Offset)1223 void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp,
1224                                          int64_t Offset) {
1225 
1226         if (RD_KAFKA_OFFSET_IS_LOGICAL(Offset)) {
1227                 /* Offset storage returned logical offset (e.g. "end"),
1228                  * look it up. */
1229 
1230                 /* Save next offset, even if logical, so that e.g.,
1231                  * assign(BEGINNING) survives a pause+resume, etc.
1232                  * See issue #2105. */
1233                 rktp->rktp_next_offset = Offset;
1234 
1235                 rd_kafka_offset_reset(rktp, Offset, RD_KAFKA_RESP_ERR_NO_ERROR,
1236                                       "update");
1237                 return;
1238         }
1239 
1240         /* Adjust by TAIL count if, if wanted */
1241         if (rktp->rktp_query_offset <=
1242             RD_KAFKA_OFFSET_TAIL_BASE) {
1243                 int64_t orig_Offset = Offset;
1244                 int64_t tail_cnt =
1245                         llabs(rktp->rktp_query_offset -
1246                               RD_KAFKA_OFFSET_TAIL_BASE);
1247 
1248                 if (tail_cnt > Offset)
1249                         Offset = 0;
1250                 else
1251                         Offset -= tail_cnt;
1252 
1253                 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1254                              "OffsetReply for topic %s [%"PRId32"]: "
1255                              "offset %"PRId64": adjusting for "
1256                              "OFFSET_TAIL(%"PRId64"): "
1257                              "effective offset %"PRId64,
1258                              rktp->rktp_rkt->rkt_topic->str,
1259                              rktp->rktp_partition,
1260                              orig_Offset, tail_cnt,
1261                              Offset);
1262         }
1263 
1264         rktp->rktp_next_offset = Offset;
1265 
1266         rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1267 
1268         /* Wake-up broker thread which might be idling on IO */
1269         if (rktp->rktp_broker)
1270                 rd_kafka_broker_wakeup(rktp->rktp_broker);
1271 
1272 }
1273 
1274 
1275 
1276 /**
1277  * Fetch committed offset for a single partition. (simple consumer)
1278  *
1279  * Locality: toppar thread
1280  */
rd_kafka_toppar_offset_fetch(rd_kafka_toppar_t * rktp,rd_kafka_replyq_t replyq)1281 void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
1282                                    rd_kafka_replyq_t replyq) {
1283         rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
1284         rd_kafka_topic_partition_list_t *part;
1285         rd_kafka_op_t *rko;
1286 
1287         rd_kafka_dbg(rk, TOPIC, "OFFSETREQ",
1288                      "Partition %.*s [%"PRId32"]: querying cgrp for "
1289                      "committed offset (opv %d)",
1290                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1291                      rktp->rktp_partition, replyq.version);
1292 
1293         part = rd_kafka_topic_partition_list_new(1);
1294         rd_kafka_topic_partition_list_add0(__FUNCTION__,__LINE__,part,
1295                                            rktp->rktp_rkt->rkt_topic->str,
1296                                            rktp->rktp_partition,
1297 					   rktp);
1298 
1299         rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
1300 	rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1301 	rko->rko_replyq = replyq;
1302 
1303 	rko->rko_u.offset_fetch.partitions = part;
1304         rko->rko_u.offset_fetch.require_stable =
1305                 rk->rk_conf.isolation_level == RD_KAFKA_READ_COMMITTED;
1306 	rko->rko_u.offset_fetch.do_free = 1;
1307 
1308         rd_kafka_q_enq(rktp->rktp_cgrp->rkcg_ops, rko);
1309 }
1310 
1311 
1312 
1313 
/**
 * Toppar based OffsetResponse (ListOffsets) handling.
 * This is used for finding the next offset to Fetch.
 *
 * The reply is treated as outdated (RD_KAFKA_RESP_ERR__OUTDATED) when it
 * comes from a broker other than the partition's current rktp_broker, or
 * when the request's op version is older than rktp_op_version. Outdated
 * replies are not parsed; an offset query retry is scheduled instead.
 * Otherwise the reply is parsed and:
 *  - on success the returned offset is passed to
 *    rd_kafka_toppar_next_offset_handle();
 *  - on a permanent error (neither RETRY nor REFRESH action) the
 *    auto.offset.reset policy is triggered via rd_kafka_offset_reset()
 *    and an error is propagated to the partition's fetch queue;
 *  - on a temporary error an offset query retry is scheduled (500ms).
 *
 * @param opaque The rd_kafka_toppar_t the request was made for. The
 *               reference it carries is released on every path except
 *               RD_KAFKA_RESP_ERR__IN_PROGRESS (presumably the retried
 *               request keeps using it).
 *
 * Locality: toppar handler thread
 */
static void rd_kafka_toppar_handle_Offset (rd_kafka_t *rk,
                                           rd_kafka_broker_t *rkb,
                                           rd_kafka_resp_err_t err,
                                           rd_kafka_buf_t *rkbuf,
                                           rd_kafka_buf_t *request,
                                           void *opaque) {
        rd_kafka_toppar_t *rktp = opaque;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_topic_partition_t *rktpar;
        int64_t Offset;
        int actions = 0;

        rd_kafka_toppar_lock(rktp);
        /* Drop reply from previous partition leader */
        if (err != RD_KAFKA_RESP_ERR__DESTROY && rktp->rktp_broker != rkb)
                err = RD_KAFKA_RESP_ERR__OUTDATED;
        rd_kafka_toppar_unlock(rktp);

        /* Receives the parsed ListOffsets result; destroyed on every
         * path below. */
        offsets = rd_kafka_topic_partition_list_new(1);

        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                   "Offset reply for "
                   "topic %.*s [%"PRId32"] (v%d vs v%d)",
                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                   rktp->rktp_partition, request->rkbuf_replyq.version,
                   rktp->rktp_op_version);

        rd_dassert(request->rkbuf_replyq.version > 0);
        if (err != RD_KAFKA_RESP_ERR__DESTROY &&
            rd_kafka_buf_version_outdated(request, rktp->rktp_op_version)) {
                /* Outdated request response, ignore. */
                err = RD_KAFKA_RESP_ERR__OUTDATED;
        }

        /* Parse and return Offset (outdated replies are not parsed) */
        if (err != RD_KAFKA_RESP_ERR__OUTDATED)
                err = rd_kafka_handle_ListOffsets(rk, rkb, err,
                                                  rkbuf, request, offsets,
                                                  &actions);

        /* rktpar is only assigned (and only used further down) when
         * err is 0. */
        if (!err &&
            !(rktpar = rd_kafka_topic_partition_list_find(
                      offsets,
                      rktp->rktp_rkt->rkt_topic->str,
                      rktp->rktp_partition))) {
                /* Request partition not found in response */
                err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
                actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
        }

        if (err) {
                rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                           "Offset reply error for "
                           "topic %.*s [%"PRId32"] (v%d, %s): %s",
                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition, request->rkbuf_replyq.version,
                           rd_kafka_err2str(err),
                           rd_kafka_actions2str(actions));

                rd_kafka_topic_partition_list_destroy(offsets);

                if (err == RD_KAFKA_RESP_ERR__DESTROY ||
                    err == RD_KAFKA_RESP_ERR__OUTDATED) {
                        /* Termination or outdated, quick cleanup. */

                        /* Outdated: re-query the offset (timer-driven,
                         * 500ms backoff). */
                        if (err == RD_KAFKA_RESP_ERR__OUTDATED) {
                                rd_kafka_toppar_lock(rktp);
                                rd_kafka_toppar_offset_retry(
                                        rktp, 500, "outdated offset response");
                                rd_kafka_toppar_unlock(rktp);
                        }

                        /* from request.opaque */
                        rd_kafka_toppar_destroy(rktp);
                        return;

                } else if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
                        return; /* Retry in progress: the rktp reference
                                 * is deliberately not released here. */


                rd_kafka_toppar_lock(rktp);

                if (!(actions & (RD_KAFKA_ERR_ACTION_RETRY|
                                 RD_KAFKA_ERR_ACTION_REFRESH))) {
                        /* Permanent error. Trigger auto.offset.reset policy
                         * and signal error back to application. */

                        rd_kafka_offset_reset(rktp, rktp->rktp_query_offset,
                                              err,
                                              "failed to query logical offset");

                        /* For OFFSET_TAIL queries the reported offset is
                         * query_offset - TAIL_BASE.
                         * NOTE(review): rd_kafka_offset_reset() above may
                         * already have changed rktp_query_offset; confirm
                         * the offset reported here is the intended one. */
                        rd_kafka_consumer_err(
                                rktp->rktp_fetchq, rkb->rkb_nodeid,
                                err, 0, NULL, rktp,
                                (rktp->rktp_query_offset <=
                                 RD_KAFKA_OFFSET_TAIL_BASE ?
                                 rktp->rktp_query_offset -
                                 RD_KAFKA_OFFSET_TAIL_BASE :
                                 rktp->rktp_query_offset),
                                "Failed to query logical offset %s: %s",
                                rd_kafka_offset2str(rktp->rktp_query_offset),
                                rd_kafka_err2str(err));

                } else {
                        /* Temporary error. Schedule retry. */
                        char tmp[256];

                        rd_snprintf(tmp, sizeof(tmp),
                                    "failed to query logical offset %s: %s",
                                    rd_kafka_offset2str(
                                            rktp->rktp_query_offset),
                                    rd_kafka_err2str(err));

                        rd_kafka_toppar_offset_retry(rktp, 500, tmp);
                }

                rd_kafka_toppar_unlock(rktp);

                rd_kafka_toppar_destroy(rktp); /* from request.opaque */
                return;
        }

        /* Success: copy the offset out before the list is destroyed. */
        Offset = rktpar->offset;
        rd_kafka_topic_partition_list_destroy(offsets);

        rd_kafka_toppar_lock(rktp);
        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
                     "Offset %s request for %.*s [%"PRId32"] "
                     "returned offset %s (%"PRId64")",
                     rd_kafka_offset2str(rktp->rktp_query_offset),
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition, rd_kafka_offset2str(Offset), Offset);

        rd_kafka_toppar_next_offset_handle(rktp, Offset);
        rd_kafka_toppar_unlock(rktp);

        rd_kafka_toppar_destroy(rktp); /* from request.opaque */
}
1458 
1459 
1460 /**
1461  * @brief An Offset fetch failed (for whatever reason) in
1462  *        the RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT state:
1463  *        set the state back to FETCH_OFFSET_QUERY and start the
1464  *        offset_query_tmr to trigger a new request eventually.
1465  *
1466  * @locality toppar handler thread
1467  * @locks toppar_lock() MUST be held
1468  */
rd_kafka_toppar_offset_retry(rd_kafka_toppar_t * rktp,int backoff_ms,const char * reason)1469 static void rd_kafka_toppar_offset_retry (rd_kafka_toppar_t *rktp,
1470                                           int backoff_ms,
1471                                           const char *reason) {
1472         rd_ts_t tmr_next;
1473         int restart_tmr;
1474 
1475         /* (Re)start timer if not started or the current timeout
1476          * is larger than \p backoff_ms. */
1477         tmr_next = rd_kafka_timer_next(&rktp->rktp_rkt->rkt_rk->rk_timers,
1478                                        &rktp->rktp_offset_query_tmr, 1);
1479 
1480         restart_tmr = (tmr_next == -1 ||
1481                        tmr_next > rd_clock() + (backoff_ms * 1000ll));
1482 
1483         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
1484                      "%s [%"PRId32"]: %s: %s for offset %s",
1485                      rktp->rktp_rkt->rkt_topic->str,
1486                      rktp->rktp_partition,
1487                      reason,
1488                      restart_tmr ?
1489                      "(re)starting offset query timer" :
1490                      "offset query timer already scheduled",
1491                      rd_kafka_offset2str(rktp->rktp_query_offset));
1492 
1493         rd_kafka_toppar_set_fetch_state(rktp,
1494                                         RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
1495 
1496         if (restart_tmr)
1497                 rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
1498                                      &rktp->rktp_offset_query_tmr,
1499                                      backoff_ms*1000ll,
1500                                      rd_kafka_offset_query_tmr_cb, rktp);
1501 }
1502 
1503 
1504 
1505 /**
1506  * Send OffsetRequest for toppar.
1507  *
1508  * If \p backoff_ms is non-zero only the query timer is started,
1509  * otherwise a query is triggered directly.
1510  *
1511  * Locality: toppar handler thread
1512  * Locks: toppar_lock() must be held
1513  */
rd_kafka_toppar_offset_request(rd_kafka_toppar_t * rktp,int64_t query_offset,int backoff_ms)1514 void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
1515 				     int64_t query_offset, int backoff_ms) {
1516 	rd_kafka_broker_t *rkb;
1517 
1518 	rd_kafka_assert(NULL,
1519 			thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));
1520 
1521         rkb = rktp->rktp_broker;
1522 
1523         if (!backoff_ms && (!rkb || rkb->rkb_source == RD_KAFKA_INTERNAL))
1524                 backoff_ms = 500;
1525 
1526         if (backoff_ms) {
1527                 rd_kafka_toppar_offset_retry(rktp, backoff_ms,
1528                                              !rkb ?
1529                                              "no current leader for partition":
1530                                              "backoff");
1531                 return;
1532         }
1533 
1534 
1535         rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1536                             &rktp->rktp_offset_query_tmr, 1/*lock*/);
1537 
1538 
1539 	if (query_offset == RD_KAFKA_OFFSET_STORED &&
1540             rktp->rktp_rkt->rkt_conf.offset_store_method ==
1541             RD_KAFKA_OFFSET_METHOD_BROKER) {
1542                 /*
1543                  * Get stored offset from broker based storage:
1544                  * ask cgrp manager for offsets
1545                  */
1546                 rd_kafka_toppar_offset_fetch(
1547 			rktp,
1548 			RD_KAFKA_REPLYQ(rktp->rktp_ops,
1549 					rktp->rktp_op_version));
1550 
1551 	} else {
1552                 rd_kafka_topic_partition_list_t *offsets;
1553 
1554                 /*
1555                  * Look up logical offset (end,beginning,tail,..)
1556                  */
1557 
1558                 rd_rkb_dbg(rkb, TOPIC, "OFFREQ",
1559                            "Partition %.*s [%"PRId32"]: querying for logical "
1560                            "offset %s (opv %d)",
1561                            RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1562                            rktp->rktp_partition,
1563                            rd_kafka_offset2str(query_offset),
1564 			   rktp->rktp_op_version);
1565 
1566                 rd_kafka_toppar_keep(rktp); /* refcnt for OffsetRequest opaque*/
1567 
1568 		if (query_offset <= RD_KAFKA_OFFSET_TAIL_BASE)
1569 			query_offset = RD_KAFKA_OFFSET_END;
1570 
1571                 offsets = rd_kafka_topic_partition_list_new(1);
1572                 rd_kafka_topic_partition_list_add(
1573                         offsets,
1574                         rktp->rktp_rkt->rkt_topic->str,
1575                         rktp->rktp_partition)->offset = query_offset;
1576 
1577                 rd_kafka_ListOffsetsRequest(
1578                         rkb, offsets,
1579                         RD_KAFKA_REPLYQ(rktp->rktp_ops,
1580                                         rktp->rktp_op_version),
1581                         rd_kafka_toppar_handle_Offset,
1582                         rktp);
1583 
1584                 rd_kafka_topic_partition_list_destroy(offsets);
1585         }
1586 
1587         rd_kafka_toppar_set_fetch_state(rktp,
1588 					RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT);
1589 }
1590 
1591 
/**
 * Start fetching toppar.
 *
 * Handler for RD_KAFKA_OP_FETCH_START (\p rko_orig):
 *  - if a previous stop is still in progress (fetch state STOPPING) the
 *    start is refused with RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
 *  - otherwise the op's version becomes the partition's op version
 *    barrier, the partition is attached to the op's consumer group (if
 *    any) and \p offset is resolved:
 *      BEGINNING, END, TAIL(n) -> rd_kafka_toppar_next_offset_handle()
 *      STORED                  -> rd_kafka_offset_store_init()
 *      INVALID                 -> rd_kafka_offset_reset() (__NO_OFFSET)
 *      absolute offset         -> fetch from it directly (state ACTIVE).
 * In both cases, if \p rko_orig has a reply queue, a FETCH_START op
 * carrying the resulting error code is enqueued on it.
 *
 * Locality: toppar handler thread
 * Locks: none (toppar_lock() is acquired and released internally)
 */
static void rd_kafka_toppar_fetch_start (rd_kafka_toppar_t *rktp,
					 int64_t offset,
					 rd_kafka_op_t *rko_orig) {
        rd_kafka_cgrp_t *rkcg = rko_orig->rko_u.fetch_start.rkcg;
        rd_kafka_resp_err_t err = 0;
        int32_t version = rko_orig->rko_version;

	rd_kafka_toppar_lock(rktp);

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
                     "Start fetch for %.*s [%"PRId32"] in "
                     "state %s at offset %s (v%"PRId32")",
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition,
                     rd_kafka_fetch_states[rktp->rktp_fetch_state],
                     rd_kafka_offset2str(offset), version);

        /* A stop has not completed yet: refuse to (re)start.
         * The lock is released here because err_reply lies past the
         * regular unlock below. */
        if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
                err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
		rd_kafka_toppar_unlock(rktp);
                goto err_reply;
        }

        /* Adopt the op's version as the partition's op version barrier
         * (checked by rd_kafka_op_version_outdated() in
         * rd_kafka_toppar_op_serve()). */
        rd_kafka_toppar_op_version_bump(rktp, version);

        if (rkcg) {
                /* The partition must not already be attached to a cgrp. */
                rd_kafka_assert(rktp->rktp_rkt->rkt_rk, !rktp->rktp_cgrp);
                /* Attach toppar to cgrp */
                rktp->rktp_cgrp = rkcg;
                rd_kafka_cgrp_op(rkcg, rktp, RD_KAFKA_NO_REPLYQ,
                                 RD_KAFKA_OP_PARTITION_JOIN, 0);
        }


        /* Logical offsets that need to be resolved first. */
        if (offset == RD_KAFKA_OFFSET_BEGINNING ||
	    offset == RD_KAFKA_OFFSET_END ||
            offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
		rd_kafka_toppar_next_offset_handle(rktp, offset);

	} else if (offset == RD_KAFKA_OFFSET_STORED) {
                /* Start from the stored offset. */
                rd_kafka_offset_store_init(rktp);

	} else if (offset == RD_KAFKA_OFFSET_INVALID) {
                /* No starting offset: hand over to rd_kafka_offset_reset()
                 * (presumably applies auto.offset.reset — confirm). */
		rd_kafka_offset_reset(rktp, offset,
				      RD_KAFKA_RESP_ERR__NO_OFFSET,
				      "no previously committed offset "
				      "available");

	} else {
                /* Absolute offset: start fetching from it right away. */
		rktp->rktp_next_offset = offset;
                rd_kafka_toppar_set_fetch_state(rktp,
						RD_KAFKA_TOPPAR_FETCH_ACTIVE);

                /* Wake-up broker thread which might be idling on IO */
                if (rktp->rktp_broker)
                        rd_kafka_broker_wakeup(rktp->rktp_broker);

	}

        /* Reset the finalized-stats EOF offset for this new fetch run. */
        rktp->rktp_offsets_fin.eof_offset = RD_KAFKA_OFFSET_INVALID;

	rd_kafka_toppar_unlock(rktp);

        /* Signal back to caller thread that start has commenced, or err.
         * The toppar lock is not held here on either path. */
err_reply:
        if (rko_orig->rko_replyq.q) {
                rd_kafka_op_t *rko;

                rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_START);

                rko->rko_err = err;
                rko->rko_rktp = rd_kafka_toppar_keep(rktp);

                rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
        }
}
1674 
1675 
1676 
1677 
1678 /**
1679  * Mark toppar's fetch state as stopped (all decommissioning is done,
1680  * offsets are stored, etc).
1681  *
1682  * Locality: toppar handler thread
1683  * Locks: toppar_lock(rktp) MUST be held
1684  */
rd_kafka_toppar_fetch_stopped(rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err)1685 void rd_kafka_toppar_fetch_stopped (rd_kafka_toppar_t *rktp,
1686                                     rd_kafka_resp_err_t err) {
1687 
1688 
1689         rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPED);
1690 
1691         rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
1692 
1693         if (rktp->rktp_cgrp) {
1694                 /* Detach toppar from cgrp */
1695                 rd_kafka_cgrp_op(rktp->rktp_cgrp, rktp, RD_KAFKA_NO_REPLYQ,
1696                                  RD_KAFKA_OP_PARTITION_LEAVE, 0);
1697                 rktp->rktp_cgrp = NULL;
1698         }
1699 
1700         /* Signal back to application thread that stop is done. */
1701 	if (rktp->rktp_replyq.q) {
1702 		rd_kafka_op_t *rko;
1703 		rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_STOP|RD_KAFKA_OP_REPLY);
1704                 rko->rko_err = err;
1705 		rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1706 
1707 		rd_kafka_replyq_enq(&rktp->rktp_replyq, rko, 0);
1708 	}
1709 }
1710 
1711 
1712 /**
1713  * Stop toppar fetcher.
1714  * This is usually an async operation.
1715  *
1716  * Locality: toppar handler thread
1717  */
rd_kafka_toppar_fetch_stop(rd_kafka_toppar_t * rktp,rd_kafka_op_t * rko_orig)1718 void rd_kafka_toppar_fetch_stop (rd_kafka_toppar_t *rktp,
1719 				 rd_kafka_op_t *rko_orig) {
1720         int32_t version = rko_orig->rko_version;
1721 
1722 	rd_kafka_toppar_lock(rktp);
1723 
1724         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1725                      "Stopping fetch for %.*s [%"PRId32"] in state %s (v%d)",
1726                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1727                      rktp->rktp_partition,
1728                      rd_kafka_fetch_states[rktp->rktp_fetch_state], version);
1729 
1730         rd_kafka_toppar_op_version_bump(rktp, version);
1731 
1732 	/* Abort pending offset lookups. */
1733 	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
1734 		rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1735 				    &rktp->rktp_offset_query_tmr,
1736 				    1/*lock*/);
1737 
1738         /* Clear out the forwarding queue. */
1739         rd_kafka_q_fwd_set(rktp->rktp_fetchq, NULL);
1740 
1741         /* Assign the future replyq to propagate stop results. */
1742         rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_replyq.q == NULL);
1743         rktp->rktp_replyq = rko_orig->rko_replyq;
1744         rd_kafka_replyq_clear(&rko_orig->rko_replyq);
1745 
1746         rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPING);
1747 
1748         /* Stop offset store (possibly async).
1749          * NOTE: will call .._stopped() if store finishes immediately,
1750          *       so no more operations after this call! */
1751         rd_kafka_offset_store_stop(rktp);
1752 
1753 	rd_kafka_toppar_unlock(rktp);
1754 }
1755 
1756 
1757 /**
1758  * Update a toppars offset.
1759  * The toppar must have been previously FETCH_START:ed
1760  *
1761  * Locality: toppar handler thread
1762  */
rd_kafka_toppar_seek(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_op_t * rko_orig)1763 void rd_kafka_toppar_seek (rd_kafka_toppar_t *rktp,
1764 			   int64_t offset, rd_kafka_op_t *rko_orig) {
1765         rd_kafka_resp_err_t err = 0;
1766         int32_t version = rko_orig->rko_version;
1767 
1768 	rd_kafka_toppar_lock(rktp);
1769 
1770         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
1771                      "Seek %.*s [%"PRId32"] to offset %s "
1772                      "in state %s (v%"PRId32")",
1773                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
1774                      rktp->rktp_partition,
1775 		     rd_kafka_offset2str(offset),
1776                      rd_kafka_fetch_states[rktp->rktp_fetch_state], version);
1777 
1778 
1779         if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
1780                 err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
1781                 goto err_reply;
1782         } else if (!RD_KAFKA_TOPPAR_FETCH_IS_STARTED(rktp->rktp_fetch_state)) {
1783                 err = RD_KAFKA_RESP_ERR__STATE;
1784                 goto err_reply;
1785         } else if (offset == RD_KAFKA_OFFSET_STORED) {
1786 		err = RD_KAFKA_RESP_ERR__INVALID_ARG;
1787 		goto err_reply;
1788 	}
1789 
1790         rd_kafka_toppar_op_version_bump(rktp, version);
1791 
1792 	/* Abort pending offset lookups. */
1793 	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
1794 		rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
1795 				    &rktp->rktp_offset_query_tmr,
1796 				    1/*lock*/);
1797 
1798 	if (RD_KAFKA_OFFSET_IS_LOGICAL(offset))
1799 		rd_kafka_toppar_next_offset_handle(rktp, offset);
1800 	else {
1801 		rktp->rktp_next_offset = offset;
1802                 rd_kafka_toppar_set_fetch_state(rktp,
1803 						RD_KAFKA_TOPPAR_FETCH_ACTIVE);
1804 
1805                 /* Wake-up broker thread which might be idling on IO */
1806                 if (rktp->rktp_broker)
1807                         rd_kafka_broker_wakeup(rktp->rktp_broker);
1808 	}
1809 
1810         /* Signal back to caller thread that seek has commenced, or err */
1811 err_reply:
1812 	rd_kafka_toppar_unlock(rktp);
1813 
1814         if (rko_orig->rko_replyq.q) {
1815                 rd_kafka_op_t *rko;
1816 
1817                 rko = rd_kafka_op_new(RD_KAFKA_OP_SEEK|RD_KAFKA_OP_REPLY);
1818 
1819                 rko->rko_err = err;
1820 		rko->rko_u.fetch_start.offset =
1821 			rko_orig->rko_u.fetch_start.offset;
1822                 rko->rko_rktp = rd_kafka_toppar_keep(rktp);
1823 
1824                 rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
1825         }
1826 }
1827 
1828 
/**
 * @brief Pause/resume toppar.
 *
 * This is the internal handler of the pause/resume op.
 *
 * \p rko_orig carries the request: rko_u.pause.pause (non-zero to pause,
 * zero to resume) and rko_u.pause.flag (RD_KAFKA_TOPPAR_F_APP_PAUSE or
 * RD_KAFKA_TOPPAR_F_LIB_PAUSE) to set or clear. A resume is ignored if
 * the partition is not currently paused with that flag.
 *
 * For consumers:
 *  - pausing sets the next fetch offset to the last application offset
 *    (if known) and then purges the partition's fetch queue through
 *    rd_kafka_q_purge_toppar_version() using the op's version;
 *  - resuming re-triggers offset resolution through
 *    rd_kafka_toppar_next_offset_handle() when the partition is no longer
 *    paused, is ACTIVE or OFFSET_WAIT, and its next offset is INVALID.
 *
 * @locality toppar's handler thread
 * @locks none (toppar_lock() is taken internally)
 */
static void rd_kafka_toppar_pause_resume (rd_kafka_toppar_t *rktp,
                                          rd_kafka_op_t *rko_orig) {
	rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
	int pause = rko_orig->rko_u.pause.pause;
	int flag = rko_orig->rko_u.pause.flag;
        int32_t version = rko_orig->rko_version;

	rd_kafka_toppar_lock(rktp);

        rd_kafka_toppar_op_version_bump(rktp, version);

        /* Only the pauser (application or library) may resume: ignore a
         * resume for a flag that is not set. */
        if (!pause && (rktp->rktp_flags & flag) != flag) {
                rd_kafka_dbg(rk, TOPIC, "RESUME",
                             "Not resuming %s [%"PRId32"]: "
                             "partition is not paused by %s",
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition,
                             (flag & RD_KAFKA_TOPPAR_F_APP_PAUSE ?
                              "application" : "library"));
                rd_kafka_toppar_unlock(rktp);
                return;
        }

	if (pause) {
                /* Pause partition by setting either
                 * RD_KAFKA_TOPPAR_F_APP_PAUSE or
                 * RD_KAFKA_TOPPAR_F_LIB_PAUSE */
		rktp->rktp_flags |= flag;

		if (rk->rk_type == RD_KAFKA_CONSUMER) {
			/* Save offset of last consumed message+1 as the
			 * next message to fetch on resume. */
			if (rktp->rktp_app_offset != RD_KAFKA_OFFSET_INVALID) {
				rktp->rktp_next_offset = rktp->rktp_app_offset;
			}

			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
				     "%s %s [%"PRId32"]: at offset %s "
				     "(state %s, v%d)",
				     pause ? "Pause":"Resume",
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_offset2str(
					     rktp->rktp_next_offset),
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);
		} else {
			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
				     "%s %s [%"PRId32"] (state %s, v%d)",
				     pause ? "Pause":"Resume",
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);
			}

	} else {
                /* Unset the RD_KAFKA_TOPPAR_F_APP_PAUSE or
                 * RD_KAFKA_TOPPAR_F_LIB_PAUSE flag */
		rktp->rktp_flags &= ~flag;

		if (rk->rk_type == RD_KAFKA_CONSUMER) {
			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
				     "%s %s [%"PRId32"]: at offset %s "
				     "(state %s, v%d)",
				     rktp->rktp_fetch_state ==
				     RD_KAFKA_TOPPAR_FETCH_ACTIVE ?
				     "Resuming" : "Not resuming stopped",
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_offset2str(
					     rktp->rktp_next_offset),
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);

			/* If the resuming offset is logical we
			 * need to trigger a seek (that performs the
			 * logical->absolute lookup logic) to get
			 * things going.
			 * Typical case is when a partition is paused
			 * before anything has been consumed by app
			 * yet thus having rktp_app_offset=INVALID.
			 * Note: the check below only triggers on an
			 * INVALID next offset, and only once no pause
			 * flag (application or library) remains set. */
                        if (!RD_KAFKA_TOPPAR_IS_PAUSED(rktp) &&
                            (rktp->rktp_fetch_state ==
                             RD_KAFKA_TOPPAR_FETCH_ACTIVE ||
                             rktp->rktp_fetch_state ==
                             RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) &&
                            rktp->rktp_next_offset == RD_KAFKA_OFFSET_INVALID)
                        	rd_kafka_toppar_next_offset_handle(
                        		rktp, rktp->rktp_next_offset);

		} else
			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
				     "%s %s [%"PRId32"] (state %s, v%d)",
				     pause ? "Pause":"Resume",
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);
	}
	rd_kafka_toppar_unlock(rktp);

	/* Purging is done outside the toppar lock. */
	if (pause && rk->rk_type == RD_KAFKA_CONSUMER) {
		/* Flush partition's fetch queue */
		rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
						rko_orig->rko_version);
	}
}
1948 
1949 
1950 
1951 
/**
 * @brief Decide whether this toppar should be on the fetch list or not.
 *
 * The partition is not fetchable when (checked in this order):
 *  - its preferred-replica lease has expired (the partition is then
 *    delegated back to the leader),
 *  - \p force_remove is set,
 *  - it is flagged for removal (RD_KAFKA_TOPPAR_F_REMOVE),
 *  - its fetch state is not ACTIVE,
 *  - it is paused,
 *  - its next offset is still logical (not yet resolved),
 *  - its fetch queue holds at least queued_min_msgs messages,
 *  - its fetch queue holds at least queued_max_msg_bytes bytes,
 *  - it is in a fetch backoff period.
 *
 * Also:
 *  - update toppar's op version (for broker thread's copy)
 *  - finalize statistics (copy rktp_offsets to rktp_offsets_fin)
 *  - add/remove the partition to/from the broker's active toppar list
 *    when its fetchability changes.
 *
 * @returns the partition's Fetch backoff timestamp, 0 if the partition
 *          is fetchable now, or RD_TS_MAX if it is not fetchable and no
 *          explicit backoff applies.
 *
 * @locality broker thread
 * @locks none
 */
rd_ts_t rd_kafka_toppar_fetch_decide (rd_kafka_toppar_t *rktp,
                                      rd_kafka_broker_t *rkb,
                                      int force_remove) {
        int should_fetch = 1;
        const char *reason = "";
        int32_t version;
        rd_ts_t ts_backoff = 0;
        rd_bool_t lease_expired = rd_false;

        rd_kafka_toppar_lock(rktp);

        /* Check for preferred replica lease expiry: only applies when
         * fetching from a broker other than the leader. A positive
         * rd_interval() result is taken to mean the 5 minute lease
         * interval has elapsed. */
        lease_expired =
                rktp->rktp_leader_id != rktp->rktp_broker_id &&
                rd_interval(&rktp->rktp_lease_intvl,
                            5*60*1000*1000/*5 minutes*/, 0) > 0;
        if (lease_expired) {
                /* delegate_to_leader() requires no locks to be held */
                rd_kafka_toppar_unlock(rktp);
                rd_kafka_toppar_delegate_to_leader(rktp);
                rd_kafka_toppar_lock(rktp);

                reason = "preferred replica lease expired";
                should_fetch = 0;
                goto done;
        }

	/* Forced removal from fetch list */
	if (unlikely(force_remove)) {
		reason = "forced removal";
		should_fetch = 0;
		goto done;
	}

	if (unlikely((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) != 0)) {
		reason = "partition removed";
		should_fetch = 0;
		goto done;
	}

	/* Skip toppars not in active fetch state */
	if (rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_ACTIVE) {
                reason = "not in active fetch state";
		should_fetch = 0;
		goto done;
	}

        /* Update broker thread's fetch op version */
        version = rktp->rktp_op_version;
        if (version > rktp->rktp_fetch_version ||
            rktp->rktp_next_offset != rktp->rktp_last_next_offset ||
            rktp->rktp_offsets.fetch_offset == RD_KAFKA_OFFSET_INVALID) {
                /* New version barrier, something was modified from the
                 * control plane. Reset and start over.
		 * Alternatively only the next_offset changed but not the
		 * barrier, which is the case when automatically triggering
		 * offset.reset (such as on PARTITION_EOF or
                 * OFFSET_OUT_OF_RANGE). */

                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCHDEC",
                             "Topic %s [%"PRId32"]: fetch decide: "
                             "updating to version %d (was %d) at "
                             "offset %"PRId64" (was %"PRId64")",
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition,
                             version, rktp->rktp_fetch_version,
                             rktp->rktp_next_offset,
                             rktp->rktp_offsets.fetch_offset);

                rd_kafka_offset_stats_reset(&rktp->rktp_offsets);

                /* New start offset */
                rktp->rktp_offsets.fetch_offset = rktp->rktp_next_offset;
		rktp->rktp_last_next_offset = rktp->rktp_next_offset;

                rktp->rktp_fetch_version = version;

                /* Clear last error to propagate new fetch
                 * errors if encountered. */
                rktp->rktp_last_error = RD_KAFKA_RESP_ERR_NO_ERROR;

                /* Drop queued messages belonging to older versions
                 * (semantics of rd_kafka_q_purge_toppar_version()
                 * assumed from its name — confirm). */
                rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
                                                version);
        }


	if (RD_KAFKA_TOPPAR_IS_PAUSED(rktp)) {
		should_fetch = 0;
		reason = "paused";

	} else if (RD_KAFKA_OFFSET_IS_LOGICAL(rktp->rktp_next_offset)) {
                should_fetch = 0;
                reason = "no concrete offset";

        } else if (rd_kafka_q_len(rktp->rktp_fetchq) >=
		   rkb->rkb_rk->rk_conf.queued_min_msgs) {
		/* Skip toppars whose local message queue is already above
		 * the lower threshold. */
                reason = "queued.min.messages exceeded";
                should_fetch = 0;

        } else if ((int64_t)rd_kafka_q_size(rktp->rktp_fetchq) >=
            rkb->rkb_rk->rk_conf.queued_max_msg_bytes) {
                reason = "queued.max.messages.kbytes exceeded";
                should_fetch = 0;

        } else if (rktp->rktp_ts_fetch_backoff > rd_clock()) {
                reason = "fetch backed off";
                ts_backoff = rktp->rktp_ts_fetch_backoff;
                should_fetch = 0;
        }

 done:
        /* Copy offset stats to finalized place holder. */
        rktp->rktp_offsets_fin = rktp->rktp_offsets;

        /* Only touch the broker's active list when fetchability changed. */
        if (rktp->rktp_fetch != should_fetch) {
                rd_rkb_dbg(rkb, FETCH, "FETCH",
                           "Topic %s [%"PRId32"] in state %s at offset %s "
                           "(%d/%d msgs, %"PRId64"/%d kb queued, "
			   "opv %"PRId32") is %s%s",
                           rktp->rktp_rkt->rkt_topic->str,
                           rktp->rktp_partition,
			   rd_kafka_fetch_states[rktp->rktp_fetch_state],
                           rd_kafka_offset2str(rktp->rktp_next_offset),
                           rd_kafka_q_len(rktp->rktp_fetchq),
                           rkb->rkb_rk->rk_conf.queued_min_msgs,
                           rd_kafka_q_size(rktp->rktp_fetchq) / 1024,
                           rkb->rkb_rk->rk_conf.queued_max_msg_kbytes,
			   rktp->rktp_fetch_version,
                           should_fetch ? "fetchable" : "not fetchable: ",
                           reason);

                if (should_fetch) {
			rd_dassert(rktp->rktp_fetch_version > 0);
                        rd_kafka_broker_active_toppar_add(rkb, rktp,
                                                          *reason ? reason :
                                                          "fetchable");
                } else {
                        rd_kafka_broker_active_toppar_del(rkb, rktp, reason);
                }
        }

        rd_kafka_toppar_unlock(rktp);

        /* Non-fetching partitions will have an
         * indefinite backoff, unless explicitly specified. */
        if (!should_fetch && !ts_backoff)
                ts_backoff = RD_TS_MAX;

        return ts_backoff;
}
2116 
2117 
2118 /**
2119  * @brief Serve a toppar in a consumer broker thread.
2120  *        This is considered the fast path and should be minimal,
2121  *        mostly focusing on fetch related mechanisms.
2122  *
2123  * @returns the partition's Fetch backoff timestamp, or 0 if no backoff.
2124  *
2125  * @locality broker thread
2126  * @locks none
2127  */
rd_kafka_broker_consumer_toppar_serve(rd_kafka_broker_t * rkb,rd_kafka_toppar_t * rktp)2128 rd_ts_t rd_kafka_broker_consumer_toppar_serve (rd_kafka_broker_t *rkb,
2129                                                rd_kafka_toppar_t *rktp) {
2130         return rd_kafka_toppar_fetch_decide(rktp, rkb, 0);
2131 }
2132 
2133 
2134 
/**
 * @brief Serve a toppar op
 *
 * The partition is taken from rko->rko_rktp, which may be NULL for
 * certain ops (OP_RECV_BUF). If rd_kafka_op_version_outdated() deems the
 * op outdated relative to the partition's rktp_op_version, the op is
 * replied to with RD_KAFKA_RESP_ERR__OUTDATED and not processed.
 *
 * Dispatch:
 *  - FETCH_START, FETCH_STOP, SEEK, PAUSE: forwarded to the matching
 *    rd_kafka_toppar_*() handler.
 *  - OFFSET_COMMIT reply: invokes the op's offset_commit callback.
 *  - OFFSET_FETCH reply: the partition is taken from the reply's
 *    partition list; on success the fetched offset is applied, on failure
 *    the offset query is retried after 500 ms and (except for
 *    __WAIT_COORD and UNSTABLE_OFFSET_COMMIT) the error is propagated to
 *    the partition's fetch queue.
 *  - anything else: assertion failure.
 *
 * Will send an empty reply op if the request rko has a replyq set,
 * providing synchronous operation.
 *
 * @locality toppar handler thread
 */
static rd_kafka_op_res_t
rd_kafka_toppar_op_serve (rd_kafka_t *rk,
                          rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                          rd_kafka_q_cb_type_t cb_type, void *opaque) {
	rd_kafka_toppar_t *rktp = NULL;
	int outdated = 0;

	if (rko->rko_rktp)
		rktp = rko->rko_rktp;

	if (rktp) {
		/* Ops issued before the latest version barrier are stale. */
		outdated = rd_kafka_op_version_outdated(rko,
							rktp->rktp_op_version);

		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OP",
			     "%.*s [%"PRId32"] received %sop %s "
			     "(v%"PRId32") in fetch-state %s (opv%d)",
			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
			     rktp->rktp_partition,
			     outdated ? "outdated ": "",
			     rd_kafka_op2str(rko->rko_type),
			     rko->rko_version,
			     rd_kafka_fetch_states[rktp->rktp_fetch_state],
			     rktp->rktp_op_version);

		if (outdated) {
#if ENABLE_DEVEL
			rd_kafka_op_print(stdout, "PART_OUTDATED", rko);
#endif
                        rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__OUTDATED);
			return RD_KAFKA_OP_RES_HANDLED;
		}
	}

	switch ((int)rko->rko_type)
	{
	case RD_KAFKA_OP_FETCH_START:
		rd_kafka_toppar_fetch_start(rktp,
					    rko->rko_u.fetch_start.offset, rko);
		break;

	case RD_KAFKA_OP_FETCH_STOP:
		rd_kafka_toppar_fetch_stop(rktp, rko);
		break;

	case RD_KAFKA_OP_SEEK:
		rd_kafka_toppar_seek(rktp, rko->rko_u.fetch_start.offset, rko);
		break;

	case RD_KAFKA_OP_PAUSE:
		rd_kafka_toppar_pause_resume(rktp, rko);
		break;

        case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
                /* Commit result: hand it to the commit callback. */
                rd_kafka_assert(NULL, rko->rko_u.offset_commit.cb);
                rko->rko_u.offset_commit.cb(
                        rk, rko->rko_err,
                        rko->rko_u.offset_commit.partitions,
                        rko->rko_u.offset_commit.opaque);
                break;

	case RD_KAFKA_OP_OFFSET_FETCH | RD_KAFKA_OP_REPLY:
        {
                /* OffsetFetch reply */
                rd_kafka_topic_partition_list_t *offsets =
			rko->rko_u.offset_fetch.partitions;
		int64_t offset = RD_KAFKA_OFFSET_INVALID;

                /* The partition reference travels in the single list
                 * element's _private field and is released with
                 * rd_kafka_toppar_destroy() below. */
                rktp = offsets->elems[0]._private;
                if (!rko->rko_err) {
                        /* Request succeeded but per-partition might have failed */
                        rko->rko_err = offsets->elems[0].err;
			offset       = offsets->elems[0].offset;
                }
                /* Detach the reference before destroying the list
                 * (presumably so the list destroy does not release it). */
                offsets->elems[0]._private = NULL;
                rd_kafka_topic_partition_list_destroy(offsets);
		rko->rko_u.offset_fetch.partitions = NULL;

		rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
				    &rktp->rktp_offset_query_tmr,
				    1/*lock*/);

		rd_kafka_toppar_lock(rktp);

		if (rko->rko_err) {
			rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
				     TOPIC, "OFFSET",
				     "Failed to fetch offset for "
				     "%.*s [%"PRId32"]: %s",
				     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
				     rktp->rktp_partition,
				     rd_kafka_err2str(rko->rko_err));

                        /* Keep on querying until we succeed. */
                        rd_kafka_toppar_offset_retry(rktp, 500,
                                                     "failed to fetch offsets");
                        rd_kafka_toppar_unlock(rktp);


                        /* Propagate error to application */
                        if (rko->rko_err != RD_KAFKA_RESP_ERR__WAIT_COORD &&
                            rko->rko_err !=
                            RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT)
                                rd_kafka_consumer_err(
                                        rktp->rktp_fetchq,
                                        RD_KAFKA_NODEID_UA,
                                        rko->rko_err, 0,
                                        NULL, rktp,
                                        RD_KAFKA_OFFSET_INVALID,
                                        "Failed to fetch "
                                        "offsets from brokers: %s",
                                        rd_kafka_err2str(rko->rko_err));

			rd_kafka_toppar_destroy(rktp);

			break;
		}

		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
			     TOPIC, "OFFSET",
			     "%.*s [%"PRId32"]: OffsetFetch returned "
			     "offset %s (%"PRId64")",
			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
			     rktp->rktp_partition,
			     rd_kafka_offset2str(offset), offset);

		/* NOTE(review): a committed offset of 0 is not recorded
		 * here (only > 0) — confirm this is intended. */
		if (offset > 0)
			rktp->rktp_committed_offset = offset;

		/* Absolute offset: fetch from it; otherwise no usable
		 * committed offset, so fall back to offset reset. */
		if (offset >= 0)
			rd_kafka_toppar_next_offset_handle(rktp, offset);
		else
			rd_kafka_offset_reset(rktp, offset,
					      RD_KAFKA_RESP_ERR__NO_OFFSET,
					      "no previously committed offset "
					      "available");
		rd_kafka_toppar_unlock(rktp);

                rd_kafka_toppar_destroy(rktp);
        }
        break;

        default:
                rd_kafka_assert(NULL, !*"unknown type");
                break;
        }

        /* Handlers that already replied have consumed the replyq. */
        rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);

        return RD_KAFKA_OP_RES_HANDLED;
}
2296 
2297 
2298 
2299 
2300 
2301 /**
2302  * Send command op to toppar (handled by toppar's thread).
2303  *
2304  * Locality: any thread
2305  */
rd_kafka_toppar_op0(rd_kafka_toppar_t * rktp,rd_kafka_op_t * rko,rd_kafka_replyq_t replyq)2306 static void rd_kafka_toppar_op0 (rd_kafka_toppar_t *rktp, rd_kafka_op_t *rko,
2307 				 rd_kafka_replyq_t replyq) {
2308         rko->rko_rktp = rd_kafka_toppar_keep(rktp);
2309 	rko->rko_replyq = replyq;
2310 
2311         rd_kafka_q_enq(rktp->rktp_ops, rko);
2312 }
2313 
2314 
2315 /**
2316  * Send command op to toppar (handled by toppar's thread).
2317  *
2318  * Locality: any thread
2319  */
rd_kafka_toppar_op(rd_kafka_toppar_t * rktp,rd_kafka_op_type_t type,int32_t version,int64_t offset,rd_kafka_cgrp_t * rkcg,rd_kafka_replyq_t replyq)2320 static void rd_kafka_toppar_op (rd_kafka_toppar_t *rktp,
2321 				rd_kafka_op_type_t type, int32_t version,
2322 				int64_t offset, rd_kafka_cgrp_t *rkcg,
2323 				rd_kafka_replyq_t replyq) {
2324         rd_kafka_op_t *rko;
2325 
2326         rko = rd_kafka_op_new(type);
2327 	rko->rko_version = version;
2328         if (type == RD_KAFKA_OP_FETCH_START ||
2329 	    type == RD_KAFKA_OP_SEEK) {
2330 		if (rkcg)
2331 			rko->rko_u.fetch_start.rkcg = rkcg;
2332 		rko->rko_u.fetch_start.offset = offset;
2333 	}
2334 
2335 	rd_kafka_toppar_op0(rktp, rko, replyq);
2336 }
2337 
2338 
2339 
2340 /**
2341  * Start consuming partition (async operation).
2342  *  'offset' is the initial offset
2343  *  'fwdq' is an optional queue to forward messages to, if this is NULL
2344  *  then messages will be enqueued on rktp_fetchq.
2345  *  'replyq' is an optional queue for handling the consume_start ack.
2346  *
2347  * This is the thread-safe interface that can be called from any thread.
2348  */
rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_q_t * fwdq,rd_kafka_replyq_t replyq)2349 rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start (rd_kafka_toppar_t *rktp,
2350                                                     int64_t offset,
2351                                                     rd_kafka_q_t *fwdq,
2352                                                     rd_kafka_replyq_t replyq) {
2353 	int32_t version;
2354 
2355         rd_kafka_q_lock(rktp->rktp_fetchq);
2356         if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP))
2357                 rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq,
2358                                     0, /* no do_lock */
2359                                     0 /* no fwd_app */);
2360         rd_kafka_q_unlock(rktp->rktp_fetchq);
2361 
2362 	/* Bump version barrier. */
2363 	version = rd_kafka_toppar_version_new_barrier(rktp);
2364 
2365 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2366 		     "Start consuming %.*s [%"PRId32"] at "
2367 		     "offset %s (v%"PRId32")",
2368 		     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2369 		     rktp->rktp_partition, rd_kafka_offset2str(offset),
2370 		     version);
2371 
2372         rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_START, version,
2373                            offset, rktp->rktp_rkt->rkt_rk->rk_cgrp, replyq);
2374 
2375         return RD_KAFKA_RESP_ERR_NO_ERROR;
2376 }
2377 
2378 
2379 /**
2380  * Stop consuming partition (async operatoin)
2381  * This is thread-safe interface that can be called from any thread.
2382  *
2383  * Locality: any thread
2384  */
rd_kafka_toppar_op_fetch_stop(rd_kafka_toppar_t * rktp,rd_kafka_replyq_t replyq)2385 rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_stop (rd_kafka_toppar_t *rktp,
2386                                                    rd_kafka_replyq_t replyq) {
2387 	int32_t version;
2388 
2389 	/* Bump version barrier. */
2390         version = rd_kafka_toppar_version_new_barrier(rktp);
2391 
2392         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2393 		     "Stop consuming %.*s [%"PRId32"] (v%"PRId32")",
2394 		     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2395 		     rktp->rktp_partition, version);
2396 
2397         rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_STOP, version,
2398 			   0, NULL, replyq);
2399 
2400         return RD_KAFKA_RESP_ERR_NO_ERROR;
2401 }
2402 
2403 
2404 /**
2405  * Set/Seek offset of a consumed partition (async operation).
2406  *  'offset' is the target offset
2407  *  'replyq' is an optional queue for handling the ack.
2408  *
2409  * This is the thread-safe interface that can be called from any thread.
2410  */
rd_kafka_toppar_op_seek(rd_kafka_toppar_t * rktp,int64_t offset,rd_kafka_replyq_t replyq)2411 rd_kafka_resp_err_t rd_kafka_toppar_op_seek (rd_kafka_toppar_t *rktp,
2412                                              int64_t offset,
2413                                              rd_kafka_replyq_t replyq) {
2414 	int32_t version;
2415 
2416 	/* Bump version barrier. */
2417 	version = rd_kafka_toppar_version_new_barrier(rktp);
2418 
2419 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
2420 		     "Seek %.*s [%"PRId32"] to "
2421 		     "offset %s (v%"PRId32")",
2422 		     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2423 		     rktp->rktp_partition, rd_kafka_offset2str(offset),
2424 		     version);
2425 
2426         rd_kafka_toppar_op(rktp, RD_KAFKA_OP_SEEK, version,
2427 			   offset, NULL, replyq);
2428 
2429         return RD_KAFKA_RESP_ERR_NO_ERROR;
2430 }
2431 
2432 
2433 /**
2434  * @brief Pause/resume partition (async operation).
2435  *
2436  * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2437  *             depending on if the app paused or librdkafka.
2438  * @param pause is 1 for pausing or 0 for resuming.
2439  *
2440  * @locality any
2441  */
2442 rd_kafka_resp_err_t
rd_kafka_toppar_op_pause_resume(rd_kafka_toppar_t * rktp,int pause,int flag,rd_kafka_replyq_t replyq)2443 rd_kafka_toppar_op_pause_resume (rd_kafka_toppar_t *rktp, int pause, int flag,
2444                                  rd_kafka_replyq_t replyq) {
2445 	int32_t version;
2446 	rd_kafka_op_t *rko;
2447 
2448 	/* Bump version barrier. */
2449 	version = rd_kafka_toppar_version_new_barrier(rktp);
2450 
2451 	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, pause ? "PAUSE":"RESUME",
2452 		     "%s %.*s [%"PRId32"] (v%"PRId32")",
2453 		     pause ? "Pause" : "Resume",
2454 		     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2455 		     rktp->rktp_partition, version);
2456 
2457 	rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE);
2458 	rko->rko_version = version;
2459 	rko->rko_u.pause.pause = pause;
2460 	rko->rko_u.pause.flag = flag;
2461 
2462         rd_kafka_toppar_op0(rktp, rko, replyq);
2463 
2464         return RD_KAFKA_RESP_ERR_NO_ERROR;
2465 }
2466 
2467 
2468 /**
2469  * @brief Pause a toppar (asynchronous).
2470  *
2471  * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2472  *             depending on if the app paused or librdkafka.
2473  *
2474  * @locality any
2475  * @locks none needed
2476  */
rd_kafka_toppar_pause(rd_kafka_toppar_t * rktp,int flag)2477 void rd_kafka_toppar_pause (rd_kafka_toppar_t *rktp, int flag) {
2478         rd_kafka_toppar_op_pause_resume(rktp, 1/*pause*/, flag,
2479                                         RD_KAFKA_NO_REPLYQ);
2480 }
2481 
2482 /**
2483  * @brief Resume a toppar (asynchronous).
2484  *
2485  * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2486  *             depending on if the app paused or librdkafka.
2487  *
2488  * @locality any
2489  * @locks none needed
2490  */
rd_kafka_toppar_resume(rd_kafka_toppar_t * rktp,int flag)2491 void rd_kafka_toppar_resume (rd_kafka_toppar_t *rktp, int flag) {
2492         rd_kafka_toppar_op_pause_resume(rktp, 1/*pause*/, flag,
2493                                         RD_KAFKA_NO_REPLYQ);
2494 }
2495 
2496 
2497 
2498 /**
2499  * @brief Pause or resume a list of partitions.
2500  *
2501  * @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
2502  *             depending on if the app paused or librdkafka.
2503  * @param pause true for pausing, false for resuming.
2504  * @param async RD_SYNC to wait for background thread to handle op,
2505  *              RD_ASYNC for asynchronous operation.
2506  *
2507  * @locality any
2508  *
2509  * @remark This is an asynchronous call, the actual pause/resume is performed
2510  *         by toppar_pause() in the toppar's handler thread.
2511  */
2512 rd_kafka_resp_err_t
rd_kafka_toppars_pause_resume(rd_kafka_t * rk,rd_bool_t pause,rd_async_t async,int flag,rd_kafka_topic_partition_list_t * partitions)2513 rd_kafka_toppars_pause_resume (rd_kafka_t *rk,
2514                                rd_bool_t pause, rd_async_t async, int flag,
2515                                rd_kafka_topic_partition_list_t *partitions) {
2516         int i;
2517         int waitcnt = 0;
2518         rd_kafka_q_t *tmpq = NULL;
2519 
2520         if (!async)
2521                 tmpq = rd_kafka_q_new(rk);
2522 
2523 	rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
2524 		     "%s %s %d partition(s)",
2525 		     flag & RD_KAFKA_TOPPAR_F_APP_PAUSE ? "Application" : "Library",
2526 		     pause ? "pausing" : "resuming", partitions->cnt);
2527 
2528 	for (i = 0 ; i < partitions->cnt ; i++) {
2529 		rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
2530 		rd_kafka_toppar_t *rktp;
2531 
2532                 rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar,
2533                                                            rd_false);
2534 		if (!rktp) {
2535 			rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
2536 				     "%s %s [%"PRId32"]: skipped: "
2537 				     "unknown partition",
2538 				     pause ? "Pause":"Resume",
2539 				     rktpar->topic, rktpar->partition);
2540 
2541 			rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
2542 			continue;
2543 		}
2544 
2545                 rd_kafka_toppar_op_pause_resume(rktp, pause, flag,
2546                                                 RD_KAFKA_REPLYQ(tmpq, 0));
2547 
2548                 if (!async)
2549                         waitcnt++;
2550 
2551 		rd_kafka_toppar_destroy(rktp);
2552 
2553 		rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
2554 	}
2555 
2556         if (!async) {
2557                 while (waitcnt-- > 0)
2558                         rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
2559 
2560                 rd_kafka_q_destroy_owner(tmpq);
2561         }
2562 
2563 	return RD_KAFKA_RESP_ERR_NO_ERROR;
2564 }
2565 
2566 
2567 
2568 
2569 
2570 /**
2571  * Propagate error for toppar
2572  */
rd_kafka_toppar_enq_error(rd_kafka_toppar_t * rktp,rd_kafka_resp_err_t err,const char * reason)2573 void rd_kafka_toppar_enq_error (rd_kafka_toppar_t *rktp,
2574                                 rd_kafka_resp_err_t err,
2575                                 const char *reason) {
2576         rd_kafka_op_t *rko;
2577         char buf[512];
2578 
2579         rko = rd_kafka_op_new(RD_KAFKA_OP_ERR);
2580         rko->rko_err  = err;
2581         rko->rko_rktp = rd_kafka_toppar_keep(rktp);
2582 
2583         rd_snprintf(buf, sizeof(buf), "%.*s [%"PRId32"]: %s (%s)",
2584                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
2585                     rktp->rktp_partition, reason,
2586                     rd_kafka_err2str(err));
2587 
2588         rko->rko_u.err.errstr = rd_strdup(buf);
2589 
2590         rd_kafka_q_enq(rktp->rktp_fetchq, rko);
2591 }
2592 
2593 
2594 
2595 
2596 
2597 /**
2598  * Returns the currently delegated broker for this toppar.
2599  * If \p proper_broker is set NULL will be returned if current handler
2600  * is not a proper broker (INTERNAL broker).
2601  *
2602  * The returned broker has an increased refcount.
2603  *
2604  * Locks: none
2605  */
rd_kafka_toppar_broker(rd_kafka_toppar_t * rktp,int proper_broker)2606 rd_kafka_broker_t *rd_kafka_toppar_broker (rd_kafka_toppar_t *rktp,
2607                                            int proper_broker) {
2608         rd_kafka_broker_t *rkb;
2609         rd_kafka_toppar_lock(rktp);
2610         rkb = rktp->rktp_broker;
2611         if (rkb) {
2612                 if (proper_broker && rkb->rkb_source == RD_KAFKA_INTERNAL)
2613                         rkb = NULL;
2614                 else
2615                         rd_kafka_broker_keep(rkb);
2616         }
2617         rd_kafka_toppar_unlock(rktp);
2618 
2619         return rkb;
2620 }
2621 
2622 
2623 /**
2624  * @brief Take action when partition broker becomes unavailable.
2625  *        This should be called when requests fail with
2626  *        NOT_LEADER_FOR.. or similar error codes, e.g. ProduceRequest.
2627  *
2628  * @locks none
2629  * @locality any
2630  */
rd_kafka_toppar_leader_unavailable(rd_kafka_toppar_t * rktp,const char * reason,rd_kafka_resp_err_t err)2631 void rd_kafka_toppar_leader_unavailable (rd_kafka_toppar_t *rktp,
2632                                          const char *reason,
2633                                          rd_kafka_resp_err_t err) {
2634         rd_kafka_topic_t *rkt = rktp->rktp_rkt;
2635 
2636         rd_kafka_dbg(rkt->rkt_rk, TOPIC, "BROKERUA",
2637                      "%s [%"PRId32"]: broker unavailable: %s: %s",
2638                      rkt->rkt_topic->str, rktp->rktp_partition, reason,
2639                      rd_kafka_err2str(err));
2640 
2641         rd_kafka_topic_wrlock(rkt);
2642         rkt->rkt_flags |= RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;
2643         rd_kafka_topic_wrunlock(rkt);
2644 
2645         rd_kafka_topic_fast_leader_query(rkt->rkt_rk);
2646 }
2647 
2648 
2649 const char *
rd_kafka_topic_partition_topic(const rd_kafka_topic_partition_t * rktpar)2650 rd_kafka_topic_partition_topic (const rd_kafka_topic_partition_t *rktpar) {
2651         const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2652         return rktp->rktp_rkt->rkt_topic->str;
2653 }
2654 
2655 int32_t
rd_kafka_topic_partition_partition(const rd_kafka_topic_partition_t * rktpar)2656 rd_kafka_topic_partition_partition (const rd_kafka_topic_partition_t *rktpar) {
2657         const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2658         return rktp->rktp_partition;
2659 }
2660 
rd_kafka_topic_partition_get(const rd_kafka_topic_partition_t * rktpar,const char ** name,int32_t * partition)2661 void rd_kafka_topic_partition_get (const rd_kafka_topic_partition_t *rktpar,
2662                                    const char **name, int32_t *partition) {
2663         const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
2664         *name = rktp->rktp_rkt->rkt_topic->str;
2665         *partition = rktp->rktp_partition;
2666 }
2667 
2668 
2669 
2670 
2671 /**
2672  *
2673  * rd_kafka_topic_partition_t lists
 * List of partitions for propagation to the application.
 * The backing array grows on demand (see rd_kafka_topic_partition_list_grow()).
2675  *
2676  */
2677 
2678 
2679 static void
rd_kafka_topic_partition_list_grow(rd_kafka_topic_partition_list_t * rktparlist,int add_size)2680 rd_kafka_topic_partition_list_grow (rd_kafka_topic_partition_list_t *rktparlist,
2681                                     int add_size) {
2682         if (add_size < rktparlist->size)
2683                 add_size = RD_MAX(rktparlist->size, 32);
2684 
2685         rktparlist->size += add_size;
2686         rktparlist->elems = rd_realloc(rktparlist->elems,
2687                                        sizeof(*rktparlist->elems) *
2688                                        rktparlist->size);
2689 
2690 }
2691 
2692 
2693 /**
2694  * @brief Initialize a list for fitting \p size partitions.
2695  */
rd_kafka_topic_partition_list_init(rd_kafka_topic_partition_list_t * rktparlist,int size)2696 void rd_kafka_topic_partition_list_init (
2697         rd_kafka_topic_partition_list_t *rktparlist, int size) {
2698         memset(rktparlist, 0, sizeof(*rktparlist));
2699 
2700         if (size > 0)
2701                 rd_kafka_topic_partition_list_grow(rktparlist, size);
2702 }
2703 
2704 
2705 /**
2706  * Create a list for fitting 'size' topic_partitions (rktp).
2707  */
rd_kafka_topic_partition_list_new(int size)2708 rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size) {
2709         rd_kafka_topic_partition_list_t *rktparlist;
2710 
2711         rktparlist = rd_calloc(1, sizeof(*rktparlist));
2712 
2713         if (size > 0)
2714                 rd_kafka_topic_partition_list_grow(rktparlist, size);
2715 
2716         return rktparlist;
2717 }
2718 
2719 
2720 
rd_kafka_topic_partition_new(const char * topic,int32_t partition)2721 rd_kafka_topic_partition_t *rd_kafka_topic_partition_new (const char *topic,
2722 							  int32_t partition) {
2723 	rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));
2724 
2725 	rktpar->topic = rd_strdup(topic);
2726 	rktpar->partition = partition;
2727 
2728 	return rktpar;
2729 }
2730 
2731 
2732 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_copy(const rd_kafka_topic_partition_t * src)2733 rd_kafka_topic_partition_copy (const rd_kafka_topic_partition_t *src) {
2734         return rd_kafka_topic_partition_new(src->topic, src->partition);
2735 }
2736 
2737 
2738 /** Same as above but with generic void* signature */
rd_kafka_topic_partition_copy_void(const void * src)2739 void *rd_kafka_topic_partition_copy_void (const void *src) {
2740         return rd_kafka_topic_partition_copy(src);
2741 }
2742 
2743 
2744 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_new_from_rktp(rd_kafka_toppar_t * rktp)2745 rd_kafka_topic_partition_new_from_rktp (rd_kafka_toppar_t *rktp) {
2746 	rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));
2747 
2748 	rktpar->topic = RD_KAFKAP_STR_DUP(rktp->rktp_rkt->rkt_topic);
2749 	rktpar->partition = rktp->rktp_partition;
2750 
2751 	return rktpar;
2752 }
2753 
2754 
2755 
2756 static void
rd_kafka_topic_partition_destroy0(rd_kafka_topic_partition_t * rktpar,int do_free)2757 rd_kafka_topic_partition_destroy0 (rd_kafka_topic_partition_t *rktpar, int do_free) {
2758 	if (rktpar->topic)
2759 		rd_free(rktpar->topic);
2760 	if (rktpar->metadata)
2761 		rd_free(rktpar->metadata);
2762 	if (rktpar->_private)
2763 		rd_kafka_toppar_destroy((rd_kafka_toppar_t *)rktpar->_private);
2764 
2765 	if (do_free)
2766 		rd_free(rktpar);
2767 }
2768 
2769 
2770 /**
2771  * @brief Destroy all partitions in list.
2772  *
2773  * @remark The allocated size of the list will not shrink.
2774  */
rd_kafka_topic_partition_list_clear(rd_kafka_topic_partition_list_t * rktparlist)2775 void rd_kafka_topic_partition_list_clear (
2776         rd_kafka_topic_partition_list_t *rktparlist) {
2777         int i;
2778 
2779         for (i = 0 ; i < rktparlist->cnt ; i++)
2780                 rd_kafka_topic_partition_destroy0(&rktparlist->elems[i], 0);
2781 
2782         rktparlist->cnt = 0;
2783 }
2784 
2785 
rd_kafka_topic_partition_destroy_free(void * ptr)2786 void rd_kafka_topic_partition_destroy_free (void *ptr) {
2787         rd_kafka_topic_partition_destroy0(ptr, rd_true/*do_free*/);
2788 }
2789 
rd_kafka_topic_partition_destroy(rd_kafka_topic_partition_t * rktpar)2790 void rd_kafka_topic_partition_destroy (rd_kafka_topic_partition_t *rktpar) {
2791 	rd_kafka_topic_partition_destroy0(rktpar, 1);
2792 }
2793 
2794 
2795 /**
2796  * Destroys a list previously created with .._list_new() and drops
2797  * any references to contained toppars.
2798  */
2799 void
rd_kafka_topic_partition_list_destroy(rd_kafka_topic_partition_list_t * rktparlist)2800 rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist) {
2801         int i;
2802 
2803         for (i = 0 ; i < rktparlist->cnt ; i++)
2804 		rd_kafka_topic_partition_destroy0(&rktparlist->elems[i], 0);
2805 
2806         if (rktparlist->elems)
2807                 rd_free(rktparlist->elems);
2808 
2809         rd_free(rktparlist);
2810 }
2811 
2812 
2813 /**
2814  * @brief Wrapper for rd_kafka_topic_partition_list_destroy() that
2815  *        matches the standard free(void *) signature, for callback use.
2816  */
rd_kafka_topic_partition_list_destroy_free(void * ptr)2817 void rd_kafka_topic_partition_list_destroy_free (void *ptr) {
2818         rd_kafka_topic_partition_list_destroy(
2819                 (rd_kafka_topic_partition_list_t *)ptr);
2820 }
2821 
2822 
2823 /**
2824  * Add a partition to an rktpar list.
2825  * The list must have enough room to fit it.
2826  *
2827  * '_private' must be NULL or a valid 'rd_kafka_toppar_t *'.
2828  *
2829  * Returns a pointer to the added element.
2830  */
2831 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add0(const char * func,int line,rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition,rd_kafka_toppar_t * _private)2832 rd_kafka_topic_partition_list_add0 (const char *func, int line,
2833                                     rd_kafka_topic_partition_list_t *rktparlist,
2834                                     const char *topic, int32_t partition,
2835 				    rd_kafka_toppar_t *_private) {
2836         rd_kafka_topic_partition_t *rktpar;
2837         if (rktparlist->cnt == rktparlist->size)
2838                 rd_kafka_topic_partition_list_grow(rktparlist, 1);
2839         rd_kafka_assert(NULL, rktparlist->cnt < rktparlist->size);
2840 
2841         rktpar = &rktparlist->elems[rktparlist->cnt++];
2842         memset(rktpar, 0, sizeof(*rktpar));
2843         rktpar->topic = rd_strdup(topic);
2844         rktpar->partition = partition;
2845 	rktpar->offset = RD_KAFKA_OFFSET_INVALID;
2846         rktpar->_private = _private;
2847         if (_private)
2848                 rd_kafka_toppar_keep_fl(func, line, _private);
2849 
2850         return rktpar;
2851 }
2852 
2853 
2854 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)2855 rd_kafka_topic_partition_list_add (rd_kafka_topic_partition_list_t *rktparlist,
2856                                    const char *topic, int32_t partition) {
2857         return rd_kafka_topic_partition_list_add0(__FUNCTION__,__LINE__,
2858                                                   rktparlist,
2859                                                   topic, partition, NULL);
2860 }
2861 
2862 
2863 /**
2864  * Adds a consecutive list of partitions to a list
2865  */
2866 void
rd_kafka_topic_partition_list_add_range(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t start,int32_t stop)2867 rd_kafka_topic_partition_list_add_range (rd_kafka_topic_partition_list_t
2868                                          *rktparlist,
2869                                          const char *topic,
2870                                          int32_t start, int32_t stop) {
2871 
2872         for (; start <= stop ; start++)
2873                 rd_kafka_topic_partition_list_add(rktparlist, topic, start);
2874 }
2875 
2876 
2877 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_upsert(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)2878 rd_kafka_topic_partition_list_upsert (
2879         rd_kafka_topic_partition_list_t *rktparlist,
2880         const char *topic, int32_t partition) {
2881         rd_kafka_topic_partition_t *rktpar;
2882 
2883         if ((rktpar = rd_kafka_topic_partition_list_find(rktparlist,
2884                                                          topic, partition)))
2885                 return rktpar;
2886 
2887         return rd_kafka_topic_partition_list_add(rktparlist, topic, partition);
2888 }
2889 
2890 
2891 /**
2892  * @brief Update \p dst with info from \p src.
2893  */
rd_kafka_topic_partition_update(rd_kafka_topic_partition_t * dst,const rd_kafka_topic_partition_t * src)2894 void rd_kafka_topic_partition_update (rd_kafka_topic_partition_t *dst,
2895                                       const rd_kafka_topic_partition_t *src) {
2896         rd_dassert(!strcmp(dst->topic, src->topic));
2897         rd_dassert(dst->partition == src->partition);
2898         rd_dassert(dst != src);
2899 
2900         dst->offset = src->offset;
2901         dst->opaque = src->opaque;
2902         dst->err = src->err;
2903 
2904         if (src->metadata_size > 0) {
2905                 dst->metadata = rd_malloc(src->metadata_size);
2906                 dst->metadata_size = src->metadata_size;;
2907                 memcpy(dst->metadata, src->metadata, dst->metadata_size);
2908         }
2909 }
2910 
2911 /**
2912  * @brief Creates a copy of \p rktpar and adds it to \p rktparlist
2913  */
rd_kafka_topic_partition_list_add_copy(rd_kafka_topic_partition_list_t * rktparlist,const rd_kafka_topic_partition_t * rktpar)2914 void rd_kafka_topic_partition_list_add_copy (
2915         rd_kafka_topic_partition_list_t *rktparlist,
2916         const rd_kafka_topic_partition_t *rktpar) {
2917         rd_kafka_topic_partition_t *dst;
2918 
2919         dst = rd_kafka_topic_partition_list_add0(
2920                 __FUNCTION__,__LINE__,
2921                 rktparlist,
2922                 rktpar->topic,
2923                 rktpar->partition,
2924                 rktpar->_private);
2925 
2926         rd_kafka_topic_partition_update(dst, rktpar);
2927 }
2928 
2929 
2930 
2931 /**
2932  * Create and return a copy of list 'src'
2933  */
2934 rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_copy(const rd_kafka_topic_partition_list_t * src)2935 rd_kafka_topic_partition_list_copy (const rd_kafka_topic_partition_list_t *src){
2936         rd_kafka_topic_partition_list_t *dst;
2937         int i;
2938 
2939         dst = rd_kafka_topic_partition_list_new(src->size);
2940 
2941         for (i = 0 ; i < src->cnt ; i++)
2942                 rd_kafka_topic_partition_list_add_copy(dst, &src->elems[i]);
2943         return dst;
2944 }
2945 
2946 /**
2947  * @brief Same as rd_kafka_topic_partition_list_copy() but suitable for
2948  *        rd_list_copy(). The \p opaque is ignored.
2949  */
2950 void *
rd_kafka_topic_partition_list_copy_opaque(const void * src,void * opaque)2951 rd_kafka_topic_partition_list_copy_opaque (const void *src, void *opaque) {
2952         return rd_kafka_topic_partition_list_copy(src);
2953 }
2954 
2955 /**
2956  * @brief Append copies of all elements in \p src to \p dst.
2957  *        No duplicate-checks are performed.
2958  */
rd_kafka_topic_partition_list_add_list(rd_kafka_topic_partition_list_t * dst,const rd_kafka_topic_partition_list_t * src)2959 void rd_kafka_topic_partition_list_add_list (
2960         rd_kafka_topic_partition_list_t *dst,
2961         const rd_kafka_topic_partition_list_t *src) {
2962         int i;
2963 
2964         if (src->cnt == 0)
2965                 return;
2966 
2967         if (dst->size < dst->cnt + src->cnt)
2968                 rd_kafka_topic_partition_list_grow(dst, src->cnt);
2969 
2970         for (i = 0 ; i < src->cnt ; i++)
2971                 rd_kafka_topic_partition_list_add_copy(dst, &src->elems[i]);
2972 }
2973 
2974 
2975 /**
2976  * @brief Compare two partition lists using partition comparator \p cmp.
2977  *
2978  * @warning This is an O(Na*Nb) operation.
2979  */
2980 int
rd_kafka_topic_partition_list_cmp(const void * _a,const void * _b,int (* cmp)(const void *,const void *))2981 rd_kafka_topic_partition_list_cmp (const void *_a, const void *_b,
2982                                    int (*cmp) (const void *, const void *)) {
2983         const rd_kafka_topic_partition_list_t *a = _a, *b = _b;
2984         int r;
2985         int i;
2986 
2987         r = a->cnt - b->cnt;
2988         if (r || a->cnt == 0)
2989                 return r;
2990 
2991         /* Since the lists may not be sorted we need to scan all of B
2992          * for each element in A.
2993          * FIXME: If the list sizes are larger than X we could create a
2994          *        temporary hash map instead. */
2995         for (i = 0 ; i < a->cnt ; i++) {
2996                 int j;
2997 
2998                 for (j = 0 ; j < b->cnt ; j++) {
2999                         r = cmp(&a->elems[i], &b->elems[j]);
3000                         if (!r)
3001                                 break;
3002                 }
3003 
3004                 if (j == b->cnt)
3005                         return 1;
3006         }
3007 
3008         return 0;
3009 }
3010 
3011 
3012 /**
3013  * @brief Ensures the \p rktpar has a toppar set in _private.
3014  *
3015  * @returns the toppar object (or possibly NULL if \p create_on_miss is true)
3016  *          WITHOUT refcnt increased.
3017  */
3018 rd_kafka_toppar_t *
rd_kafka_topic_partition_ensure_toppar(rd_kafka_t * rk,rd_kafka_topic_partition_t * rktpar,rd_bool_t create_on_miss)3019 rd_kafka_topic_partition_ensure_toppar (rd_kafka_t *rk,
3020                                         rd_kafka_topic_partition_t *rktpar,
3021                                         rd_bool_t create_on_miss) {
3022         if (!rktpar->_private)
3023                 rktpar->_private =
3024                         rd_kafka_toppar_get2(rk,
3025                                              rktpar->topic,
3026                                              rktpar->partition, 0,
3027                                              create_on_miss);
3028         return rktpar->_private;
3029 }
3030 
3031 
3032 /**
3033  * @returns (and sets if necessary) the \p rktpar's _private / toppar.
3034  * @remark a new reference is returned.
3035  */
3036 rd_kafka_toppar_t *
rd_kafka_topic_partition_get_toppar(rd_kafka_t * rk,rd_kafka_topic_partition_t * rktpar,rd_bool_t create_on_miss)3037 rd_kafka_topic_partition_get_toppar (rd_kafka_t *rk,
3038                                      rd_kafka_topic_partition_t *rktpar,
3039                                      rd_bool_t create_on_miss) {
3040         rd_kafka_toppar_t *rktp;
3041 
3042         rktp = rd_kafka_topic_partition_ensure_toppar(rk, rktpar,
3043                                                       create_on_miss);
3044 
3045         if (rktp)
3046                 rd_kafka_toppar_keep(rktp);
3047 
3048         return rktp;
3049 }
3050 
3051 
rd_kafka_topic_partition_cmp(const void * _a,const void * _b)3052 int rd_kafka_topic_partition_cmp (const void *_a, const void *_b) {
3053         const rd_kafka_topic_partition_t *a = _a;
3054         const rd_kafka_topic_partition_t *b = _b;
3055         int r = strcmp(a->topic, b->topic);
3056         if (r)
3057                 return r;
3058         else
3059                 return RD_CMP(a->partition, b->partition);
3060 }
3061 
3062 /** @brief Compare only the topic */
rd_kafka_topic_partition_cmp_topic(const void * _a,const void * _b)3063 int rd_kafka_topic_partition_cmp_topic (const void *_a, const void *_b) {
3064         const rd_kafka_topic_partition_t *a = _a;
3065         const rd_kafka_topic_partition_t *b = _b;
3066         return strcmp(a->topic, b->topic);
3067 }
3068 
rd_kafka_topic_partition_cmp_opaque(const void * _a,const void * _b,void * opaque)3069 static int rd_kafka_topic_partition_cmp_opaque (const void *_a, const void *_b,
3070                                                 void *opaque) {
3071         return rd_kafka_topic_partition_cmp(_a, _b);
3072 }
3073 
3074 /** @returns a hash of the topic and partition */
rd_kafka_topic_partition_hash(const void * _a)3075 unsigned int rd_kafka_topic_partition_hash (const void *_a) {
3076         const rd_kafka_topic_partition_t *a = _a;
3077         int r = 31 * 17 + a->partition;
3078         return 31 * r + rd_string_hash(a->topic, -1);
3079 }
3080 
3081 
3082 
3083 /**
3084  * @brief Search 'rktparlist' for 'topic' and 'partition'.
3085  * @returns the elems[] index or -1 on miss.
3086  */
3087 static int
rd_kafka_topic_partition_list_find0(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition,int (* cmp)(const void *,const void *))3088 rd_kafka_topic_partition_list_find0 (
3089         const rd_kafka_topic_partition_list_t *rktparlist,
3090         const char *topic, int32_t partition,
3091         int (*cmp) (const void *, const void *)) {
3092         rd_kafka_topic_partition_t skel;
3093         int i;
3094 
3095         skel.topic = (char *)topic;
3096         skel.partition = partition;
3097 
3098         for (i = 0 ; i < rktparlist->cnt ; i++) {
3099                 if (!cmp(&skel, &rktparlist->elems[i]))
3100                         return i;
3101         }
3102 
3103         return -1;
3104 }
3105 
3106 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_find(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)3107 rd_kafka_topic_partition_list_find (
3108         const rd_kafka_topic_partition_list_t *rktparlist,
3109         const char *topic, int32_t partition) {
3110         int i = rd_kafka_topic_partition_list_find0(
3111                 rktparlist, topic, partition, rd_kafka_topic_partition_cmp);
3112         if (i == -1)
3113                 return NULL;
3114         else
3115                 return &rktparlist->elems[i];
3116 }
3117 
3118 
3119 int
rd_kafka_topic_partition_list_find_idx(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)3120 rd_kafka_topic_partition_list_find_idx (
3121         const rd_kafka_topic_partition_list_t *rktparlist,
3122         const char *topic, int32_t partition) {
3123         return rd_kafka_topic_partition_list_find0(
3124                 rktparlist, topic, partition, rd_kafka_topic_partition_cmp);
3125 }
3126 
3127 
3128 /**
3129  * @returns the first element that matches \p topic, regardless of partition.
3130  */
3131 rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_find_topic(const rd_kafka_topic_partition_list_t * rktparlist,const char * topic)3132 rd_kafka_topic_partition_list_find_topic (
3133         const rd_kafka_topic_partition_list_t *rktparlist, const char *topic) {
3134         int i = rd_kafka_topic_partition_list_find0(
3135                 rktparlist, topic, RD_KAFKA_PARTITION_UA,
3136                 rd_kafka_topic_partition_cmp_topic);
3137         if (i == -1)
3138                 return NULL;
3139         else
3140                 return &rktparlist->elems[i];
3141 }
3142 
3143 
3144 int
rd_kafka_topic_partition_list_del_by_idx(rd_kafka_topic_partition_list_t * rktparlist,int idx)3145 rd_kafka_topic_partition_list_del_by_idx (rd_kafka_topic_partition_list_t *rktparlist,
3146 					  int idx) {
3147 	if (unlikely(idx < 0 || idx >= rktparlist->cnt))
3148 		return 0;
3149 
3150 	rd_kafka_topic_partition_destroy0(&rktparlist->elems[idx], 0);
3151 	memmove(&rktparlist->elems[idx], &rktparlist->elems[idx+1],
3152 		(rktparlist->cnt - idx - 1) * sizeof(rktparlist->elems[idx]));
3153 	rktparlist->cnt--;
3154 
3155 	return 1;
3156 }
3157 
3158 
3159 int
rd_kafka_topic_partition_list_del(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition)3160 rd_kafka_topic_partition_list_del (rd_kafka_topic_partition_list_t *rktparlist,
3161 				   const char *topic, int32_t partition) {
3162         int i = rd_kafka_topic_partition_list_find0(
3163                 rktparlist, topic, partition, rd_kafka_topic_partition_cmp);
3164 	if (i == -1)
3165 		return 0;
3166 
3167 	return rd_kafka_topic_partition_list_del_by_idx(rktparlist, i);
3168 }
3169 
3170 
3171 
3172 /**
3173  * Returns true if 'topic' matches the 'rktpar', else false.
3174  * On match, if rktpar is a regex pattern then 'matched_by_regex' is set to 1.
3175  */
rd_kafka_topic_partition_match(rd_kafka_t * rk,const rd_kafka_group_member_t * rkgm,const rd_kafka_topic_partition_t * rktpar,const char * topic,int * matched_by_regex)3176 int rd_kafka_topic_partition_match (rd_kafka_t *rk,
3177 				    const rd_kafka_group_member_t *rkgm,
3178 				    const rd_kafka_topic_partition_t *rktpar,
3179 				    const char *topic, int *matched_by_regex) {
3180 	int ret = 0;
3181 
3182 	if (*rktpar->topic == '^') {
3183 		char errstr[128];
3184 
3185 		ret = rd_regex_match(rktpar->topic, topic,
3186 				     errstr, sizeof(errstr));
3187 		if (ret == -1) {
3188 			rd_kafka_dbg(rk, CGRP,
3189 				     "SUBMATCH",
3190 				     "Invalid regex for member "
3191 				     "\"%.*s\" subscription \"%s\": %s",
3192 				     RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
3193 				     rktpar->topic, errstr);
3194 			return 0;
3195 		}
3196 
3197 		if (ret && matched_by_regex)
3198 			*matched_by_regex = 1;
3199 
3200 	} else if (!strcmp(rktpar->topic, topic)) {
3201 
3202 		if (matched_by_regex)
3203 			*matched_by_regex = 0;
3204 
3205 		ret = 1;
3206 	}
3207 
3208 	return ret;
3209 }
3210 
3211 
3212 
rd_kafka_topic_partition_list_sort(rd_kafka_topic_partition_list_t * rktparlist,int (* cmp)(const void *,const void *,void *),void * opaque)3213 void rd_kafka_topic_partition_list_sort (
3214         rd_kafka_topic_partition_list_t *rktparlist,
3215         int (*cmp) (const void *, const void *, void *),
3216         void *opaque) {
3217 
3218         if (!cmp)
3219                 cmp = rd_kafka_topic_partition_cmp_opaque;
3220 
3221         rd_qsort_r(rktparlist->elems, rktparlist->cnt,
3222                    sizeof(*rktparlist->elems),
3223                    cmp, opaque);
3224 }
3225 
3226 
rd_kafka_topic_partition_list_sort_by_topic(rd_kafka_topic_partition_list_t * rktparlist)3227 void rd_kafka_topic_partition_list_sort_by_topic (
3228         rd_kafka_topic_partition_list_t *rktparlist) {
3229         rd_kafka_topic_partition_list_sort(rktparlist,
3230                                            rd_kafka_topic_partition_cmp_opaque,
3231                                            NULL);
3232 }
3233 
rd_kafka_topic_partition_list_set_offset(rd_kafka_topic_partition_list_t * rktparlist,const char * topic,int32_t partition,int64_t offset)3234 rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset (
3235 	rd_kafka_topic_partition_list_t *rktparlist,
3236 	const char *topic, int32_t partition, int64_t offset) {
3237 	rd_kafka_topic_partition_t *rktpar;
3238 
3239 	if (!(rktpar = rd_kafka_topic_partition_list_find(rktparlist,
3240 							  topic, partition)))
3241 		return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3242 
3243 	rktpar->offset = offset;
3244 
3245 	return RD_KAFKA_RESP_ERR_NO_ERROR;
3246 }
3247 
3248 
3249 /**
3250  * @brief Reset all offsets to the provided value.
3251  */
3252 void
rd_kafka_topic_partition_list_reset_offsets(rd_kafka_topic_partition_list_t * rktparlist,int64_t offset)3253 rd_kafka_topic_partition_list_reset_offsets (rd_kafka_topic_partition_list_t *rktparlist,
3254 					     int64_t offset) {
3255 
3256         int i;
3257         for (i = 0 ; i < rktparlist->cnt ; i++)
3258 		rktparlist->elems[i].offset = offset;
3259 }
3260 
3261 
3262 /**
3263  * Set offset values in partition list based on toppar's last stored offset.
3264  *
3265  *  from_rktp - true: set rktp's last stored offset, false: set def_value
3266  *  unless a concrete offset is set.
3267  *  is_commit: indicates that set offset is to be committed (for debug log)
3268  *
3269  * Returns the number of valid non-logical offsets (>=0).
3270  */
rd_kafka_topic_partition_list_set_offsets(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,int from_rktp,int64_t def_value,int is_commit)3271 int rd_kafka_topic_partition_list_set_offsets (
3272 	rd_kafka_t *rk,
3273         rd_kafka_topic_partition_list_t *rktparlist,
3274         int from_rktp, int64_t def_value, int is_commit) {
3275         int i;
3276 	int valid_cnt = 0;
3277 
3278         for (i = 0 ; i < rktparlist->cnt ; i++) {
3279                 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3280 		const char *verb = "setting";
3281                 char preamble[80];
3282 
3283                 *preamble = '\0'; /* Avoid warning */
3284 
3285                 if (from_rktp) {
3286                         rd_kafka_toppar_t *rktp = rktpar->_private;
3287                         rd_kafka_toppar_lock(rktp);
3288 
3289                         if (rk->rk_conf.debug & (RD_KAFKA_DBG_CGRP |
3290                                                  RD_KAFKA_DBG_TOPIC))
3291                                 rd_snprintf(preamble, sizeof(preamble),
3292                                             "stored offset %"PRId64
3293                                             ", committed offset %"PRId64": ",
3294                                             rktp->rktp_stored_offset,
3295                                             rktp->rktp_committed_offset);
3296 
3297 			if (rktp->rktp_stored_offset >
3298 			    rktp->rktp_committed_offset) {
3299 				verb = "setting stored";
3300 				rktpar->offset = rktp->rktp_stored_offset;
3301 			} else {
3302 				rktpar->offset = RD_KAFKA_OFFSET_INVALID;
3303 			}
3304                         rd_kafka_toppar_unlock(rktp);
3305                 } else {
3306 			if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset)) {
3307 				verb = "setting default";
3308 				rktpar->offset = def_value;
3309 			} else
3310 				verb = "keeping";
3311                 }
3312 
3313                 if (is_commit && rktpar->offset == RD_KAFKA_OFFSET_INVALID)
3314                         rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
3315                                      "Topic %s [%"PRId32"]: "
3316                                      "%snot including in commit",
3317                                      rktpar->topic, rktpar->partition,
3318                                      preamble);
3319                 else
3320                         rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
3321                                      "Topic %s [%"PRId32"]: "
3322                                      "%s%s offset %s%s",
3323                                      rktpar->topic, rktpar->partition,
3324                                      preamble,
3325                                      verb,
3326                                      rd_kafka_offset2str(rktpar->offset),
3327                                      is_commit ? " for commit" : "");
3328 
3329 		if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset))
3330 			valid_cnt++;
3331         }
3332 
3333 	return valid_cnt;
3334 }
3335 
3336 
3337 /**
3338  * @returns the number of partitions with absolute (non-logical) offsets set.
3339  */
rd_kafka_topic_partition_list_count_abs_offsets(const rd_kafka_topic_partition_list_t * rktparlist)3340 int rd_kafka_topic_partition_list_count_abs_offsets (
3341 	const rd_kafka_topic_partition_list_t *rktparlist) {
3342 	int i;
3343 	int valid_cnt = 0;
3344 
3345         for (i = 0 ; i < rktparlist->cnt ; i++)
3346 		if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktparlist->elems[i].offset))
3347 			valid_cnt++;
3348 
3349 	return valid_cnt;
3350 }
3351 
3352 
3353 /**
3354  * @brief Update _private (toppar) field to point to valid rktp
3355  *        for each parition.
3356  *
3357  * @param create_on_miss Create partition (and topic_t object) if necessary.
3358  */
3359 void
rd_kafka_topic_partition_list_update_toppars(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_bool_t create_on_miss)3360 rd_kafka_topic_partition_list_update_toppars (rd_kafka_t *rk,
3361                                               rd_kafka_topic_partition_list_t
3362                                               *rktparlist,
3363                                               rd_bool_t create_on_miss) {
3364         int i;
3365         for (i = 0 ; i < rktparlist->cnt ; i++) {
3366                 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3367 
3368                 if (!rktpar->_private)
3369                         rktpar->_private =
3370                                 rd_kafka_toppar_get2(rk,
3371                                                      rktpar->topic,
3372                                                      rktpar->partition,
3373                                                      0/*not ua-on-miss*/,
3374                                                      create_on_miss);
3375 
3376         }
3377 }
3378 
3379 
3380 /**
3381  * @brief Populate \p leaders with the leaders+partitions for the partitions in
3382  *        \p rktparlist. Duplicates are suppressed.
3383  *
3384  *        If no leader is found for a partition that element's \c .err will
3385  *        be set to RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE.
3386  *
3387  *        If the partition does not exist \c .err will be set to
3388  *        RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION.
3389  *
3390  * @param rktparlist The partitions to look up leaders for, the .err field
3391  *                   will be set according to outcome, e.g., ERR_NO_ERROR,
3392  *                   ERR_UNKNOWN_TOPIC_OR_PART, etc.
3393  * @param leaders rd_list_t of allocated (struct rd_kafka_partition_leader *)
3394  * @param query_topics (optional) rd_list of strdupped (char *)
3395  * @param query_unknown Add unknown topics to \p query_topics.
3396  * @param eonce (optional) For triggering asynchronously on cache change
3397  *              in case not all leaders are known now.
3398  *
3399  * @remark This is based on the current topic_t and partition state
3400  *         which may lag behind the last metadata update due to internal
3401  *         threading and also the fact that no topic_t may have been created.
3402  *
3403  * @param leaders rd_list_t of type (struct rd_kafka_partition_leader *)
3404  *
3405  * @returns true if all partitions have leaders, else false.
3406  *
3407  * @sa rd_kafka_topic_partition_list_get_leaders_by_metadata
3408  *
3409  * @locks rd_kafka_*lock() MUST NOT be held
3410  */
3411 static rd_bool_t
rd_kafka_topic_partition_list_get_leaders(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * leaders,rd_list_t * query_topics,rd_bool_t query_unknown,rd_kafka_enq_once_t * eonce)3412 rd_kafka_topic_partition_list_get_leaders (
3413         rd_kafka_t *rk,
3414         rd_kafka_topic_partition_list_t *rktparlist,
3415         rd_list_t *leaders,
3416         rd_list_t *query_topics,
3417         rd_bool_t query_unknown,
3418         rd_kafka_enq_once_t *eonce) {
3419         rd_bool_t complete;
3420         int cnt = 0;
3421         int i;
3422 
3423         if (eonce)
3424                 rd_kafka_wrlock(rk);
3425         else
3426                 rd_kafka_rdlock(rk);
3427 
3428         for (i = 0 ; i < rktparlist->cnt ; i++) {
3429                 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3430                 rd_kafka_topic_partition_t *rktpar2;
3431                 rd_kafka_broker_t *rkb = NULL;
3432                 struct rd_kafka_partition_leader leader_skel;
3433                 struct rd_kafka_partition_leader *leader;
3434                 const rd_kafka_metadata_topic_t *mtopic;
3435                 const rd_kafka_metadata_partition_t *mpart;
3436                 rd_bool_t topic_wait_cache;
3437 
3438                 rd_kafka_metadata_cache_topic_partition_get(
3439                         rk, &mtopic, &mpart,
3440                         rktpar->topic, rktpar->partition,
3441                         0/*negative entries too*/);
3442 
3443                 topic_wait_cache =
3444                         !mtopic ||
3445                         RD_KAFKA_METADATA_CACHE_ERR_IS_TEMPORARY(mtopic->err);
3446 
3447                 if (!topic_wait_cache &&
3448                     mtopic &&
3449                     mtopic->err != RD_KAFKA_RESP_ERR_NO_ERROR &&
3450                     mtopic->err != RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE) {
3451                         /* Topic permanently errored */
3452                         rktpar->err = mtopic->err;
3453                         continue;
3454                 }
3455 
3456                 if (mtopic && !mpart && mtopic->partition_cnt > 0) {
3457                         /* Topic exists but partition doesnt.
3458                          * This is a permanent error. */
3459                         rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3460                         continue;
3461                 }
3462 
3463                 if (mpart &&
3464                     (mpart->leader == -1 ||
3465                      !(rkb = rd_kafka_broker_find_by_nodeid0(
3466                                rk, mpart->leader, -1/*any state*/,
3467                                rd_false)))) {
3468                         /* Partition has no (valid) leader.
3469                          * This is a permanent error. */
3470                         rktpar->err =
3471                                 mtopic->err ? mtopic->err :
3472                                 RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE;
3473                         continue;
3474                 }
3475 
3476                 if (topic_wait_cache || !rkb) {
3477                         /* Topic unknown or no current leader for partition,
3478                          * add topic to query list. */
3479                         rktpar->err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
3480                         if (query_topics &&
3481                             !rd_list_find(query_topics, rktpar->topic,
3482                                           (void *)strcmp))
3483                                 rd_list_add(query_topics,
3484                                             rd_strdup(rktpar->topic));
3485                         continue;
3486                 }
3487 
3488                 /* Leader exists, add to leader list. */
3489 
3490                 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
3491 
3492                 memset(&leader_skel, 0, sizeof(leader_skel));
3493                 leader_skel.rkb = rkb;
3494 
3495                 leader = rd_list_find(leaders, &leader_skel,
3496                                       rd_kafka_partition_leader_cmp);
3497 
3498                 if (!leader) {
3499                         leader = rd_kafka_partition_leader_new(rkb);
3500                         rd_list_add(leaders, leader);
3501                 }
3502 
3503                 rktpar2 = rd_kafka_topic_partition_list_find(leader->partitions,
3504                                                              rktpar->topic,
3505                                                              rktpar->partition);
3506                 if (rktpar2) {
3507                         /* Already exists in partitions list, just update. */
3508                         rd_kafka_topic_partition_update(rktpar2, rktpar);
3509                 } else {
3510                         /* Make a copy of rktpar and add to partitions list */
3511                         rd_kafka_topic_partition_list_add_copy(
3512                                 leader->partitions, rktpar);
3513                 }
3514 
3515                 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
3516 
3517                 rd_kafka_broker_destroy(rkb);    /* loose refcount */
3518                 cnt++;
3519         }
3520 
3521         complete = cnt == rktparlist->cnt;
3522 
3523         if (!complete && eonce)
3524                 /* Add eonce to cache observers */
3525                 rd_kafka_metadata_cache_wait_state_change_async(rk, eonce);
3526 
3527         if (eonce)
3528                 rd_kafka_wrunlock(rk);
3529         else
3530                 rd_kafka_rdunlock(rk);
3531 
3532         return complete;
3533 }
3534 
3535 
3536 /**
3537  * @brief Timer timeout callback for query_leaders_async rko's eonce object.
3538  */
3539 static void
rd_kafka_partition_leader_query_eonce_timeout_cb(rd_kafka_timers_t * rkts,void * arg)3540 rd_kafka_partition_leader_query_eonce_timeout_cb (rd_kafka_timers_t *rkts,
3541                                                   void *arg) {
3542         rd_kafka_enq_once_t *eonce = arg;
3543         rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__TIMED_OUT,
3544                                   "timeout timer");
3545 }
3546 
3547 
3548 /**
3549  * @brief Query timer callback for query_leaders_async rko's eonce object.
3550  */
3551 static void
rd_kafka_partition_leader_query_eonce_timer_cb(rd_kafka_timers_t * rkts,void * arg)3552 rd_kafka_partition_leader_query_eonce_timer_cb (rd_kafka_timers_t *rkts,
3553                                                 void *arg) {
3554         rd_kafka_enq_once_t *eonce = arg;
3555         rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR_NO_ERROR,
3556                                   "query timer");
3557 }
3558 
3559 
3560 /**
3561  * @brief Query metadata cache for partition leaders, or trigger metadata
3562  *        refresh if leaders not known.
3563  *
3564  * @locks_required none
3565  * @locality any
3566  */
3567 static rd_kafka_op_res_t
rd_kafka_topic_partition_list_query_leaders_async_worker(rd_kafka_op_t * rko)3568 rd_kafka_topic_partition_list_query_leaders_async_worker (rd_kafka_op_t *rko) {
3569         rd_kafka_t *rk = rko->rko_rk;
3570         rd_list_t query_topics, *leaders = NULL;
3571         rd_kafka_op_t *reply;
3572 
3573         RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_LEADERS);
3574 
3575         if (rko->rko_err)
3576                 goto reply; /* Timeout or ERR__DESTROY */
3577 
3578         /* Since we're iterating over get_leaders() until all partition leaders
3579          * are known we need to re-enable the eonce to be triggered again (which
3580          * is not necessary the first time we get here, but there
3581          * is no harm doing it then either). */
3582         rd_kafka_enq_once_reenable(rko->rko_u.leaders.eonce,
3583                                    rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
3584 
3585         /* Look up the leaders in the metadata cache, if not all leaders
3586          * are known the eonce is registered for metadata cache changes
3587          * which will cause our function to be called
3588          * again on (any) metadata cache change.
3589          *
3590          * When we are called again we perform the cache lookup again and
3591          * hopefully get all leaders, otherwise defer a new async wait.
3592          * Repeat until success or timeout. */
3593 
3594         rd_list_init(&query_topics, 4 + rko->rko_u.leaders.partitions->cnt/2,
3595                      rd_free);
3596 
3597         leaders = rd_list_new(1 + rko->rko_u.leaders.partitions->cnt / 2,
3598                               rd_kafka_partition_leader_destroy_free);
3599 
3600         if (rd_kafka_topic_partition_list_get_leaders(
3601                     rk, rko->rko_u.leaders.partitions,
3602                     leaders,
3603                     &query_topics,
3604                     /* Add unknown topics to query_topics only on the
3605                      * first query, after that we consider them permanently
3606                      * non-existent */
3607                     rko->rko_u.leaders.query_cnt == 0,
3608                     rko->rko_u.leaders.eonce)) {
3609                 /* All leaders now known (or failed), reply to caller */
3610                 rd_list_destroy(&query_topics);
3611                 goto reply;
3612         }
3613 
3614         if (rd_list_empty(&query_topics)) {
3615                 /* Not all leaders known but no topics left to query,
3616                  * reply to caller. */
3617                 rd_list_destroy(&query_topics);
3618                 goto reply;
3619         }
3620 
3621         /* Need to refresh topic metadata, but at most every interval. */
3622         if (!rd_kafka_timer_is_started(&rk->rk_timers,
3623                                        &rko->rko_u.leaders.query_tmr)) {
3624 
3625                 rko->rko_u.leaders.query_cnt++;
3626 
3627                 /* Add query interval timer. */
3628                 rd_kafka_enq_once_add_source(rko->rko_u.leaders.eonce,
3629                                              "query timer");
3630                 rd_kafka_timer_start_oneshot(
3631                         &rk->rk_timers,
3632                         &rko->rko_u.leaders.query_tmr,
3633                         rd_true,
3634                         3*1000*1000 /* 3s */,
3635                         rd_kafka_partition_leader_query_eonce_timer_cb,
3636                         rko->rko_u.leaders.eonce);
3637 
3638                 /* Request metadata refresh */
3639                 rd_kafka_metadata_refresh_topics(
3640                         rk, NULL, &query_topics,
3641                         rd_true/*force*/,
3642                         rd_false/*!allow_auto_create*/,
3643                         rd_false/*!cgrp_update*/,
3644                         "query partition leaders");
3645 
3646         }
3647 
3648         rd_list_destroy(leaders);
3649         rd_list_destroy(&query_topics);
3650 
3651         /* Wait for next eonce trigger */
3652         return RD_KAFKA_OP_RES_KEEP; /* rko is still used */
3653 
3654  reply:
3655         /* Decommission worker state and reply to caller */
3656 
3657         if (rd_kafka_timer_stop(&rk->rk_timers,
3658                                 &rko->rko_u.leaders.query_tmr,
3659                                 RD_DO_LOCK))
3660                 rd_kafka_enq_once_del_source(rko->rko_u.leaders.eonce,
3661                                              "query timer");
3662         if (rd_kafka_timer_stop(&rk->rk_timers,
3663                                 &rko->rko_u.leaders.timeout_tmr,
3664                                 RD_DO_LOCK))
3665                 rd_kafka_enq_once_del_source(rko->rko_u.leaders.eonce,
3666                                              "timeout timer");
3667 
3668         if (rko->rko_u.leaders.eonce) {
3669                 rd_kafka_enq_once_disable(rko->rko_u.leaders.eonce);
3670                 rko->rko_u.leaders.eonce = NULL;
3671         }
3672 
3673         /* No leaders found, set a request-level error */
3674         if (leaders && rd_list_cnt(leaders) == 0) {
3675                 if (!rko->rko_err)
3676                         rko->rko_err = RD_KAFKA_RESP_ERR__NOENT;
3677                 rd_list_destroy(leaders);
3678                 leaders = NULL;
3679         }
3680 
3681         /* Create and enqueue reply rko */
3682         if (rko->rko_u.leaders.replyq.q) {
3683                 reply = rd_kafka_op_new_cb(rk, RD_KAFKA_OP_LEADERS,
3684                                            rko->rko_u.leaders.cb);
3685                 rd_kafka_op_get_reply_version(reply, rko);
3686                 reply->rko_err = rko->rko_err;
3687                 reply->rko_u.leaders.partitions =
3688                         rko->rko_u.leaders.partitions; /* Transfer ownership for
3689                                                         * partition list that
3690                                                         * now contains
3691                                                         * per-partition errors*/
3692                 rko->rko_u.leaders.partitions = NULL;
3693                 reply->rko_u.leaders.leaders = leaders; /* Possibly NULL */
3694                 reply->rko_u.leaders.opaque = rko->rko_u.leaders.opaque;
3695 
3696                 rd_kafka_replyq_enq(&rko->rko_u.leaders.replyq, reply, 0);
3697         }
3698 
3699         return RD_KAFKA_OP_RES_HANDLED;
3700 }
3701 
3702 
3703 static rd_kafka_op_res_t
rd_kafka_topic_partition_list_query_leaders_async_worker_op_cb(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)3704 rd_kafka_topic_partition_list_query_leaders_async_worker_op_cb (
3705         rd_kafka_t *rk,
3706         rd_kafka_q_t *rkq,
3707         rd_kafka_op_t *rko) {
3708         return rd_kafka_topic_partition_list_query_leaders_async_worker(rko);
3709 }
3710 
3711 /**
3712  * @brief Async variant of rd_kafka_topic_partition_list_query_leaders().
3713  *
3714  * The reply rko op will contain:
3715  * - .leaders which is a list of leaders and their partitions, this may be
3716  *    NULL for overall errors (such as no leaders are found), or a
3717  *    partial or complete list of leaders.
3718  * - .partitions which is a copy of the input list of partitions with the
3719  *   .err field set to the outcome of the leader query, typically ERR_NO_ERROR
3720  *   or ERR_UNKNOWN_TOPIC_OR_PART.
3721  *
3722  * @locks_acquired rd_kafka_*lock()
3723  *
3724  * @remark rd_kafka_*lock() MUST NOT be held
3725  */
3726 void
rd_kafka_topic_partition_list_query_leaders_async(rd_kafka_t * rk,const rd_kafka_topic_partition_list_t * rktparlist,int timeout_ms,rd_kafka_replyq_t replyq,rd_kafka_op_cb_t * cb,void * opaque)3727 rd_kafka_topic_partition_list_query_leaders_async (
3728         rd_kafka_t *rk,
3729         const rd_kafka_topic_partition_list_t *rktparlist,
3730         int timeout_ms,
3731         rd_kafka_replyq_t replyq,
3732         rd_kafka_op_cb_t *cb,
3733         void *opaque) {
3734         rd_kafka_op_t *rko;
3735 
3736         rd_assert(rktparlist && rktparlist->cnt > 0);
3737         rd_assert(replyq.q);
3738 
3739         rko = rd_kafka_op_new_cb(
3740                 rk,
3741                 RD_KAFKA_OP_LEADERS,
3742                 rd_kafka_topic_partition_list_query_leaders_async_worker_op_cb);
3743         rko->rko_u.leaders.replyq = replyq;
3744         rko->rko_u.leaders.partitions =
3745                 rd_kafka_topic_partition_list_copy(rktparlist);
3746         rko->rko_u.leaders.ts_timeout = rd_timeout_init(timeout_ms);
3747         rko->rko_u.leaders.cb = cb;
3748         rko->rko_u.leaders.opaque = opaque;
3749 
3750         /* Create an eonce to be triggered either by metadata cache update
3751          * (from refresh_topics()), query interval, or timeout. */
3752         rko->rko_u.leaders.eonce = rd_kafka_enq_once_new(
3753                 rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
3754 
3755         rd_kafka_enq_once_add_source(rko->rko_u.leaders.eonce, "timeout timer");
3756         rd_kafka_timer_start_oneshot(
3757                 &rk->rk_timers,
3758                 &rko->rko_u.leaders.timeout_tmr,
3759                 rd_true,
3760                 rd_timeout_remains_us(rko->rko_u.leaders.ts_timeout),
3761                 rd_kafka_partition_leader_query_eonce_timeout_cb,
3762                 rko->rko_u.leaders.eonce);
3763 
3764         if (rd_kafka_topic_partition_list_query_leaders_async_worker(rko) ==
3765             RD_KAFKA_OP_RES_HANDLED)
3766                 rd_kafka_op_destroy(rko); /* Reply queue already disabled */
3767 }
3768 
3769 
3770 /**
3771  * @brief Get leaders for all partitions in \p rktparlist, querying metadata
3772  *        if needed.
3773  *
3774  * @param leaders is a pre-initialized (empty) list which will be populated
3775  *        with the leader brokers and their partitions
3776  *        (struct rd_kafka_partition_leader *)
3777  *
3778  * @remark Will not trigger topic auto creation (unless configured).
3779  *
3780  * @returns an error code on error.
3781  *
3782  * @locks rd_kafka_*lock() MUST NOT be held
3783  */
3784 rd_kafka_resp_err_t
rd_kafka_topic_partition_list_query_leaders(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * leaders,int timeout_ms)3785 rd_kafka_topic_partition_list_query_leaders (
3786         rd_kafka_t *rk,
3787         rd_kafka_topic_partition_list_t *rktparlist,
3788         rd_list_t *leaders, int timeout_ms) {
3789         rd_ts_t ts_end = rd_timeout_init(timeout_ms);
3790         rd_ts_t ts_query = 0;
3791         rd_ts_t now;
3792         int query_cnt = 0;
3793         int i = 0;
3794 
3795         /* Get all the partition leaders, try multiple times:
3796          * if there are no leaders after the first run fire off a leader
3797          * query and wait for broker state update before trying again,
3798          * keep trying and re-querying at increasing intervals until
3799          * success or timeout. */
3800         do {
3801                 rd_list_t query_topics;
3802                 int query_intvl;
3803 
3804                 rd_list_init(&query_topics, rktparlist->cnt, rd_free);
3805 
3806                 rd_kafka_topic_partition_list_get_leaders(
3807                         rk, rktparlist, leaders, &query_topics,
3808                         /* Add unknown topics to query_topics only on the
3809                          * first query, after that we consider them
3810                          * permanently non-existent */
3811                         query_cnt == 0,
3812                         NULL);
3813 
3814                 if (rd_list_empty(&query_topics)) {
3815                         /* No remaining topics to query: leader-list complete.*/
3816                         rd_list_destroy(&query_topics);
3817 
3818                         /* No leader(s) for partitions means all partitions
3819                          * are unknown. */
3820                         if (rd_list_empty(leaders))
3821                                 return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3822 
3823                         return RD_KAFKA_RESP_ERR_NO_ERROR;
3824                 }
3825 
3826                 now = rd_clock();
3827 
3828                 /*
3829                  * Missing leader for some partitions
3830                  */
3831                 query_intvl = (i+1) * 100; /* add 100ms per iteration */
3832                 if (query_intvl > 2*1000)
3833                         query_intvl = 2*1000; /* Cap to 2s */
3834 
3835                 if (now >= ts_query + (query_intvl*1000)) {
3836                         /* Query metadata for missing leaders,
3837                          * possibly creating the topic. */
3838                         rd_kafka_metadata_refresh_topics(
3839                                 rk, NULL, &query_topics,
3840                                 rd_true/*force*/,
3841                                 rd_false/*!allow_auto_create*/,
3842                                 rd_false/*!cgrp_update*/,
3843                                 "query partition leaders");
3844                         ts_query = now;
3845                         query_cnt++;
3846 
3847                 } else {
3848                         /* Wait for broker ids to be updated from
3849                          * metadata refresh above. */
3850                         int wait_ms = rd_timeout_remains_limit(ts_end,
3851                                                                query_intvl);
3852                         rd_kafka_metadata_cache_wait_change(rk, wait_ms);
3853                 }
3854 
3855                 rd_list_destroy(&query_topics);
3856 
3857                 i++;
3858         } while (ts_end == RD_POLL_INFINITE ||
3859                  now < ts_end); /* now is deliberately outdated here
3860                                  * since wait_change() will block.
3861                                  * This gives us one more chance to spin thru*/
3862 
3863         if (rd_atomic32_get(&rk->rk_broker_up_cnt) == 0)
3864                 return RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN;
3865 
3866         return RD_KAFKA_RESP_ERR__TIMED_OUT;
3867 }
3868 
3869 
3870 /**
3871  * @brief Populate \p rkts with the rd_kafka_topic_t objects for the
3872  *        partitions in. Duplicates are suppressed.
3873  *
3874  * @returns the number of topics added.
3875  */
3876 int
rd_kafka_topic_partition_list_get_topics(rd_kafka_t * rk,rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * rkts)3877 rd_kafka_topic_partition_list_get_topics (
3878         rd_kafka_t *rk,
3879         rd_kafka_topic_partition_list_t *rktparlist,
3880         rd_list_t *rkts) {
3881         int cnt = 0;
3882 
3883         int i;
3884         for (i = 0 ; i < rktparlist->cnt ; i++) {
3885                 rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3886                 rd_kafka_toppar_t *rktp;
3887 
3888                 rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar,
3889                                                            rd_false);
3890                 if (!rktp) {
3891                         rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
3892                         continue;
3893                 }
3894 
3895                 if (!rd_list_find(rkts, rktp->rktp_rkt,
3896                                   rd_kafka_topic_cmp_rkt)) {
3897                         rd_list_add(rkts, rd_kafka_topic_keep(rktp->rktp_rkt));
3898                         cnt++;
3899                 }
3900 
3901                 rd_kafka_toppar_destroy(rktp);
3902         }
3903 
3904         return cnt;
3905 }
3906 
3907 
3908 /**
3909  * @brief Populate \p topics with the strdupped topic names in \p rktparlist.
3910  *        Duplicates are suppressed.
3911  *
3912  * @param include_regex: include regex topics
3913  *
3914  * @returns the number of topics added.
3915  */
3916 int
rd_kafka_topic_partition_list_get_topic_names(const rd_kafka_topic_partition_list_t * rktparlist,rd_list_t * topics,int include_regex)3917 rd_kafka_topic_partition_list_get_topic_names (
3918         const rd_kafka_topic_partition_list_t *rktparlist,
3919         rd_list_t *topics, int include_regex) {
3920         int cnt = 0;
3921         int i;
3922 
3923         for (i = 0 ; i < rktparlist->cnt ; i++) {
3924                 const rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
3925 
3926                 if (!include_regex && *rktpar->topic == '^')
3927                         continue;
3928 
3929                 if (!rd_list_find(topics, rktpar->topic, (void *)strcmp)) {
3930                         rd_list_add(topics, rd_strdup(rktpar->topic));
3931                         cnt++;
3932                 }
3933         }
3934 
3935         return cnt;
3936 }
3937 
3938 
3939 /**
3940  * @brief Create a copy of \p rktparlist only containing the partitions
3941  *        matched by \p match function.
3942  *
3943  * \p match shall return 1 for match, else 0.
3944  *
3945  * @returns a new list
3946  */
rd_kafka_topic_partition_list_match(const rd_kafka_topic_partition_list_t * rktparlist,int (* match)(const void * elem,const void * opaque),void * opaque)3947 rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_match (
3948         const rd_kafka_topic_partition_list_t *rktparlist,
3949         int (*match) (const void *elem, const void *opaque),
3950         void *opaque) {
3951         rd_kafka_topic_partition_list_t *newlist;
3952         int i;
3953 
3954         newlist = rd_kafka_topic_partition_list_new(0);
3955 
3956         for (i = 0 ; i < rktparlist->cnt ; i++) {
3957                 const rd_kafka_topic_partition_t *rktpar =
3958                         &rktparlist->elems[i];
3959 
3960                 if (!match(rktpar, opaque))
3961                         continue;
3962 
3963                 rd_kafka_topic_partition_list_add_copy(newlist, rktpar);
3964         }
3965 
3966         return newlist;
3967 }
3968 
3969 void
rd_kafka_topic_partition_list_log(rd_kafka_t * rk,const char * fac,int dbg,const rd_kafka_topic_partition_list_t * rktparlist)3970 rd_kafka_topic_partition_list_log (rd_kafka_t *rk, const char *fac, int dbg,
3971 				   const rd_kafka_topic_partition_list_t *rktparlist) {
3972         int i;
3973 
3974 	rd_kafka_dbg(rk, NONE|dbg, fac, "List with %d partition(s):",
3975 		     rktparlist->cnt);
3976         for (i = 0 ; i < rktparlist->cnt ; i++) {
3977 		const rd_kafka_topic_partition_t *rktpar =
3978 			&rktparlist->elems[i];
3979 		rd_kafka_dbg(rk, NONE|dbg, fac, " %s [%"PRId32"] offset %s%s%s",
3980 			     rktpar->topic, rktpar->partition,
3981 			     rd_kafka_offset2str(rktpar->offset),
3982 			     rktpar->err ? ": error: " : "",
3983 			     rktpar->err ? rd_kafka_err2str(rktpar->err) : "");
3984 	}
3985 }
3986 
3987 /**
3988  * @returns a comma-separated list of partitions.
3989  */
3990 const char *
rd_kafka_topic_partition_list_str(const rd_kafka_topic_partition_list_t * rktparlist,char * dest,size_t dest_size,int fmt_flags)3991 rd_kafka_topic_partition_list_str (const rd_kafka_topic_partition_list_t *rktparlist,
3992                                    char *dest, size_t dest_size,
3993                                    int fmt_flags) {
3994         int i;
3995         size_t of = 0;
3996 
3997         for (i = 0 ; i < rktparlist->cnt ; i++) {
3998                 const rd_kafka_topic_partition_t *rktpar =
3999                         &rktparlist->elems[i];
4000                 char errstr[128];
4001                 char offsetstr[32];
4002                 int r;
4003 
4004                 if (!rktpar->err && (fmt_flags & RD_KAFKA_FMT_F_ONLY_ERR))
4005                         continue;
4006 
4007                 if (rktpar->err && !(fmt_flags & RD_KAFKA_FMT_F_NO_ERR))
4008                         rd_snprintf(errstr, sizeof(errstr),
4009                                     "(%s)", rd_kafka_err2str(rktpar->err));
4010                 else
4011                         errstr[0] = '\0';
4012 
4013                 if (rktpar->offset != RD_KAFKA_OFFSET_INVALID)
4014                         rd_snprintf(offsetstr, sizeof(offsetstr),
4015                                     "@%"PRId64, rktpar->offset);
4016                 else
4017                         offsetstr[0] = '\0';
4018 
4019                 r = rd_snprintf(&dest[of], dest_size-of,
4020                                 "%s"
4021                                 "%s[%"PRId32"]"
4022                                 "%s"
4023                                 "%s",
4024                                 of == 0 ? "" : ", ",
4025                                 rktpar->topic, rktpar->partition,
4026                                 offsetstr,
4027                                 errstr);
4028 
4029                 if ((size_t)r >= dest_size-of) {
4030                         rd_snprintf(&dest[dest_size-4], 4, "...");
4031                         break;
4032                 }
4033 
4034                 of += r;
4035         }
4036 
4037         return dest;
4038 }
4039 
4040 
4041 
4042 /**
4043  * @brief Update \p dst with info from \p src.
4044  *
4045  * Fields updated:
4046  *  - metadata
4047  *  - metadata_size
4048  *  - offset
4049  *  - err
4050  *
4051  * Will only update partitions that are in both dst and src, other partitions will
4052  * remain unchanged.
4053  */
4054 void
rd_kafka_topic_partition_list_update(rd_kafka_topic_partition_list_t * dst,const rd_kafka_topic_partition_list_t * src)4055 rd_kafka_topic_partition_list_update (rd_kafka_topic_partition_list_t *dst,
4056                                       const rd_kafka_topic_partition_list_t *src){
4057         int i;
4058 
4059         for (i = 0 ; i < dst->cnt ; i++) {
4060                 rd_kafka_topic_partition_t *d = &dst->elems[i];
4061                 rd_kafka_topic_partition_t *s;
4062 
4063                 if (!(s = rd_kafka_topic_partition_list_find(
4064                               (rd_kafka_topic_partition_list_t *)src,
4065                               d->topic, d->partition)))
4066                         continue;
4067 
4068                 d->offset = s->offset;
4069                 d->err    = s->err;
4070                 if (d->metadata) {
4071                         rd_free(d->metadata);
4072                         d->metadata = NULL;
4073                         d->metadata_size = 0;
4074                 }
4075                 if (s->metadata_size > 0) {
4076                         d->metadata =
4077                                 rd_malloc(s->metadata_size);
4078                         d->metadata_size = s->metadata_size;
4079                         memcpy((void *)d->metadata, s->metadata,
4080                                 s->metadata_size);
4081                 }
4082         }
4083 }
4084 
4085 
4086 /**
4087  * @returns the sum of \p cb called for each element.
4088  */
4089 size_t
rd_kafka_topic_partition_list_sum(const rd_kafka_topic_partition_list_t * rktparlist,size_t (* cb)(const rd_kafka_topic_partition_t * rktpar,void * opaque),void * opaque)4090 rd_kafka_topic_partition_list_sum (
4091         const rd_kafka_topic_partition_list_t *rktparlist,
4092         size_t (*cb) (const rd_kafka_topic_partition_t *rktpar, void *opaque),
4093         void *opaque) {
4094         int i;
4095         size_t sum = 0;
4096 
4097         for (i = 0 ; i < rktparlist->cnt ; i++) {
4098                 const rd_kafka_topic_partition_t *rktpar =
4099                         &rktparlist->elems[i];
4100                 sum += cb(rktpar, opaque);
4101         }
4102 
4103         return sum;
4104 }
4105 
4106 
4107 /**
4108  * @returns rd_true if there are duplicate topic/partitions in the list,
4109  *          rd_false if not.
4110  *
4111  * @remarks sorts the elements of the list.
4112  */
4113 rd_bool_t
rd_kafka_topic_partition_list_has_duplicates(rd_kafka_topic_partition_list_t * rktparlist,rd_bool_t ignore_partition)4114 rd_kafka_topic_partition_list_has_duplicates (
4115                 rd_kafka_topic_partition_list_t *rktparlist,
4116                 rd_bool_t ignore_partition) {
4117 
4118         int i;
4119 
4120         if (rktparlist->cnt <= 1)
4121                 return rd_false;
4122 
4123         rd_kafka_topic_partition_list_sort_by_topic(rktparlist);
4124 
4125         for (i=1; i<rktparlist->cnt; i++) {
4126                 const rd_kafka_topic_partition_t *p1 = &rktparlist->elems[i-1];
4127                 const rd_kafka_topic_partition_t *p2 = &rktparlist->elems[i];
4128 
4129                 if (((p1->partition == p2->partition) || ignore_partition) &&
4130                     !strcmp(p1->topic, p2->topic)) {
4131                         return rd_true;
4132                 }
4133         }
4134 
4135         return rd_false;
4136 }
4137 
4138 
4139 /**
4140  * @brief Set \c .err field \p err on all partitions in list.
4141  */
rd_kafka_topic_partition_list_set_err(rd_kafka_topic_partition_list_t * rktparlist,rd_kafka_resp_err_t err)4142 void rd_kafka_topic_partition_list_set_err (
4143         rd_kafka_topic_partition_list_t *rktparlist,
4144         rd_kafka_resp_err_t err) {
4145         int i;
4146 
4147         for (i = 0 ; i < rktparlist->cnt ; i++)
4148                 rktparlist->elems[i].err = err;
4149 }
4150 
4151 /**
4152  * @brief Get the first set error in the partition list.
4153  */
rd_kafka_topic_partition_list_get_err(const rd_kafka_topic_partition_list_t * rktparlist)4154 rd_kafka_resp_err_t rd_kafka_topic_partition_list_get_err (
4155         const rd_kafka_topic_partition_list_t *rktparlist) {
4156         int i;
4157 
4158         for (i = 0 ; i < rktparlist->cnt ; i++)
4159                 if (rktparlist->elems[i].err)
4160                         return rktparlist->elems[i].err;
4161 
4162         return RD_KAFKA_RESP_ERR_NO_ERROR;
4163 }
4164 
4165 
4166 /**
4167  * @returns the number of wildcard/regex topics
4168  */
rd_kafka_topic_partition_list_regex_cnt(const rd_kafka_topic_partition_list_t * rktparlist)4169 int rd_kafka_topic_partition_list_regex_cnt (
4170         const rd_kafka_topic_partition_list_t *rktparlist) {
4171         int i;
4172         int cnt = 0;
4173 
4174         for (i = 0 ; i < rktparlist->cnt ; i++) {
4175                 const rd_kafka_topic_partition_t *rktpar =
4176                         &rktparlist->elems[i];
4177                 cnt += *rktpar->topic == '^';
4178         }
4179         return cnt;
4180 }
4181 
4182 
4183 /**
4184  * @brief Reset base sequence for this toppar.
4185  *
4186  * See rd_kafka_toppar_pid_change() below.
4187  *
4188  * @warning Toppar must be completely drained.
4189  *
4190  * @locality toppar handler thread
4191  * @locks toppar_lock MUST be held.
4192  */
rd_kafka_toppar_reset_base_msgid(rd_kafka_toppar_t * rktp,uint64_t new_base_msgid)4193 static void rd_kafka_toppar_reset_base_msgid (rd_kafka_toppar_t *rktp,
4194                                               uint64_t new_base_msgid) {
4195         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
4196                      TOPIC|RD_KAFKA_DBG_EOS, "RESETSEQ",
4197                      "%.*s [%"PRId32"] "
4198                      "resetting epoch base seq from %"PRIu64" to %"PRIu64,
4199                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4200                      rktp->rktp_partition,
4201                      rktp->rktp_eos.epoch_base_msgid, new_base_msgid);
4202 
4203         rktp->rktp_eos.next_ack_seq = 0;
4204         rktp->rktp_eos.next_err_seq = 0;
4205         rktp->rktp_eos.epoch_base_msgid = new_base_msgid;
4206 }
4207 
4208 
4209 /**
4210  * @brief Update/change the Producer ID for this toppar.
4211  *
4212  * Must only be called when pid is different from the current toppar pid.
4213  *
4214  * The epoch base sequence will be set to \p base_msgid, which must be the
4215  * first message in the partition
4216  * queue. However, if there are outstanding messages in-flight to the broker
4217  * we will need to wait for these ProduceRequests to finish (most likely
4218  * with failure) and have their messages re-enqueued to maintain original order.
4219  * In this case the pid will not be updated and this function should be
4220  * called again when there are no outstanding messages.
4221  *
4222  * @remark This function must only be called when rktp_xmitq is non-empty.
4223  *
4224  * @returns 1 if a new pid was set, else 0.
4225  *
4226  * @locality toppar handler thread
4227  * @locks none
4228  */
rd_kafka_toppar_pid_change(rd_kafka_toppar_t * rktp,rd_kafka_pid_t pid,uint64_t base_msgid)4229 int rd_kafka_toppar_pid_change (rd_kafka_toppar_t *rktp, rd_kafka_pid_t pid,
4230                                 uint64_t base_msgid) {
4231         int inflight = rd_atomic32_get(&rktp->rktp_msgs_inflight);
4232 
4233         if (unlikely(inflight > 0)) {
4234                 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
4235                              TOPIC|RD_KAFKA_DBG_EOS, "NEWPID",
4236                              "%.*s [%"PRId32"] will not change %s -> %s yet: "
4237                              "%d message(s) still in-flight from current "
4238                              "epoch",
4239                              RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4240                              rktp->rktp_partition,
4241                              rd_kafka_pid2str(rktp->rktp_eos.pid),
4242                              rd_kafka_pid2str(pid),
4243                              inflight);
4244                 return 0;
4245         }
4246 
4247         rd_assert(base_msgid != 0 &&
4248                   *"BUG: pid_change() must only be called with "
4249                   "non-empty xmitq");
4250 
4251         rd_kafka_toppar_lock(rktp);
4252         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
4253                      TOPIC|RD_KAFKA_DBG_EOS, "NEWPID",
4254                      "%.*s [%"PRId32"] changed %s -> %s "
4255                      "with base MsgId %"PRIu64,
4256                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4257                      rktp->rktp_partition,
4258                      rd_kafka_pid2str(rktp->rktp_eos.pid),
4259                      rd_kafka_pid2str(pid),
4260                      base_msgid);
4261 
4262         rktp->rktp_eos.pid = pid;
4263         rd_kafka_toppar_reset_base_msgid(rktp, base_msgid);
4264 
4265         rd_kafka_toppar_unlock(rktp);
4266 
4267         return 1;
4268 }
4269 
4270 
4271 /**
4272  * @brief Purge messages in partition queues.
4273  *        Delivery reports will be enqueued for all purged messages, the error
4274  *        code is set to RD_KAFKA_RESP_ERR__PURGE_QUEUE.
4275  *
4276  * @param include_xmit_msgq If executing from the rktp's current broker handler
4277  *                          thread, also include the xmit message queue.
4278  *
4279  * @warning Only to be used with the producer.
4280  *
4281  * @returns the number of messages purged
4282  *
4283  * @locality any thread.
4284  * @locks_acquired rd_kafka_toppar_lock()
4285  * @locks_required none
4286  */
rd_kafka_toppar_purge_queues(rd_kafka_toppar_t * rktp,int purge_flags,rd_bool_t include_xmit_msgq)4287 int rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp,
4288                                   int purge_flags,
4289                                   rd_bool_t include_xmit_msgq) {
4290         rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
4291         rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
4292         int cnt;
4293 
4294         rd_assert(rk->rk_type == RD_KAFKA_PRODUCER);
4295 
4296         rd_kafka_dbg(rk, TOPIC, "PURGE",
4297                      "%s [%"PRId32"]: purging queues "
4298                      "(purge_flags 0x%x, %s xmit_msgq)",
4299                      rktp->rktp_rkt->rkt_topic->str,
4300                      rktp->rktp_partition,
4301                      purge_flags,
4302                      include_xmit_msgq ? "include" : "exclude");
4303 
4304         if (!(purge_flags & RD_KAFKA_PURGE_F_QUEUE))
4305                 return 0;
4306 
4307         if (include_xmit_msgq) {
4308                 /* xmit_msgq is owned by the toppar handler thread
4309                  * (broker thread) and requires no locking. */
4310                 rd_assert(rktp->rktp_broker);
4311                 rd_assert(thrd_is_current(rktp->rktp_broker->rkb_thread));
4312                 rd_kafka_msgq_concat(&rkmq, &rktp->rktp_xmit_msgq);
4313         }
4314 
4315         rd_kafka_toppar_lock(rktp);
4316         rd_kafka_msgq_concat(&rkmq, &rktp->rktp_msgq);
4317         cnt = rd_kafka_msgq_len(&rkmq);
4318 
4319         if (cnt > 0 && purge_flags & RD_KAFKA_PURGE_F_ABORT_TXN) {
4320                 /* All messages in-queue are purged
4321                  * on abort_transaction(). Since these messages
4322                  * will not be produced (retried) we need to adjust the
4323                  * idempotence epoch's base msgid to skip the messages. */
4324                 rktp->rktp_eos.epoch_base_msgid += cnt;
4325                 rd_kafka_dbg(rk,
4326                              TOPIC|RD_KAFKA_DBG_EOS, "ADVBASE",
4327                              "%.*s [%"PRId32"] "
4328                              "advancing epoch base msgid to %"PRIu64
4329                              " due to %d message(s) in aborted transaction",
4330                              RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
4331                              rktp->rktp_partition,
4332                              rktp->rktp_eos.epoch_base_msgid, cnt);
4333         }
4334         rd_kafka_toppar_unlock(rktp);
4335 
4336         rd_kafka_dr_msgq(rktp->rktp_rkt, &rkmq, RD_KAFKA_RESP_ERR__PURGE_QUEUE);
4337 
4338         return cnt;
4339 }
4340 
4341 
4342 /**
4343  * @brief Purge queues for the unassigned toppars of all known topics.
4344  *
4345  * @locality application thread
4346  * @locks none
4347  */
rd_kafka_purge_ua_toppar_queues(rd_kafka_t * rk)4348 void rd_kafka_purge_ua_toppar_queues (rd_kafka_t *rk) {
4349         rd_kafka_topic_t *rkt;
4350         int msg_cnt = 0, part_cnt = 0;
4351 
4352         rd_kafka_rdlock(rk);
4353         TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
4354                 rd_kafka_toppar_t *rktp;
4355                 int r;
4356 
4357                 rd_kafka_topic_rdlock(rkt);
4358                 rktp = rkt->rkt_ua;
4359                 if (rktp)
4360                         rd_kafka_toppar_keep(rktp);
4361                 rd_kafka_topic_rdunlock(rkt);
4362 
4363                 if (unlikely(!rktp))
4364                         continue;
4365 
4366 
4367                 rd_kafka_toppar_lock(rktp);
4368 
4369                 r = rd_kafka_msgq_len(&rktp->rktp_msgq);
4370                 rd_kafka_dr_msgq(rkt, &rktp->rktp_msgq,
4371                                  RD_KAFKA_RESP_ERR__PURGE_QUEUE);
4372                 rd_kafka_toppar_unlock(rktp);
4373                 rd_kafka_toppar_destroy(rktp);
4374 
4375                 if (r > 0) {
4376                         msg_cnt += r;
4377                         part_cnt++;
4378                 }
4379         }
4380         rd_kafka_rdunlock(rk);
4381 
4382         rd_kafka_dbg(rk, QUEUE|RD_KAFKA_DBG_TOPIC, "PURGEQ",
4383                      "Purged %i message(s) from %d UA-partition(s)",
4384                      msg_cnt, part_cnt);
4385 }
4386 
4387 
rd_kafka_partition_leader_destroy_free(void * ptr)4388 void rd_kafka_partition_leader_destroy_free (void *ptr) {
4389         struct rd_kafka_partition_leader *leader = ptr;
4390         rd_kafka_partition_leader_destroy(leader);
4391 }
4392