1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012,2013 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
18 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
19 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
20 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
21 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
22 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
23 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
25 */
26
27 #ifndef _RDKAFKA_MSG_H_
28 #define _RDKAFKA_MSG_H_
29
30 #include "rdsysqueue.h"
31
32 #include "rdkafka_proto.h"
33 #include "rdkafka_header.h"
34
35
/**
 * @brief Internal RD_KAFKA_MSG_F_.. flags
 *
 * RD_KAFKA_MSG_F_RKT_RDLOCKED: rkt is rdlock():ed.
 */
#define RD_KAFKA_MSG_F_RKT_RDLOCKED    0x100000
40
41
/**
 * @brief Message.MsgAttributes for MsgVersion v0..v1,
 *        also used for MessageSet.Attributes for MsgVersion v2.
 *
 * The compression codec is a number held in the bits covered by
 * RD_KAFKA_MSG_ATTR_COMPRESSION_MASK (bits 0..2), the timestamp type
 * is held in bit 3.
 */
#define RD_KAFKA_MSG_ATTR_GZIP             (1 << 0)
#define RD_KAFKA_MSG_ATTR_SNAPPY           (1 << 1)
#define RD_KAFKA_MSG_ATTR_LZ4              (3)
#define RD_KAFKA_MSG_ATTR_ZSTD             (4)
#define RD_KAFKA_MSG_ATTR_COMPRESSION_MASK 0x7
#define RD_KAFKA_MSG_ATTR_CREATE_TIME      (0 << 3)
#define RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME  (1 << 3)
53
/**
 * @brief MessageSet.Attributes for MsgVersion v2
 *
 * Attributes:
 *  -------------------------------------------------------------------------------------------------
 *  | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
 *  -------------------------------------------------------------------------------------------------
 *
 * The compression type and timestamp type bits are the same as the
 * RD_KAFKA_MSG_ATTR_* definitions used for MsgVersion 0, only the
 * transactional and control bits are specific to v2.
 */
#define RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL (1 << 4)
#define RD_KAFKA_MSGSET_V2_ATTR_CONTROL       (1 << 5)
66
67
68 typedef struct rd_kafka_msg_s {
69 rd_kafka_message_t rkm_rkmessage; /* MUST be first field */
70 #define rkm_len rkm_rkmessage.len
71 #define rkm_payload rkm_rkmessage.payload
72 #define rkm_opaque rkm_rkmessage._private
73 #define rkm_partition rkm_rkmessage.partition
74 #define rkm_offset rkm_rkmessage.offset
75 #define rkm_key rkm_rkmessage.key
76 #define rkm_key_len rkm_rkmessage.key_len
77 #define rkm_err rkm_rkmessage.err
78
79 TAILQ_ENTRY(rd_kafka_msg_s) rkm_link;
80
81 int rkm_flags;
82 /* @remark These additional flags must not collide with
83 * the RD_KAFKA_MSG_F_* flags in rdkafka.h */
84 #define RD_KAFKA_MSG_F_FREE_RKM 0x10000 /* msg_t is allocated */
85 #define RD_KAFKA_MSG_F_ACCOUNT 0x20000 /* accounted for in curr_msgs */
86 #define RD_KAFKA_MSG_F_PRODUCER 0x40000 /* Producer message */
87 #define RD_KAFKA_MSG_F_CONTROL 0x80000 /* Control message */
88
89 rd_kafka_timestamp_type_t rkm_tstype; /* rkm_timestamp type */
90 int64_t rkm_timestamp; /* Message format V1.
91 * Meaning of timestamp depends on
92 * message Attribute LogAppendtime (broker)
93 * or CreateTime (producer).
94 * Unit is milliseconds since epoch (UTC).*/
95
96
97 rd_kafka_headers_t *rkm_headers; /**< Parsed headers list, if any. */
98
99 rd_kafka_msg_status_t rkm_status; /**< Persistence status. Updated in
100 * the ProduceResponse handler:
101 * this value is always up to date.
102 */
103 int32_t rkm_broker_id; /**< Broker message was produced to
104 * or fetched from. */
105
106 union {
107 struct {
108 rd_ts_t ts_timeout; /* Message timeout */
109 rd_ts_t ts_enq; /* Enqueue/Produce time */
110 rd_ts_t ts_backoff; /* Backoff next Produce until
111 * this time. */
112 uint64_t msgid; /**< Message sequencial id,
113 * used to maintain ordering.
114 * Starts at 1. */
115 uint64_t last_msgid; /**< On retry this is set
116 * on the first message
117 * in a batch to point
118 * out the last message
119 * of the batch so that
120 * the batch can be
121 * identically reconstructed.
122 */
123 int retries; /* Number of retries so far */
124 } producer;
125 #define rkm_ts_timeout rkm_u.producer.ts_timeout
126 #define rkm_ts_enq rkm_u.producer.ts_enq
127 #define rkm_msgid rkm_u.producer.msgid
128
129 struct {
130 rd_kafkap_bytes_t binhdrs; /**< Unparsed
131 * binary headers in
132 * protocol msg */
133 } consumer;
134 } rkm_u;
135 } rd_kafka_msg_t;
136
/** Tail queue head type for rd_kafka_msg_s elements. */
TAILQ_HEAD(rd_kafka_msg_head_s, rd_kafka_msg_s);


