/*
 * librdkafka - The Apache Kafka C/C++ library
 *
 * Copyright (c) 2016 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rdkafka_int.h"
#include "rdkafka_offset.h"
#include "rdkafka_topic.h"
#include "rdkafka_interceptor.h"

int RD_TLS rd_kafka_yield_thread = 0;

void rd_kafka_yield (rd_kafka_t *rk) {
        rd_kafka_yield_thread = 1;
}
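
/**
 * Example (illustrative sketch, not part of the library): an application
 * callback may call rd_kafka_yield() to make the current poll call return
 * early; the surrounding rd_kafka_poll() usage shown here is an assumption
 * about a typical application setup.
 *
 *   static void my_error_cb (rd_kafka_t *rk, int err,
 *                            const char *reason, void *opaque) {
 *           fprintf(stderr, "Error: %s\n", reason);
 *           rd_kafka_yield(rk);  // stop dispatching further callbacks
 *   }
 *
 *   // ... later, in the application's main loop:
 *   rd_kafka_poll(rk, 100);  // returns early if a callback yielded
 */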


/**
 * @brief Check and reset yield flag.
 * @returns rd_true if caller should yield, otherwise rd_false.
 * @remarks rkq_lock MUST be held
 */
static RD_INLINE rd_bool_t rd_kafka_q_check_yield (rd_kafka_q_t *rkq) {
        if (!(rkq->rkq_flags & RD_KAFKA_Q_F_YIELD))
                return rd_false;

        rkq->rkq_flags &= ~RD_KAFKA_Q_F_YIELD;
        return rd_true;
}

/**
 * Destroy a queue. refcnt must be at zero.
 */
void rd_kafka_q_destroy_final (rd_kafka_q_t *rkq) {

        mtx_lock(&rkq->rkq_lock);
        if (unlikely(rkq->rkq_qio != NULL)) {
                rd_free(rkq->rkq_qio);
                rkq->rkq_qio = NULL;
        }
        /* Queue must have been disabled prior to final destruction,
         * this is to catch the case where the queue owner/poll does not
         * use rd_kafka_q_destroy_owner(). */
        rd_dassert(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY));
        rd_kafka_q_disable0(rkq, 0/*no-lock*/); /* for the non-devel case */
        rd_kafka_q_fwd_set0(rkq, NULL, 0/*no-lock*/, 0 /*no-fwd-app*/);
        rd_kafka_q_purge0(rkq, 0/*no-lock*/);
        assert(!rkq->rkq_fwdq);
        mtx_unlock(&rkq->rkq_lock);
        mtx_destroy(&rkq->rkq_lock);
        cnd_destroy(&rkq->rkq_cond);

        if (rkq->rkq_flags & RD_KAFKA_Q_F_ALLOCATED)
                rd_free(rkq);
}



/**
 * Initialize a queue.
 */
void rd_kafka_q_init0 (rd_kafka_q_t *rkq, rd_kafka_t *rk,
                       const char *func, int line) {
        rd_kafka_q_reset(rkq);
        rkq->rkq_fwdq   = NULL;
        rkq->rkq_refcnt = 1;
        rkq->rkq_flags  = RD_KAFKA_Q_F_READY;
        rkq->rkq_rk     = rk;
        rkq->rkq_qio    = NULL;
        rkq->rkq_serve  = NULL;
        rkq->rkq_opaque = NULL;
        mtx_init(&rkq->rkq_lock, mtx_plain);
        cnd_init(&rkq->rkq_cond);
#if ENABLE_DEVEL
        rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line);
#else
        rkq->rkq_name = func;
#endif
}


/**
 * Allocate a new queue and initialize it.
 */
rd_kafka_q_t *rd_kafka_q_new0 (rd_kafka_t *rk, const char *func, int line) {
        rd_kafka_q_t *rkq = rd_malloc(sizeof(*rkq));
        rd_kafka_q_init(rkq, rk);
        rkq->rkq_flags |= RD_KAFKA_Q_F_ALLOCATED;
#if ENABLE_DEVEL
        rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line);
#else
        rkq->rkq_name = func;
#endif
        return rkq;
}

/**
 * Set/clear forward queue.
 * Queue forwarding enables message routing inside rdkafka.
 * Typical use is to re-route all fetched messages for all partitions
 * to one single queue.
 *
 * All access to rkq_fwdq is protected by rkq_lock.
 */
void rd_kafka_q_fwd_set0 (rd_kafka_q_t *srcq, rd_kafka_q_t *destq,
                          int do_lock, int fwd_app) {

        if (do_lock)
                mtx_lock(&srcq->rkq_lock);
        if (fwd_app)
                srcq->rkq_flags |= RD_KAFKA_Q_F_FWD_APP;
        if (srcq->rkq_fwdq) {
                rd_kafka_q_destroy(srcq->rkq_fwdq);
                srcq->rkq_fwdq = NULL;
        }
        if (destq) {
                rd_kafka_q_keep(destq);

                /* If rkq has ops in queue, append them to fwdq's queue.
                 * This is an irreversible operation. */
                if (srcq->rkq_qlen > 0) {
                        rd_dassert(destq->rkq_flags & RD_KAFKA_Q_F_READY);
                        rd_kafka_q_concat(destq, srcq);
                }

                srcq->rkq_fwdq = destq;
        }
        if (do_lock)
                mtx_unlock(&srcq->rkq_lock);
}
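
/**
 * Example (illustrative sketch using the public API, not part of this file):
 * forwarding re-routes everything enqueued on one queue to another, e.g.
 * merging a single partition's fetch queue into the consumer queue.
 * The topic name and partition below are assumptions for illustration.
 *
 *   rd_kafka_queue_t *part_q = rd_kafka_queue_get_partition(rk, "mytopic", 0);
 *   rd_kafka_queue_t *cons_q = rd_kafka_queue_get_consumer(rk);
 *   rd_kafka_queue_forward(part_q, cons_q);   // part_q now feeds cons_q
 *   // ...
 *   rd_kafka_queue_forward(part_q, NULL);     // undo the forwarding
 *   rd_kafka_queue_destroy(part_q);
 *   rd_kafka_queue_destroy(cons_q);
 */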

/**
 * Purge all entries from a queue.
 */
int rd_kafka_q_purge0 (rd_kafka_q_t *rkq, int do_lock) {
        rd_kafka_op_t *rko, *next;
        TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
        rd_kafka_q_t *fwdq;
        int cnt = 0;

        if (do_lock)
                mtx_lock(&rkq->rkq_lock);

        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                if (do_lock)
                        mtx_unlock(&rkq->rkq_lock);
                cnt = rd_kafka_q_purge(fwdq);
                rd_kafka_q_destroy(fwdq);
                return cnt;
        }

        /* Move ops queue to tmpq to avoid lock-order issue
         * by locks taken from rd_kafka_op_destroy(). */
        TAILQ_MOVE(&tmpq, &rkq->rkq_q, rko_link);

        /* Zero out queue */
        rd_kafka_q_reset(rkq);

        if (do_lock)
                mtx_unlock(&rkq->rkq_lock);

        /* Destroy the ops */
        next = TAILQ_FIRST(&tmpq);
        while ((rko = next)) {
                next = TAILQ_NEXT(next, rko_link);
                rd_kafka_op_destroy(rko);
                cnt++;
        }

        return cnt;
}


/**
 * Purge all entries from a queue with a rktp version smaller than `version`.
 * This shaves off the head of the queue, up until the first rko with
 * a non-matching rktp or version.
 */
void rd_kafka_q_purge_toppar_version (rd_kafka_q_t *rkq,
                                      rd_kafka_toppar_t *rktp, int version) {
        rd_kafka_op_t *rko, *next;
        TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
        int32_t cnt = 0;
        int64_t size = 0;
        rd_kafka_q_t *fwdq;

        mtx_lock(&rkq->rkq_lock);

        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                mtx_unlock(&rkq->rkq_lock);
                rd_kafka_q_purge_toppar_version(fwdq, rktp, version);
                rd_kafka_q_destroy(fwdq);
                return;
        }

        /* Move ops to temporary queue and then destroy them from there
         * without locks to avoid lock-ordering problems in op_destroy() */
        while ((rko = TAILQ_FIRST(&rkq->rkq_q)) && rko->rko_rktp &&
               rd_kafka_toppar_s2i(rko->rko_rktp) == rktp &&
               rko->rko_version < version) {
                TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link);
                TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
                cnt++;
                size += rko->rko_len;
        }


        rkq->rkq_qlen -= cnt;
        rkq->rkq_qsize -= size;
        mtx_unlock(&rkq->rkq_lock);

        next = TAILQ_FIRST(&tmpq);
        while ((rko = next)) {
                next = TAILQ_NEXT(next, rko_link);
                rd_kafka_op_destroy(rko);
        }
}


/**
 * Move 'cnt' entries from 'srcq' to 'dstq'.
 * If 'cnt' == -1 all entries will be moved.
 * Returns the number of entries moved.
 */
int rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq,
                         int cnt, int do_locks) {
        rd_kafka_op_t *rko;
        int mcnt = 0;

        if (do_locks) {
                mtx_lock(&srcq->rkq_lock);
                mtx_lock(&dstq->rkq_lock);
        }

        if (!dstq->rkq_fwdq && !srcq->rkq_fwdq) {
                if (cnt > 0 && dstq->rkq_qlen == 0)
                        rd_kafka_q_io_event(dstq, rd_false/*no rate-limiting*/);

                /* Optimization, if 'cnt' is equal/larger than all
                 * items of 'srcq' we can move the entire queue. */
                if (cnt == -1 ||
                    cnt >= (int)srcq->rkq_qlen) {
                        mcnt = srcq->rkq_qlen;
                        rd_kafka_q_concat0(dstq, srcq, 0/*no-lock*/);
                } else {
                        while (mcnt < cnt &&
                               (rko = TAILQ_FIRST(&srcq->rkq_q))) {
                                TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link);
                                if (likely(!rko->rko_prio))
                                        TAILQ_INSERT_TAIL(&dstq->rkq_q, rko,
                                                          rko_link);
                                else
                                        TAILQ_INSERT_SORTED(
                                                &dstq->rkq_q, rko,
                                                rd_kafka_op_t *, rko_link,
                                                rd_kafka_op_cmp_prio);

                                srcq->rkq_qlen--;
                                dstq->rkq_qlen++;
                                srcq->rkq_qsize -= rko->rko_len;
                                dstq->rkq_qsize += rko->rko_len;
                                mcnt++;
                        }
                }
        } else
                mcnt = rd_kafka_q_move_cnt(dstq->rkq_fwdq ? dstq->rkq_fwdq : dstq,
                                           srcq->rkq_fwdq ? srcq->rkq_fwdq : srcq,
                                           cnt, do_locks);

        if (do_locks) {
                mtx_unlock(&dstq->rkq_lock);
                mtx_unlock(&srcq->rkq_lock);
        }

        return mcnt;
}


/**
 * Filters out outdated ops.
 */
static RD_INLINE rd_kafka_op_t *rd_kafka_op_filter (rd_kafka_q_t *rkq,
                                                    rd_kafka_op_t *rko,
                                                    int version) {
        if (unlikely(!rko))
                return NULL;

        if (unlikely(rd_kafka_op_version_outdated(rko, version))) {
                rd_kafka_q_deq0(rkq, rko);
                rd_kafka_op_destroy(rko);
                return NULL;
        }

        return rko;
}



/**
 * Pop an op from a queue.
 *
 * Locality: any thread.
 */


/**
 * Serve q like rd_kafka_q_serve() until an op is found that can be returned
 * as an event to the application.
 *
 * @returns the first event:able op, or NULL on timeout.
 *
 * Locality: any thread
 */
rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, rd_ts_t timeout_us,
                                     int32_t version,
                                     rd_kafka_q_cb_type_t cb_type,
                                     rd_kafka_q_serve_cb_t *callback,
                                     void *opaque) {
        rd_kafka_op_t *rko;
        rd_kafka_q_t *fwdq;

        rd_dassert(cb_type);

        mtx_lock(&rkq->rkq_lock);

        rd_kafka_yield_thread = 0;
        if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                struct timespec timeout_tspec;

                rd_timeout_init_timespec_us(&timeout_tspec, timeout_us);

                while (1) {
                        rd_kafka_op_res_t res;
                        /* Keep track of current lock status to avoid
                         * unnecessary lock flapping in all the cases below. */
                        rd_bool_t is_locked = rd_true;

                        /* Filter out outdated ops */
                retry:
                        while ((rko = TAILQ_FIRST(&rkq->rkq_q)) &&
                               !(rko = rd_kafka_op_filter(rkq, rko, version)))
                                ;

                        if (rko) {
                                /* Proper versioned op */
                                rd_kafka_q_deq0(rkq, rko);

                                /* Let op_handle() operate without lock
                                 * held to allow re-enqueuing, etc. */
                                mtx_unlock(&rkq->rkq_lock);
                                is_locked = rd_false;

                                /* Ops with callbacks are considered handled
                                 * and we move on to the next op, if any.
                                 * Ops w/o callbacks are returned immediately */
                                res = rd_kafka_op_handle(rkq->rkq_rk, rkq, rko,
                                                         cb_type, opaque,
                                                         callback);

                                if (res == RD_KAFKA_OP_RES_HANDLED ||
                                    res == RD_KAFKA_OP_RES_KEEP) {
                                        mtx_lock(&rkq->rkq_lock);
                                        is_locked = rd_true;
                                        goto retry; /* Next op */
                                } else if (unlikely(res ==
                                                    RD_KAFKA_OP_RES_YIELD)) {
                                        /* Callback yielded, unroll */
                                        return NULL;
                                } else
                                        break; /* Proper op, handle below. */
                        }

                        if (unlikely(rd_kafka_q_check_yield(rkq))) {
                                if (is_locked)
                                        mtx_unlock(&rkq->rkq_lock);
                                return NULL;
                        }

                        if (!is_locked)
                                mtx_lock(&rkq->rkq_lock);

                        if (cnd_timedwait_abs(&rkq->rkq_cond,
                                              &rkq->rkq_lock,
                                              &timeout_tspec) !=
                            thrd_success) {
                                mtx_unlock(&rkq->rkq_lock);
                                return NULL;
                        }
                }

        } else {
                /* Since the q_pop may block we need to release the parent
                 * queue's lock. */
                mtx_unlock(&rkq->rkq_lock);
                rko = rd_kafka_q_pop_serve(fwdq, timeout_us, version,
                                           cb_type, callback, opaque);
                rd_kafka_q_destroy(fwdq);
        }


        return rko;
}

rd_kafka_op_t *rd_kafka_q_pop (rd_kafka_q_t *rkq, rd_ts_t timeout_us,
                               int32_t version) {
        return rd_kafka_q_pop_serve(rkq, timeout_us, version,
                                    RD_KAFKA_Q_CB_RETURN,
                                    NULL, NULL);
}
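
/**
 * Example (internal-use sketch, not part of the library; mirrors the
 * rd_kafka_q_wait_result() helper further down): pop a single op with a
 * relative timeout and destroy it when done. The 1000 ms timeout is an
 * arbitrary assumption.
 *
 *   rd_kafka_op_t *rko = rd_kafka_q_pop(rkq, rd_timeout_us(1000), 0);
 *   if (rko) {
 *           // inspect rko->rko_type / rko->rko_err here
 *           rd_kafka_op_destroy(rko);
 *   }
 */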


/**
 * Pop all available ops from a queue and call the provided
 * callback for each op.
 * `max_cnt` limits the number of ops served, 0 = no limit.
 *
 * Returns the number of ops served.
 *
 * Locality: any thread.
 */
int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms,
                      int max_cnt, rd_kafka_q_cb_type_t cb_type,
                      rd_kafka_q_serve_cb_t *callback, void *opaque) {
        rd_kafka_t *rk = rkq->rkq_rk;
        rd_kafka_op_t *rko;
        rd_kafka_q_t localq;
        rd_kafka_q_t *fwdq;
        int cnt = 0;
        struct timespec timeout_tspec;

        rd_dassert(cb_type);

        mtx_lock(&rkq->rkq_lock);

        rd_dassert(TAILQ_EMPTY(&rkq->rkq_q) || rkq->rkq_qlen > 0);
        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                int ret;
                /* Since the q_pop may block we need to release the parent
                 * queue's lock. */
                mtx_unlock(&rkq->rkq_lock);
                ret = rd_kafka_q_serve(fwdq, timeout_ms, max_cnt,
                                       cb_type, callback, opaque);
                rd_kafka_q_destroy(fwdq);
                return ret;
        }

        rd_timeout_init_timespec(&timeout_tspec, timeout_ms);

        /* Wait for op */
        while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) &&
               !rd_kafka_q_check_yield(rkq) &&
               cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock,
                                 &timeout_tspec) == thrd_success)
                ;

        if (!rko) {
                mtx_unlock(&rkq->rkq_lock);
                return 0;
        }

        /* Move the first `max_cnt` ops. */
        rd_kafka_q_init(&localq, rkq->rkq_rk);
        rd_kafka_q_move_cnt(&localq, rkq, max_cnt == 0 ? -1/*all*/ : max_cnt,
                            0/*no-locks*/);

        mtx_unlock(&rkq->rkq_lock);

        rd_kafka_yield_thread = 0;

        /* Call callback for each op */
        while ((rko = TAILQ_FIRST(&localq.rkq_q))) {
                rd_kafka_op_res_t res;

                rd_kafka_q_deq0(&localq, rko);
                res = rd_kafka_op_handle(rk, &localq, rko, cb_type,
                                         opaque, callback);
                /* op must have been handled */
                rd_kafka_assert(NULL, res != RD_KAFKA_OP_RES_PASS);
                cnt++;

                if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
                             rd_kafka_yield_thread)) {
                        /* Callback called rd_kafka_yield(), we must
                         * stop our callback dispatching and put the
                         * ops in localq back on the original queue head. */
                        if (!TAILQ_EMPTY(&localq.rkq_q))
                                rd_kafka_q_prepend(rkq, &localq);
                        break;
                }
        }

        rd_kafka_q_destroy_owner(&localq);

        return cnt;
}
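
/**
 * Example (internal-use sketch; the exact arguments are assumptions for
 * illustration, not a definitive description of the poll implementation):
 * a poll-style call can drain a queue by serving each op through
 * rd_kafka_poll_cb() and returning the number of ops handled.
 *
 *   int served = rd_kafka_q_serve(rk->rk_rep, timeout_ms,
 *                                 0,  // 0 = no limit on ops served
 *                                 RD_KAFKA_Q_CB_CALLBACK,
 *                                 rd_kafka_poll_cb, NULL);
 */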




/**
 * Populate 'rkmessages' array with messages from 'rkq'.
 * If 'auto_commit' is set, each message's offset will be committed
 * to the offset store for that toppar.
 *
 * Returns the number of messages added.
 */

int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms,
                                 rd_kafka_message_t **rkmessages,
                                 size_t rkmessages_size) {
        unsigned int cnt = 0;
        TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
        rd_kafka_op_t *rko, *next;
        rd_kafka_t *rk = rkq->rkq_rk;
        rd_kafka_q_t *fwdq;
        struct timespec timeout_tspec;

        mtx_lock(&rkq->rkq_lock);
        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                /* Since the q_pop may block we need to release the parent
                 * queue's lock. */
                mtx_unlock(&rkq->rkq_lock);
                cnt = rd_kafka_q_serve_rkmessages(fwdq, timeout_ms,
                                                  rkmessages, rkmessages_size);
                rd_kafka_q_destroy(fwdq);
                return cnt;
        }
        mtx_unlock(&rkq->rkq_lock);

        if (timeout_ms)
                rd_kafka_app_poll_blocking(rk);

        rd_timeout_init_timespec(&timeout_tspec, timeout_ms);

        rd_kafka_yield_thread = 0;
        while (cnt < rkmessages_size) {
                rd_kafka_op_res_t res;

                mtx_lock(&rkq->rkq_lock);

                while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) &&
                       !rd_kafka_q_check_yield(rkq) &&
                       cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock,
                                         &timeout_tspec) == thrd_success)
                        ;

                if (!rko) {
                        mtx_unlock(&rkq->rkq_lock);
                        break; /* Timed out */
                }

                rd_kafka_q_deq0(rkq, rko);

                mtx_unlock(&rkq->rkq_lock);

                if (rd_kafka_op_version_outdated(rko, 0)) {
                        /* Outdated op, put on discard queue */
                        TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
                        continue;
                }

                /* Serve non-FETCH callbacks */
                res = rd_kafka_poll_cb(rk, rkq, rko,
                                       RD_KAFKA_Q_CB_RETURN, NULL);
                if (res == RD_KAFKA_OP_RES_KEEP ||
                    res == RD_KAFKA_OP_RES_HANDLED) {
                        /* Callback served, rko is destroyed (if HANDLED). */
                        continue;
                } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
                                    rd_kafka_yield_thread)) {
                        /* Yield. */
                        break;
                }
                rd_dassert(res == RD_KAFKA_OP_RES_PASS);

                /* Auto-store offset, if enabled. */
                if (!rko->rko_err && rko->rko_type == RD_KAFKA_OP_FETCH) {
                        rd_kafka_op_offset_store(rk, rko);

                        /* If this is a control message, don't return
                         * the message to the application, only store
                         * the offset. */
                        if (unlikely(rd_kafka_op_is_ctrl_msg(rko)))
                                continue;
                }

                /* Get rkmessage from rko and append to array. */
                rkmessages[cnt++] = rd_kafka_message_get(rko);
        }

        /* Discard non-desired and already handled ops */
        next = TAILQ_FIRST(&tmpq);
        while (next) {
                rko = next;
                next = TAILQ_NEXT(next, rko_link);
                rd_kafka_op_destroy(rko);
        }

        rd_kafka_app_polled(rk);

        return cnt;
}
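
/**
 * Example (application-level sketch; the queue source, batch size and
 * timeout are assumptions): the public batch-consume API ends up in
 * rd_kafka_q_serve_rkmessages() to fill the message array.
 *
 *   rd_kafka_message_t *msgs[100];
 *   rd_kafka_queue_t *cons_q = rd_kafka_queue_get_consumer(rk);
 *   ssize_t n = rd_kafka_consume_batch_queue(cons_q, 1000, msgs, 100);
 *   for (ssize_t i = 0; i < n; i++) {
 *           // process msgs[i]->payload / msgs[i]->err here
 *           rd_kafka_message_destroy(msgs[i]);
 *   }
 *   rd_kafka_queue_destroy(cons_q);
 */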



void rd_kafka_queue_destroy (rd_kafka_queue_t *rkqu) {
        if (rkqu->rkqu_is_owner)
                rd_kafka_q_destroy_owner(rkqu->rkqu_q);
        else
                rd_kafka_q_destroy(rkqu->rkqu_q);
        rd_free(rkqu);
}

rd_kafka_queue_t *rd_kafka_queue_new0 (rd_kafka_t *rk, rd_kafka_q_t *rkq) {
        rd_kafka_queue_t *rkqu;

        rkqu = rd_calloc(1, sizeof(*rkqu));

        rkqu->rkqu_q = rkq;
        rd_kafka_q_keep(rkq);

        rkqu->rkqu_rk = rk;

        return rkqu;
}


rd_kafka_queue_t *rd_kafka_queue_new (rd_kafka_t *rk) {
        rd_kafka_q_t *rkq;
        rd_kafka_queue_t *rkqu;

        rkq = rd_kafka_q_new(rk);
        rkqu = rd_kafka_queue_new0(rk, rkq);
        rd_kafka_q_destroy(rkq); /* Lose refcount from q_new, one is held
                                  * by queue_new0 */
        rkqu->rkqu_is_owner = 1;
        return rkqu;
}


rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk) {
        return rd_kafka_queue_new0(rk, rk->rk_rep);
}


rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk) {
        if (!rk->rk_cgrp)
                return NULL;
        return rd_kafka_queue_new0(rk, rk->rk_cgrp->rkcg_q);
}

rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk,
                                                const char *topic,
                                                int32_t partition) {
        shptr_rd_kafka_toppar_t *s_rktp;
        rd_kafka_toppar_t *rktp;
        rd_kafka_queue_t *result;

        if (rk->rk_type == RD_KAFKA_PRODUCER)
                return NULL;

        s_rktp = rd_kafka_toppar_get2(rk, topic,
                                      partition,
                                      0, /* no ua_on_miss */
                                      1 /* create_on_miss */);

        if (!s_rktp)
                return NULL;

        rktp = rd_kafka_toppar_s2i(s_rktp);
        result = rd_kafka_queue_new0(rk, rktp->rktp_fetchq);
        rd_kafka_toppar_destroy(s_rktp);

        return result;
}

rd_kafka_queue_t *rd_kafka_queue_get_background (rd_kafka_t *rk) {
        if (rk->rk_background.q)
                return rd_kafka_queue_new0(rk, rk->rk_background.q);
        else
                return NULL;
}


rd_kafka_resp_err_t rd_kafka_set_log_queue (rd_kafka_t *rk,
                                            rd_kafka_queue_t *rkqu) {
        rd_kafka_q_t *rkq;
        if (!rkqu)
                rkq = rk->rk_rep;
        else
                rkq = rkqu->rkqu_q;
        rd_kafka_q_fwd_set(rk->rk_logq, rkq);
        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
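
/**
 * Example (application-level sketch; enabling the "log.queue" configuration
 * property so that logs are enqueued rather than delivered via log_cb is an
 * assumption about typical setup):
 *
 *   rd_kafka_queue_t *log_q = rd_kafka_queue_new(rk);
 *   rd_kafka_set_log_queue(rk, log_q);
 *   // log events can now be fetched with rd_kafka_queue_poll(log_q, ...)
 */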

void rd_kafka_queue_forward (rd_kafka_queue_t *src, rd_kafka_queue_t *dst) {
        rd_kafka_q_fwd_set0(src->rkqu_q, dst ? dst->rkqu_q : NULL,
                            1, /* do_lock */
                            1 /* fwd_app */);
}


size_t rd_kafka_queue_length (rd_kafka_queue_t *rkqu) {
        return (size_t)rd_kafka_q_len(rkqu->rkqu_q);
}

/**
 * @brief Enable or disable (fd==-1) fd-based wake-ups for queue
 */
void rd_kafka_q_io_event_enable (rd_kafka_q_t *rkq, rd_socket_t fd,
                                 const void *payload, size_t size) {
        struct rd_kafka_q_io *qio = NULL;

        if (fd != -1) {
                qio = rd_malloc(sizeof(*qio) + size);
                qio->fd = fd;
                qio->size = size;
                qio->payload = (void *)(qio+1);
                qio->ts_rate = rkq->rkq_rk->rk_conf.buffering_max_us;
                qio->ts_last = 0;
                qio->event_cb = NULL;
                qio->event_cb_opaque = NULL;
                memcpy(qio->payload, payload, size);
        }

        mtx_lock(&rkq->rkq_lock);
        if (rkq->rkq_qio) {
                rd_free(rkq->rkq_qio);
                rkq->rkq_qio = NULL;
        }

        if (fd != -1) {
                rkq->rkq_qio = qio;
        }

        mtx_unlock(&rkq->rkq_lock);

}

void rd_kafka_queue_io_event_enable (rd_kafka_queue_t *rkqu, int fd,
                                     const void *payload, size_t size) {
        rd_kafka_q_io_event_enable(rkqu->rkqu_q, fd, payload, size);
}
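
/**
 * Example (application-level sketch; the pipe(2)/poll(2) event loop around
 * the API, and the required <poll.h>/<unistd.h> includes, are assumptions
 * about typical usage): register the write end of a pipe so that enqueued
 * ops wake up a poll(2)-based loop, which then serves the queue
 * non-blockingly.
 *
 *   int pipefds[2];
 *   pipe(pipefds);
 *   rd_kafka_queue_io_event_enable(rkqu, pipefds[1], "1", 1);
 *
 *   struct pollfd pfd = { .fd = pipefds[0], .events = POLLIN };
 *   while (running) {
 *           if (poll(&pfd, 1, -1) > 0) {
 *                   char buf[8];
 *                   read(pipefds[0], buf, sizeof(buf)); // drain wake-up bytes
 *                   rd_kafka_event_t *ev;
 *                   while ((ev = rd_kafka_queue_poll(rkqu, 0))) // non-blocking
 *                           rd_kafka_event_destroy(ev);         // handle + free
 *           }
 *   }
 */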


/**
 * @brief Enable or disable (event_cb==NULL) callback-based wake-ups for queue
 */
void rd_kafka_q_cb_event_enable (rd_kafka_q_t *rkq,
                                 void (*event_cb) (rd_kafka_t *rk,
                                                   void *opaque),
                                 void *opaque) {
        struct rd_kafka_q_io *qio = NULL;

        if (event_cb) {
                qio = rd_malloc(sizeof(*qio));
                qio->fd = -1;
                qio->size = 0;
                qio->payload = NULL;
                qio->event_cb = event_cb;
                qio->event_cb_opaque = opaque;
        }

        mtx_lock(&rkq->rkq_lock);
        if (rkq->rkq_qio) {
                rd_free(rkq->rkq_qio);
                rkq->rkq_qio = NULL;
        }

        if (event_cb) {
                rkq->rkq_qio = qio;
        }

        mtx_unlock(&rkq->rkq_lock);

}

void rd_kafka_queue_cb_event_enable (rd_kafka_queue_t *rkqu,
                                     void (*event_cb) (rd_kafka_t *rk,
                                                       void *opaque),
                                     void *opaque) {
        rd_kafka_q_cb_event_enable(rkqu->rkqu_q, event_cb, opaque);
}
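
/**
 * Example (application-level sketch; the application event-loop type and
 * wake-up helper below are hypothetical). The callback runs on internal
 * librdkafka threads, so it should only signal the application's own loop
 * and not call back into librdkafka APIs; the application thread then serves
 * the queue, e.g. with rd_kafka_queue_poll(rkqu, 0), when woken up.
 *
 *   static void wakeup_cb (rd_kafka_t *rk, void *opaque) {
 *           my_event_loop_t *loop = opaque;   // hypothetical application type
 *           my_event_loop_wakeup(loop);       // e.g. signal a condvar/eventfd
 *   }
 *
 *   rd_kafka_queue_cb_event_enable(rkqu, wakeup_cb, loop);
 */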


/**
 * Helper: wait for single op on 'rkq', and return its error,
 * or .._TIMED_OUT on timeout.
 */
rd_kafka_resp_err_t rd_kafka_q_wait_result (rd_kafka_q_t *rkq, int timeout_ms) {
        rd_kafka_op_t *rko;
        rd_kafka_resp_err_t err;

        rko = rd_kafka_q_pop(rkq, rd_timeout_us(timeout_ms), 0);
        if (!rko)
                err = RD_KAFKA_RESP_ERR__TIMED_OUT;
        else {
                err = rko->rko_err;
                rd_kafka_op_destroy(rko);
        }

        return err;
}


/**
 * Apply \p callback on each op in queue.
 * If the callback wishes to remove the rko it must do so
 * using rd_kafka_q_deq0().
 *
 * @returns the sum of \p callback() return values.
 * @remark rkq will be locked, callers should take care not to
 *         interact with \p rkq through other means from the callback to avoid
 *         deadlocks.
 */
int rd_kafka_q_apply (rd_kafka_q_t *rkq,
                      int (*callback) (rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                                       void *opaque),
                      void *opaque) {
        rd_kafka_op_t *rko, *next;
        rd_kafka_q_t *fwdq;
        int cnt = 0;

        mtx_lock(&rkq->rkq_lock);
        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                mtx_unlock(&rkq->rkq_lock);
                cnt = rd_kafka_q_apply(fwdq, callback, opaque);
                rd_kafka_q_destroy(fwdq);
                return cnt;
        }

        next = TAILQ_FIRST(&rkq->rkq_q);
        while ((rko = next)) {
                next = TAILQ_NEXT(next, rko_link);
                cnt += callback(rkq, rko, opaque);
        }
        mtx_unlock(&rkq->rkq_lock);

        return cnt;
}
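
/**
 * Example (internal-use sketch; the counting callback is hypothetical and
 * only illustrates the expected callback shape and return-value summing):
 *
 *   static int my_count_fetch_cb (rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
 *                                 void *opaque) {
 *           return rko->rko_type == RD_KAFKA_OP_FETCH ? 1 : 0;
 *   }
 *
 *   int fetch_ops = rd_kafka_q_apply(rkq, my_count_fetch_cb, NULL);
 */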

/**
 * @brief Convert relative to absolute offsets and also purge any messages
 *        that are older than \p min_offset.
 * @remark Error ops with ERR__NOT_IMPLEMENTED will not be purged since
 *         they are used to indicate unknown compression codecs and compressed
 *         messagesets may have a starting offset lower than what we requested.
 * @remark \p rkq locking is not performed (caller's responsibility)
 * @remark Must NOT be used on fwdq.
 */
void rd_kafka_q_fix_offsets (rd_kafka_q_t *rkq, int64_t min_offset,
                             int64_t base_offset) {
        rd_kafka_op_t *rko, *next;
        int     adj_len  = 0;
        int64_t adj_size = 0;

        rd_kafka_assert(NULL, !rkq->rkq_fwdq);

        next = TAILQ_FIRST(&rkq->rkq_q);
        while ((rko = next)) {
                next = TAILQ_NEXT(next, rko_link);

                if (unlikely(rko->rko_type != RD_KAFKA_OP_FETCH))
                        continue;

                rko->rko_u.fetch.rkm.rkm_offset += base_offset;

                if (rko->rko_u.fetch.rkm.rkm_offset < min_offset &&
                    rko->rko_err != RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED) {
                        adj_len++;
                        adj_size += rko->rko_len;
                        TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link);
                        rd_kafka_op_destroy(rko);
                        continue;
                }
        }


        rkq->rkq_qlen  -= adj_len;
        rkq->rkq_qsize -= adj_size;
}
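
/**
 * Worked example (numbers are hypothetical): with base_offset=1000 and
 * min_offset=1002, fetch ops carrying relative offsets 0..4 are rewritten to
 * absolute offsets 1000..1004; the ops now at 1000 and 1001 fall below
 * min_offset and are purged (unless flagged ERR__NOT_IMPLEMENTED), leaving
 * 1002..1004 on the queue.
 */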


/**
 * @brief Print information and contents of queue
 */
void rd_kafka_q_dump (FILE *fp, rd_kafka_q_t *rkq) {
        mtx_lock(&rkq->rkq_lock);
        fprintf(fp, "Queue %p \"%s\" (refcnt %d, flags 0x%x, %d ops, "
                "%"PRId64" bytes)\n",
                rkq, rkq->rkq_name, rkq->rkq_refcnt, rkq->rkq_flags,
                rkq->rkq_qlen, rkq->rkq_qsize);

        if (rkq->rkq_qio)
                fprintf(fp, " QIO fd %d\n", (int)rkq->rkq_qio->fd);
        if (rkq->rkq_serve)
                fprintf(fp, " Serve callback %p, opaque %p\n",
                        rkq->rkq_serve, rkq->rkq_opaque);

        if (rkq->rkq_fwdq) {
                fprintf(fp, " Forwarded ->\n");
                rd_kafka_q_dump(fp, rkq->rkq_fwdq);
        } else {
                rd_kafka_op_t *rko;

                if (!TAILQ_EMPTY(&rkq->rkq_q))
                        fprintf(fp, " Queued ops:\n");
                TAILQ_FOREACH(rko, &rkq->rkq_q, rko_link) {
                        fprintf(fp, "  %p %s (v%"PRId32", flags 0x%x, "
                                "prio %d, len %"PRId32", source %s, "
                                "replyq %p)\n",
                                rko, rd_kafka_op2str(rko->rko_type),
                                rko->rko_version, rko->rko_flags,
                                rko->rko_prio, rko->rko_len,
                                #if ENABLE_DEVEL
                                rko->rko_source
                                #else
                                "-"
                                #endif
                                ,
                                rko->rko_replyq.q
                                );
                }
        }

        mtx_unlock(&rkq->rkq_lock);
}


void rd_kafka_enq_once_trigger_destroy (void *ptr) {
        rd_kafka_enq_once_t *eonce = ptr;

        rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__DESTROY, "destroy");
}