/**
 * @returns the absolute time a message was enqueued (producer),
 *          i.e., the message's rkm_ts_enq field.
 */
#define rd_kafka_msg_enq_time(rkm) ((rkm)->rkm_ts_enq)
142
143 /**
144 * @returns the message's total maximum on-wire size.
145 * @remark Depending on message version (MagicByte) the actual size
146 * may be smaller.
147 */
148 static RD_INLINE RD_UNUSED
rd_kafka_msg_wire_size(const rd_kafka_msg_t * rkm,int MsgVersion)149 size_t rd_kafka_msg_wire_size (const rd_kafka_msg_t *rkm, int MsgVersion) {
150 static const size_t overheads[] = {
151 [0] = RD_KAFKAP_MESSAGE_V0_OVERHEAD,
152 [1] = RD_KAFKAP_MESSAGE_V1_OVERHEAD,
153 [2] = RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD
154 };
155 size_t size;
156 rd_dassert(MsgVersion >= 0 && MsgVersion <= 2);
157
158 size = overheads[MsgVersion] + rkm->rkm_len + rkm->rkm_key_len;
159 if (MsgVersion == 2 && rkm->rkm_headers)
160 size += rd_kafka_headers_serialized_size(rkm->rkm_headers);
161
162 return size;
163 }
164
165
166 /**
167 * @returns the maximum total on-wire message size regardless of MsgVersion.
168 *
169 * @remark This does not account for the ProduceRequest, et.al, just the
170 * per-message overhead.
171 */
172 static RD_INLINE RD_UNUSED
rd_kafka_msg_max_wire_size(size_t keylen,size_t valuelen,size_t hdrslen)173 size_t rd_kafka_msg_max_wire_size (size_t keylen, size_t valuelen,
174 size_t hdrslen) {
175 return RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD +
176 keylen + valuelen + hdrslen;
177 }
178
179 /**
180 * @returns the enveloping rd_kafka_msg_t pointer for a rd_kafka_msg_t
181 * wrapped rd_kafka_message_t.
182 */
183 static RD_INLINE RD_UNUSED
rd_kafka_message2msg(rd_kafka_message_t * rkmessage)184 rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) {
185 return (rd_kafka_msg_t *)rkmessage;
186 }
187
188
189
190
191
192 /**
193 * @brief Message queue with message and byte counters.
194 */
195 TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s);
196 typedef struct rd_kafka_msgq_s {
197 struct rd_kafka_msgs_head_s rkmq_msgs; /* TAILQ_HEAD */
198 int32_t rkmq_msg_cnt;
199 int64_t rkmq_msg_bytes;
200 } rd_kafka_msgq_t;
201
/**
 * @brief Static initializer for the rd_kafka_msgq_t object \p rkmq.
 *        Only the list head is named; the remaining fields are thereby
 *        zero-initialized.
 */
#define RD_KAFKA_MSGQ_INITIALIZER(rkmq)                                 \
        { .rkmq_msgs = TAILQ_HEAD_INITIALIZER((rkmq).rkmq_msgs) }

/** @brief Iterate over all messages in msgq \p head using cursor \p elm. */
#define RD_KAFKA_MSGQ_FOREACH(elm,head)                 \
        TAILQ_FOREACH(elm, &(head)->rkmq_msgs, rkm_link)

/** @brief Check if queue is empty. Proper locks must be held. */
#define RD_KAFKA_MSGQ_EMPTY(rkmq) TAILQ_EMPTY(&(rkmq)->rkmq_msgs)
210
211 /**
212 * Returns the number of messages in the specified queue.
213 */
214 static RD_INLINE RD_UNUSED
rd_kafka_msgq_len(const rd_kafka_msgq_t * rkmq)215 int rd_kafka_msgq_len (const rd_kafka_msgq_t *rkmq) {
216 return (int)rkmq->rkmq_msg_cnt;
217 }
218
219 /**
220 * Returns the total number of bytes in the specified queue.
221 */
222 static RD_INLINE RD_UNUSED
rd_kafka_msgq_size(const rd_kafka_msgq_t * rkmq)223 size_t rd_kafka_msgq_size (const rd_kafka_msgq_t *rkmq) {
224 return (size_t)rkmq->rkmq_msg_bytes;
225 }
226
227
228 void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm);
229
230 int rd_kafka_msg_new (rd_kafka_topic_t *rkt, int32_t force_partition,
231 int msgflags,
232 char *payload, size_t len,
233 const void *keydata, size_t keylen,
234 void *msg_opaque);
235
rd_kafka_msgq_init(rd_kafka_msgq_t * rkmq)236 static RD_INLINE RD_UNUSED void rd_kafka_msgq_init (rd_kafka_msgq_t *rkmq) {
237 TAILQ_INIT(&rkmq->rkmq_msgs);
238 rkmq->rkmq_msg_cnt = 0;
239 rkmq->rkmq_msg_bytes = 0;
240 }
241
242 #if ENABLE_DEVEL
243 #define rd_kafka_msgq_verify_order(rktp,rkmq,exp_first_msgid,gapless) \
244 rd_kafka_msgq_verify_order0(__FUNCTION__, __LINE__, \
245 rktp, rkmq, exp_first_msgid, gapless)
246 #else
247 #define rd_kafka_msgq_verify_order(rktp,rkmq,exp_first_msgid,gapless) \
248 do { } while (0)
249 #endif
250
251 void rd_kafka_msgq_verify_order0 (const char *function, int line,
252 const struct rd_kafka_toppar_s *rktp,
253 const rd_kafka_msgq_t *rkmq,
254 uint64_t exp_first_msgid,
255 rd_bool_t gapless);
256
257
258 /**
259 * Concat all elements of 'src' onto tail of 'dst'.
260 * 'src' will be cleared.
261 * Proper locks for 'src' and 'dst' must be held.
262 */
rd_kafka_msgq_concat(rd_kafka_msgq_t * dst,rd_kafka_msgq_t * src)263 static RD_INLINE RD_UNUSED void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst,
264 rd_kafka_msgq_t *src) {
265 TAILQ_CONCAT(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
266 dst->rkmq_msg_cnt += src->rkmq_msg_cnt;
267 dst->rkmq_msg_bytes += src->rkmq_msg_bytes;
268 rd_kafka_msgq_init(src);
269 rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
270 }
271
272 /**
273 * Move queue 'src' to 'dst' (overwrites dst)
274 * Source will be cleared.
275 */
rd_kafka_msgq_move(rd_kafka_msgq_t * dst,rd_kafka_msgq_t * src)276 static RD_INLINE RD_UNUSED void rd_kafka_msgq_move (rd_kafka_msgq_t *dst,
277 rd_kafka_msgq_t *src) {
278 TAILQ_MOVE(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
279 dst->rkmq_msg_cnt = src->rkmq_msg_cnt;
280 dst->rkmq_msg_bytes = src->rkmq_msg_bytes;
281 rd_kafka_msgq_init(src);
282 rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
283 }
284
285
286 /**
287 * @brief Prepend all elements of \ src onto head of \p dst.
288 * \p src will be cleared/re-initialized.
289 *
290 * @locks proper locks for \p src and \p dst MUST be held.
291 */
rd_kafka_msgq_prepend(rd_kafka_msgq_t * dst,rd_kafka_msgq_t * src)292 static RD_INLINE RD_UNUSED void rd_kafka_msgq_prepend (rd_kafka_msgq_t *dst,
293 rd_kafka_msgq_t *src) {
294 rd_kafka_msgq_concat(src, dst);
295 rd_kafka_msgq_move(dst, src);
296 rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
297 }
298
299
300 /**
301 * rd_free all msgs in msgq and reinitialize the msgq.
302 */
rd_kafka_msgq_purge(rd_kafka_t * rk,rd_kafka_msgq_t * rkmq)303 static RD_INLINE RD_UNUSED void rd_kafka_msgq_purge (rd_kafka_t *rk,
304 rd_kafka_msgq_t *rkmq) {
305 rd_kafka_msg_t *rkm, *next;
306
307 next = TAILQ_FIRST(&rkmq->rkmq_msgs);
308 while (next) {
309 rkm = next;
310 next = TAILQ_NEXT(next, rkm_link);
311
312 rd_kafka_msg_destroy(rk, rkm);
313 }
314
315 rd_kafka_msgq_init(rkmq);
316 }
317
318
319 /**
320 * Remove message from message queue
321 */
322 static RD_INLINE RD_UNUSED
rd_kafka_msgq_deq(rd_kafka_msgq_t * rkmq,rd_kafka_msg_t * rkm,int do_count)323 rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq,
324 rd_kafka_msg_t *rkm,
325 int do_count) {
326 if (likely(do_count)) {
327 rd_kafka_assert(NULL, rkmq->rkmq_msg_cnt > 0);
328 rd_kafka_assert(NULL, rkmq->rkmq_msg_bytes >=
329 (int64_t)(rkm->rkm_len+rkm->rkm_key_len));
330 rkmq->rkmq_msg_cnt--;
331 rkmq->rkmq_msg_bytes -= rkm->rkm_len+rkm->rkm_key_len;
332 }
333
334 TAILQ_REMOVE(&rkmq->rkmq_msgs, rkm, rkm_link);
335
336 return rkm;
337 }
338
339 static RD_INLINE RD_UNUSED
rd_kafka_msgq_pop(rd_kafka_msgq_t * rkmq)340 rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq) {
341 rd_kafka_msg_t *rkm;
342
343 if (((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs))))
344 rd_kafka_msgq_deq(rkmq, rkm, 1);
345
346 return rkm;
347 }
348
349
350 /**
351 * @returns the first message in the queue, or NULL if empty.
352 *
353 * @locks caller's responsibility
354 */
355 static RD_INLINE RD_UNUSED
rd_kafka_msgq_first(const rd_kafka_msgq_t * rkmq)356 rd_kafka_msg_t *rd_kafka_msgq_first (const rd_kafka_msgq_t *rkmq) {
357 return TAILQ_FIRST(&rkmq->rkmq_msgs);
358 }
359
360 /**
361 * @returns the last message in the queue, or NULL if empty.
362 *
363 * @locks caller's responsibility
364 */
365 static RD_INLINE RD_UNUSED
rd_kafka_msgq_last(const rd_kafka_msgq_t * rkmq)366 rd_kafka_msg_t *rd_kafka_msgq_last (const rd_kafka_msgq_t *rkmq) {
367 return TAILQ_LAST(&rkmq->rkmq_msgs, rd_kafka_msgs_head_s);
368 }
369
370
371 /**
372 * @returns the MsgId of the first message in the queue, or 0 if empty.
373 *
374 * @locks caller's responsibility
375 */
376 static RD_INLINE RD_UNUSED
rd_kafka_msgq_first_msgid(const rd_kafka_msgq_t * rkmq)377 uint64_t rd_kafka_msgq_first_msgid (const rd_kafka_msgq_t *rkmq) {
378 const rd_kafka_msg_t *rkm = TAILQ_FIRST(&rkmq->rkmq_msgs);
379 if (rkm)
380 return rkm->rkm_u.producer.msgid;
381 else
382 return 0;
383 }
384
385
386 /**
387 * @brief Message ordering comparator using the message id
388 * number to order messages in ascending order (FIFO).
389 */
390 static RD_INLINE
rd_kafka_msg_cmp_msgid(const void * _a,const void * _b)391 int rd_kafka_msg_cmp_msgid (const void *_a, const void *_b) {
392 const rd_kafka_msg_t *a = _a, *b = _b;
393
394 rd_dassert(a->rkm_u.producer.msgid);
395
396 return RD_CMP(a->rkm_u.producer.msgid, b->rkm_u.producer.msgid);
397 }
398
399 /**
400 * @brief Message ordering comparator using the message id
401 * number to order messages in descending order (LIFO).
402 */
403 static RD_INLINE
rd_kafka_msg_cmp_msgid_lifo(const void * _a,const void * _b)404 int rd_kafka_msg_cmp_msgid_lifo (const void *_a, const void *_b) {
405 const rd_kafka_msg_t *a = _a, *b = _b;
406
407 rd_dassert(a->rkm_u.producer.msgid);
408
409 return RD_CMP(b->rkm_u.producer.msgid, a->rkm_u.producer.msgid);
410 }
411
412
413 /**
414 * @brief Insert message at its sorted position using the msgid.
415 * @remark This is an O(n) operation.
416 * @warning The message must have a msgid set.
417 * @returns the message count of the queue after enqueuing the message.
418 */
419 int
420 rd_kafka_msgq_enq_sorted0 (rd_kafka_msgq_t *rkmq,
421 rd_kafka_msg_t *rkm,
422 int (*order_cmp) (const void *, const void *));
423
424 /**
425 * @brief Insert message at its sorted position using the msgid.
426 * @remark This is an O(n) operation.
427 * @warning The message must have a msgid set.
428 * @returns the message count of the queue after enqueuing the message.
429 */
430 int rd_kafka_msgq_enq_sorted (const rd_kafka_topic_t *rkt,
431 rd_kafka_msgq_t *rkmq,
432 rd_kafka_msg_t *rkm);
433
434 /**
435 * Insert message at head of message queue.
436 */
rd_kafka_msgq_insert(rd_kafka_msgq_t * rkmq,rd_kafka_msg_t * rkm)437 static RD_INLINE RD_UNUSED void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq,
438 rd_kafka_msg_t *rkm) {
439 TAILQ_INSERT_HEAD(&rkmq->rkmq_msgs, rkm, rkm_link);
440 rkmq->rkmq_msg_cnt++;
441 rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len;
442 }
443
444 /**
445 * Append message to tail of message queue.
446 */
rd_kafka_msgq_enq(rd_kafka_msgq_t * rkmq,rd_kafka_msg_t * rkm)447 static RD_INLINE RD_UNUSED int rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,
448 rd_kafka_msg_t *rkm) {
449 TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);
450 rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len;
451 return (int)++rkmq->rkmq_msg_cnt;
452 }
453
454
455 /**
456 * @returns true if the MsgId extents (first, last) in the two queues overlap.
457 */
458 static RD_INLINE RD_UNUSED rd_bool_t
rd_kafka_msgq_overlap(const rd_kafka_msgq_t * a,const rd_kafka_msgq_t * b)459 rd_kafka_msgq_overlap (const rd_kafka_msgq_t *a, const rd_kafka_msgq_t *b) {
460 const rd_kafka_msg_t *fa, *la, *fb, *lb;
461
462 if (RD_KAFKA_MSGQ_EMPTY(a) ||
463 RD_KAFKA_MSGQ_EMPTY(b))
464 return rd_false;
465
466 fa = rd_kafka_msgq_first(a);
467 fb = rd_kafka_msgq_first(b);
468 la = rd_kafka_msgq_last(a);
469 lb = rd_kafka_msgq_last(b);
470
471 return (rd_bool_t)
472 (fa->rkm_u.producer.msgid <= lb->rkm_u.producer.msgid &&
473 fb->rkm_u.producer.msgid <= la->rkm_u.producer.msgid);
474 }
475
476 /**
477 * Scans a message queue for timed out messages and removes them from
478 * 'rkmq' and adds them to 'timedout', returning the number of timed out
479 * messages.
480 * 'timedout' must be initialized.
481 */
482 int rd_kafka_msgq_age_scan (struct rd_kafka_toppar_s *rktp,
483 rd_kafka_msgq_t *rkmq,
484 rd_kafka_msgq_t *timedout,
485 rd_ts_t now,
486 rd_ts_t *abs_next_timeout);
487
488 void rd_kafka_msgq_split (rd_kafka_msgq_t *leftq, rd_kafka_msgq_t *rightq,
489 rd_kafka_msg_t *first_right,
490 int cnt, int64_t bytes);
491
492 rd_kafka_msg_t *rd_kafka_msgq_find_pos (const rd_kafka_msgq_t *rkmq,
493 const rd_kafka_msg_t *start_pos,
494 const rd_kafka_msg_t *rkm,
495 int (*cmp) (const void *,
496 const void *),
497 int *cntp, int64_t *bytesp);
498
499 void rd_kafka_msgq_set_metadata (rd_kafka_msgq_t *rkmq, int32_t broker_id,
500 int64_t base_offset, int64_t timestamp,
501 rd_kafka_msg_status_t status);
502
503 void rd_kafka_msgq_move_acked (rd_kafka_msgq_t *dest, rd_kafka_msgq_t *src,
504 uint64_t last_msgid,
505 rd_kafka_msg_status_t status);
506
507 int rd_kafka_msg_partitioner (rd_kafka_topic_t *rkt, rd_kafka_msg_t *rkm,
508 rd_dolock_t do_lock);
509
510
511 rd_kafka_message_t *rd_kafka_message_get (struct rd_kafka_op_s *rko);
512 rd_kafka_message_t *rd_kafka_message_get_from_rkm (struct rd_kafka_op_s *rko,
513 rd_kafka_msg_t *rkm);
514 rd_kafka_message_t *rd_kafka_message_new (void);
515
516
517 /**
518 * @returns a (possibly) wrapped Kafka protocol message sequence counter
519 * for the non-overflowing \p seq.
520 */
rd_kafka_seq_wrap(int64_t seq)521 static RD_INLINE RD_UNUSED int32_t rd_kafka_seq_wrap (int64_t seq) {
522 return (int32_t)(seq & (int64_t)INT32_MAX);
523 }
524
525 void rd_kafka_msgq_dump (FILE *fp, const char *what, rd_kafka_msgq_t *rkmq);
526
527 rd_kafka_msg_t *ut_rd_kafka_msg_new (size_t msgsize);
528 void ut_rd_kafka_msgq_purge (rd_kafka_msgq_t *rkmq);
529 int unittest_msg (void);
530
531 #endif /* _RDKAFKA_MSG_H_ */
532