1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012,2013 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
18  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
19  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
20  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
21  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
22  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
23  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
24  * POSSIBILITY OF SUCH DAMAGE.
25  */
26 
27 #ifndef _RDKAFKA_MSG_H_
28 #define _RDKAFKA_MSG_H_
29 
30 #include "rdsysqueue.h"
31 
32 #include "rdkafka_proto.h"
33 #include "rdkafka_header.h"
34 
35 
/**
 * @brief Internal RD_KAFKA_MSG_F_.. flags
 *
 * @remark The value lies above the internal 0x10000..0x80000 flags that are
 *         defined next to the rkm_flags field of rd_kafka_msg_t in this
 *         file, so it does not share a bit with any of them.
 */
#define RD_KAFKA_MSG_F_RKT_RDLOCKED    0x100000 /* rkt is rdlock():ed */
40 
41 
/**
 * @brief Message.MsgAttributes for MsgVersion v0..v1,
 *        also used for MessageSet.Attributes for MsgVersion v2.
 *
 * @remark The compression codec is a numeric value held in the bits selected
 *         by RD_KAFKA_MSG_ATTR_COMPRESSION_MASK (bits 0-2), not a set of
 *         independent flags: LZ4 (3) has the same bit pattern as
 *         GZIP|SNAPPY, so mask the attributes and compare for equality
 *         rather than testing individual bits.
 *         The timestamp type occupies bit 3.
 */
#define RD_KAFKA_MSG_ATTR_GZIP             (1 << 0)
#define RD_KAFKA_MSG_ATTR_SNAPPY           (1 << 1)
#define RD_KAFKA_MSG_ATTR_LZ4              (3)
#define RD_KAFKA_MSG_ATTR_ZSTD             (4)
#define RD_KAFKA_MSG_ATTR_COMPRESSION_MASK 0x7
#define RD_KAFKA_MSG_ATTR_CREATE_TIME      (0 << 3)
#define RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME  (1 << 3)
53 
/**
 * @brief MessageSet.Attributes for MsgVersion v2
 *
 * Attributes:
 *  -------------------------------------------------------------------------------------------------
 *  | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
 *  -------------------------------------------------------------------------------------------------
 *
 * Only the v2-specific bits (4 and 5) are defined here; bits 0-3 reuse the
 * RD_KAFKA_MSG_ATTR_.. compression and timestamp definitions.
 */
/* Compression types same as MsgVersion 0 above */
/* Timestamp type same as MsgVersion 0 above */
#define RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL (1 << 4)
#define RD_KAFKA_MSGSET_V2_ATTR_CONTROL       (1 << 5)
66 
67 
/**
 * @brief Internal message object wrapping the public rd_kafka_message_t.
 *
 * The public message is embedded as the first member so that a pointer to
 * it can be cast back to the enclosing rd_kafka_msg_t
 * (see rd_kafka_message2msg()).
 *
 * The rkm_.. macros defined inside the struct body are shorthand accessors
 * for members of the embedded public message and of the producer arm of
 * the rkm_u union.
 */
typedef struct rd_kafka_msg_s {
	rd_kafka_message_t rkm_rkmessage;  /* MUST be first field */
/* Shorthand accessors for the embedded public message's fields. */
#define rkm_len               rkm_rkmessage.len
#define rkm_payload           rkm_rkmessage.payload
#define rkm_opaque            rkm_rkmessage._private
#define rkm_partition         rkm_rkmessage.partition
#define rkm_offset            rkm_rkmessage.offset
#define rkm_key               rkm_rkmessage.key
#define rkm_key_len           rkm_rkmessage.key_len
#define rkm_err               rkm_rkmessage.err

	TAILQ_ENTRY(rd_kafka_msg_s)  rkm_link; /* Queue linkage (TAILQ) */

	int        rkm_flags;
	/* @remark These additional flags must not collide with
	 *         the RD_KAFKA_MSG_F_* flags in rdkafka.h */
#define RD_KAFKA_MSG_F_FREE_RKM     0x10000 /* msg_t is allocated */
#define RD_KAFKA_MSG_F_ACCOUNT      0x20000 /* accounted for in curr_msgs */
#define RD_KAFKA_MSG_F_PRODUCER     0x40000 /* Producer message */
#define RD_KAFKA_MSG_F_CONTROL      0x80000 /* Control message */

	rd_kafka_timestamp_type_t rkm_tstype; /* rkm_timestamp type */
	int64_t    rkm_timestamp;  /* Message format V1.
				    * Meaning of timestamp depends on
				    * message Attribute LogAppendTime (broker)
				    * or CreateTime (producer).
				    * Unit is milliseconds since epoch (UTC).*/


        rd_kafka_headers_t *rkm_headers; /**< Parsed headers list, if any. */

        rd_kafka_msg_status_t rkm_status; /**< Persistence status. Updated in
                                           *   the ProduceResponse handler:
                                           *   this value is always up to date.
                                           */
        int32_t rkm_broker_id;            /**< Broker message was produced to
                                           *   or fetched from. */

        /* Role-specific state. The producer and consumer arms share
         * storage, so only the arm matching the message's role is valid. */
        union {
                struct {
                        rd_ts_t ts_timeout; /* Message timeout */
                        rd_ts_t ts_enq;     /* Enqueue/Produce time */
                        rd_ts_t ts_backoff; /* Backoff next Produce until
                                             * this time. */
                        uint64_t msgid;     /**< Message sequential id,
                                             *   used to maintain ordering.
                                             *   Starts at 1. */
                        uint64_t last_msgid; /**< On retry this is set
                                              *   on the first message
                                              *   in a batch to point
                                              *   out the last message
                                              *   of the batch so that
                                              *   the batch can be
                                              *   identically reconstructed.
                                              */
                        int     retries;    /* Number of retries so far */
                } producer;
/* Shorthand accessors for selected producer-arm fields. */
#define rkm_ts_timeout rkm_u.producer.ts_timeout
#define rkm_ts_enq     rkm_u.producer.ts_enq
#define rkm_msgid      rkm_u.producer.msgid

                struct {
                        rd_kafkap_bytes_t binhdrs; /**< Unparsed
                                                    *   binary headers in
                                                    *   protocol msg */
                } consumer;
        } rkm_u;
} rd_kafka_msg_t;
136 
/**
 * @brief Head type for a plain TAILQ list of rd_kafka_msg_s elements.
 *        It carries no message or byte counters; rd_kafka_msgq_t is the
 *        counted queue type.
 */
TAILQ_HEAD(rd_kafka_msg_head_s, rd_kafka_msg_s);
138 
139 
/**
 * @returns the absolute time a message was enqueued (producer).
 *
 * @remark Expands to rkm_ts_enq, i.e. the ts_enq member of the producer arm
 *         of the rkm_u union, which shares storage with the consumer arm.
 */
#define rd_kafka_msg_enq_time(rkm) ((rkm)->rkm_ts_enq)
142 
143 /**
144  * @returns the message's total maximum on-wire size.
145  * @remark Depending on message version (MagicByte) the actual size
146  *         may be smaller.
147  */
148 static RD_INLINE RD_UNUSED
rd_kafka_msg_wire_size(const rd_kafka_msg_t * rkm,int MsgVersion)149 size_t rd_kafka_msg_wire_size (const rd_kafka_msg_t *rkm, int MsgVersion) {
150         static const size_t overheads[] = {
151                 [0] = RD_KAFKAP_MESSAGE_V0_OVERHEAD,
152                 [1] = RD_KAFKAP_MESSAGE_V1_OVERHEAD,
153                 [2] = RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD
154         };
155         size_t size;
156         rd_dassert(MsgVersion >= 0 && MsgVersion <= 2);
157 
158         size = overheads[MsgVersion] + rkm->rkm_len + rkm->rkm_key_len;
159         if (MsgVersion == 2 && rkm->rkm_headers)
160                 size += rd_kafka_headers_serialized_size(rkm->rkm_headers);
161 
162         return size;
163 }
164 
165 
166 /**
167  * @returns the maximum total on-wire message size regardless of MsgVersion.
168  *
169  * @remark This does not account for the ProduceRequest, et.al, just the
170  *         per-message overhead.
171  */
172 static RD_INLINE RD_UNUSED
rd_kafka_msg_max_wire_size(size_t keylen,size_t valuelen,size_t hdrslen)173 size_t rd_kafka_msg_max_wire_size (size_t keylen, size_t valuelen,
174                                    size_t hdrslen) {
175         return RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD +
176                 keylen + valuelen + hdrslen;
177 }
178 
179 /**
180  * @returns the enveloping rd_kafka_msg_t pointer for a rd_kafka_msg_t
181  *          wrapped rd_kafka_message_t.
182  */
183 static RD_INLINE RD_UNUSED
rd_kafka_message2msg(rd_kafka_message_t * rkmessage)184 rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) {
185 	return (rd_kafka_msg_t *)rkmessage;
186 }
187 
188 
189 
190 
191 
/**
 * @brief Message queue with message and byte counters.
 *
 * The counters are maintained by the rd_kafka_msgq_..() helpers in this
 * file; the byte counter is the sum of rkm_len + rkm_key_len of the
 * queued messages as added by rd_kafka_msgq_enq()/rd_kafka_msgq_insert()
 * and subtracted by rd_kafka_msgq_deq().
 */
TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s);
typedef struct rd_kafka_msgq_s {
        struct rd_kafka_msgs_head_s rkmq_msgs;  /* TAILQ_HEAD */
        int32_t rkmq_msg_cnt;    /* Number of messages accounted for */
        int64_t rkmq_msg_bytes;  /* Payload + key bytes accounted for */
} rd_kafka_msgq_t;
201 
/**
 * @brief Static initializer for a rd_kafka_msgq_t named \p rkmq.
 *
 * Only the list head is initialized explicitly; the counters are omitted
 * from the designated initializer and are therefore zero-initialized.
 * \p rkmq must be the queue object itself (not a pointer to it).
 */
#define RD_KAFKA_MSGQ_INITIALIZER(rkmq) \
	{ .rkmq_msgs = TAILQ_HEAD_INITIALIZER((rkmq).rkmq_msgs) }
204 
/**
 * @brief Iterate over all messages in the queue pointed to by \p head,
 *        assigning each message in turn to \p elm (a rd_kafka_msg_t *).
 *        This is a thin wrapper around TAILQ_FOREACH() on rkm_link.
 */
#define RD_KAFKA_MSGQ_FOREACH(elm,head) \
	TAILQ_FOREACH(elm, &(head)->rkmq_msgs, rkm_link)
207 
/**
 * @brief Check if queue is empty. Proper locks must be held.
 *
 * @remark Tests the list head only; the rkmq_msg_cnt counter is not
 *         consulted. \p rkmq is a pointer to the queue.
 */
#define RD_KAFKA_MSGQ_EMPTY(rkmq) TAILQ_EMPTY(&(rkmq)->rkmq_msgs)
210 
211 /**
212  * Returns the number of messages in the specified queue.
213  */
214 static RD_INLINE RD_UNUSED
rd_kafka_msgq_len(const rd_kafka_msgq_t * rkmq)215 int rd_kafka_msgq_len (const rd_kafka_msgq_t *rkmq) {
216         return (int)rkmq->rkmq_msg_cnt;
217 }
218 
219 /**
220  * Returns the total number of bytes in the specified queue.
221  */
222 static RD_INLINE RD_UNUSED
rd_kafka_msgq_size(const rd_kafka_msgq_t * rkmq)223 size_t rd_kafka_msgq_size (const rd_kafka_msgq_t *rkmq) {
224         return (size_t)rkmq->rkmq_msg_bytes;
225 }
226 
227 
228 void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm);
229 
230 int rd_kafka_msg_new (rd_kafka_topic_t *rkt, int32_t force_partition,
231 		      int msgflags,
232 		      char *payload, size_t len,
233 		      const void *keydata, size_t keylen,
234 		      void *msg_opaque);
235 
rd_kafka_msgq_init(rd_kafka_msgq_t * rkmq)236 static RD_INLINE RD_UNUSED void rd_kafka_msgq_init (rd_kafka_msgq_t *rkmq) {
237         TAILQ_INIT(&rkmq->rkmq_msgs);
238         rkmq->rkmq_msg_cnt   = 0;
239         rkmq->rkmq_msg_bytes = 0;
240 }
241 
/**
 * @brief Development-build hook around rd_kafka_msgq_verify_order0().
 *
 * With ENABLE_DEVEL set the call is forwarded together with the calling
 * function name and line number. Otherwise the macro expands to an empty
 * statement and none of its arguments are evaluated.
 */
#if ENABLE_DEVEL
#define rd_kafka_msgq_verify_order(rktp,rkmq,exp_first_msgid,gapless) \
        rd_kafka_msgq_verify_order0(__FUNCTION__, __LINE__, \
                                    rktp, rkmq, exp_first_msgid, gapless)
#else
#define rd_kafka_msgq_verify_order(rktp,rkmq,exp_first_msgid,gapless) \
        do { } while (0)
#endif
250 
251 void rd_kafka_msgq_verify_order0 (const char *function, int line,
252                                   const struct rd_kafka_toppar_s *rktp,
253                                   const rd_kafka_msgq_t *rkmq,
254                                   uint64_t exp_first_msgid,
255                                   rd_bool_t gapless);
256 
257 
258 /**
259  * Concat all elements of 'src' onto tail of 'dst'.
260  * 'src' will be cleared.
261  * Proper locks for 'src' and 'dst' must be held.
262  */
rd_kafka_msgq_concat(rd_kafka_msgq_t * dst,rd_kafka_msgq_t * src)263 static RD_INLINE RD_UNUSED void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst,
264 						   rd_kafka_msgq_t *src) {
265 	TAILQ_CONCAT(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
266         dst->rkmq_msg_cnt   += src->rkmq_msg_cnt;
267         dst->rkmq_msg_bytes += src->rkmq_msg_bytes;
268 	rd_kafka_msgq_init(src);
269         rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
270 }
271 
272 /**
273  * Move queue 'src' to 'dst' (overwrites dst)
274  * Source will be cleared.
275  */
rd_kafka_msgq_move(rd_kafka_msgq_t * dst,rd_kafka_msgq_t * src)276 static RD_INLINE RD_UNUSED void rd_kafka_msgq_move (rd_kafka_msgq_t *dst,
277 						 rd_kafka_msgq_t *src) {
278 	TAILQ_MOVE(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
279         dst->rkmq_msg_cnt   = src->rkmq_msg_cnt;
280         dst->rkmq_msg_bytes = src->rkmq_msg_bytes;
281 	rd_kafka_msgq_init(src);
282         rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
283 }
284 
285 
286 /**
287  * @brief Prepend all elements of \ src onto head of \p dst.
288  *        \p src will be cleared/re-initialized.
289  *
290  * @locks proper locks for \p src and \p dst MUST be held.
291  */
rd_kafka_msgq_prepend(rd_kafka_msgq_t * dst,rd_kafka_msgq_t * src)292 static RD_INLINE RD_UNUSED void rd_kafka_msgq_prepend (rd_kafka_msgq_t *dst,
293                                                        rd_kafka_msgq_t *src) {
294         rd_kafka_msgq_concat(src, dst);
295         rd_kafka_msgq_move(dst, src);
296         rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
297 }
298 
299 
300 /**
301  * rd_free all msgs in msgq and reinitialize the msgq.
302  */
rd_kafka_msgq_purge(rd_kafka_t * rk,rd_kafka_msgq_t * rkmq)303 static RD_INLINE RD_UNUSED void rd_kafka_msgq_purge (rd_kafka_t *rk,
304                                                     rd_kafka_msgq_t *rkmq) {
305 	rd_kafka_msg_t *rkm, *next;
306 
307 	next = TAILQ_FIRST(&rkmq->rkmq_msgs);
308 	while (next) {
309 		rkm = next;
310 		next = TAILQ_NEXT(next, rkm_link);
311 
312 		rd_kafka_msg_destroy(rk, rkm);
313 	}
314 
315 	rd_kafka_msgq_init(rkmq);
316 }
317 
318 
319 /**
320  * Remove message from message queue
321  */
322 static RD_INLINE RD_UNUSED
rd_kafka_msgq_deq(rd_kafka_msgq_t * rkmq,rd_kafka_msg_t * rkm,int do_count)323 rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq,
324 				   rd_kafka_msg_t *rkm,
325 				   int do_count) {
326 	if (likely(do_count)) {
327 		rd_kafka_assert(NULL, rkmq->rkmq_msg_cnt > 0);
328                 rd_kafka_assert(NULL, rkmq->rkmq_msg_bytes >=
329                                 (int64_t)(rkm->rkm_len+rkm->rkm_key_len));
330                 rkmq->rkmq_msg_cnt--;
331                 rkmq->rkmq_msg_bytes -= rkm->rkm_len+rkm->rkm_key_len;
332 	}
333 
334 	TAILQ_REMOVE(&rkmq->rkmq_msgs, rkm, rkm_link);
335 
336 	return rkm;
337 }
338 
339 static RD_INLINE RD_UNUSED
rd_kafka_msgq_pop(rd_kafka_msgq_t * rkmq)340 rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq) {
341 	rd_kafka_msg_t *rkm;
342 
343 	if (((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs))))
344 		rd_kafka_msgq_deq(rkmq, rkm, 1);
345 
346 	return rkm;
347 }
348 
349 
350 /**
351  * @returns the first message in the queue, or NULL if empty.
352  *
353  * @locks caller's responsibility
354  */
355 static RD_INLINE RD_UNUSED
rd_kafka_msgq_first(const rd_kafka_msgq_t * rkmq)356 rd_kafka_msg_t *rd_kafka_msgq_first (const rd_kafka_msgq_t *rkmq) {
357         return TAILQ_FIRST(&rkmq->rkmq_msgs);
358 }
359 
360 /**
361  * @returns the last message in the queue, or NULL if empty.
362  *
363  * @locks caller's responsibility
364  */
365 static RD_INLINE RD_UNUSED
rd_kafka_msgq_last(const rd_kafka_msgq_t * rkmq)366 rd_kafka_msg_t *rd_kafka_msgq_last (const rd_kafka_msgq_t *rkmq) {
367         return TAILQ_LAST(&rkmq->rkmq_msgs, rd_kafka_msgs_head_s);
368 }
369 
370 
371 /**
372  * @returns the MsgId of the first message in the queue, or 0 if empty.
373  *
374  * @locks caller's responsibility
375  */
376 static RD_INLINE RD_UNUSED
rd_kafka_msgq_first_msgid(const rd_kafka_msgq_t * rkmq)377 uint64_t rd_kafka_msgq_first_msgid (const rd_kafka_msgq_t *rkmq) {
378         const rd_kafka_msg_t *rkm = TAILQ_FIRST(&rkmq->rkmq_msgs);
379         if (rkm)
380                 return rkm->rkm_u.producer.msgid;
381         else
382                 return 0;
383 }
384 
385 
386 /**
387  * @brief Message ordering comparator using the message id
388  *        number to order messages in ascending order (FIFO).
389  */
390 static RD_INLINE
rd_kafka_msg_cmp_msgid(const void * _a,const void * _b)391 int rd_kafka_msg_cmp_msgid (const void *_a, const void *_b) {
392         const rd_kafka_msg_t *a = _a, *b = _b;
393 
394         rd_dassert(a->rkm_u.producer.msgid);
395 
396         return RD_CMP(a->rkm_u.producer.msgid, b->rkm_u.producer.msgid);
397 }
398 
399 /**
400  * @brief Message ordering comparator using the message id
401  *        number to order messages in descending order (LIFO).
402  */
403 static RD_INLINE
rd_kafka_msg_cmp_msgid_lifo(const void * _a,const void * _b)404 int rd_kafka_msg_cmp_msgid_lifo (const void *_a, const void *_b) {
405         const rd_kafka_msg_t *a = _a, *b = _b;
406 
407         rd_dassert(a->rkm_u.producer.msgid);
408 
409         return RD_CMP(b->rkm_u.producer.msgid, a->rkm_u.producer.msgid);
410 }
411 
412 
413 /**
414  * @brief Insert message at its sorted position using the msgid.
415  * @remark This is an O(n) operation.
416  * @warning The message must have a msgid set.
417  * @returns the message count of the queue after enqueuing the message.
418  */
419 int
420 rd_kafka_msgq_enq_sorted0 (rd_kafka_msgq_t *rkmq,
421                            rd_kafka_msg_t *rkm,
422                            int (*order_cmp) (const void *, const void *));
423 
424 /**
425  * @brief Insert message at its sorted position using the msgid.
426  * @remark This is an O(n) operation.
427  * @warning The message must have a msgid set.
428  * @returns the message count of the queue after enqueuing the message.
429  */
430 int rd_kafka_msgq_enq_sorted (const rd_kafka_topic_t *rkt,
431                               rd_kafka_msgq_t *rkmq,
432                               rd_kafka_msg_t *rkm);
433 
434 /**
435  * Insert message at head of message queue.
436  */
rd_kafka_msgq_insert(rd_kafka_msgq_t * rkmq,rd_kafka_msg_t * rkm)437 static RD_INLINE RD_UNUSED void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq,
438 						   rd_kafka_msg_t *rkm) {
439 	TAILQ_INSERT_HEAD(&rkmq->rkmq_msgs, rkm, rkm_link);
440         rkmq->rkmq_msg_cnt++;
441         rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len;
442 }
443 
444 /**
445  * Append message to tail of message queue.
446  */
rd_kafka_msgq_enq(rd_kafka_msgq_t * rkmq,rd_kafka_msg_t * rkm)447 static RD_INLINE RD_UNUSED int rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,
448                                                 rd_kafka_msg_t *rkm) {
449         TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);
450         rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len;
451         return (int)++rkmq->rkmq_msg_cnt;
452 }
453 
454 
455 /**
456  * @returns true if the MsgId extents (first, last) in the two queues overlap.
457  */
458 static RD_INLINE RD_UNUSED rd_bool_t
rd_kafka_msgq_overlap(const rd_kafka_msgq_t * a,const rd_kafka_msgq_t * b)459 rd_kafka_msgq_overlap (const rd_kafka_msgq_t *a, const rd_kafka_msgq_t *b) {
460         const rd_kafka_msg_t *fa, *la, *fb, *lb;
461 
462         if (RD_KAFKA_MSGQ_EMPTY(a) ||
463             RD_KAFKA_MSGQ_EMPTY(b))
464                 return rd_false;
465 
466         fa = rd_kafka_msgq_first(a);
467         fb = rd_kafka_msgq_first(b);
468         la = rd_kafka_msgq_last(a);
469         lb = rd_kafka_msgq_last(b);
470 
471         return (rd_bool_t)
472                 (fa->rkm_u.producer.msgid <= lb->rkm_u.producer.msgid &&
473                  fb->rkm_u.producer.msgid <= la->rkm_u.producer.msgid);
474 }
475 
476 /**
477  * Scans a message queue for timed out messages and removes them from
478  * 'rkmq' and adds them to 'timedout', returning the number of timed out
479  * messages.
480  * 'timedout' must be initialized.
481  */
482 int rd_kafka_msgq_age_scan (struct rd_kafka_toppar_s *rktp,
483                             rd_kafka_msgq_t *rkmq,
484                             rd_kafka_msgq_t *timedout,
485                             rd_ts_t now,
486                             rd_ts_t *abs_next_timeout);
487 
488 void rd_kafka_msgq_split (rd_kafka_msgq_t *leftq, rd_kafka_msgq_t *rightq,
489                           rd_kafka_msg_t *first_right,
490                           int cnt, int64_t bytes);
491 
492 rd_kafka_msg_t *rd_kafka_msgq_find_pos (const rd_kafka_msgq_t *rkmq,
493                                         const rd_kafka_msg_t *start_pos,
494                                         const rd_kafka_msg_t *rkm,
495                                         int (*cmp) (const void *,
496                                                     const void *),
497                                         int *cntp, int64_t *bytesp);
498 
499 void rd_kafka_msgq_set_metadata (rd_kafka_msgq_t *rkmq, int32_t broker_id,
500                                  int64_t base_offset, int64_t timestamp,
501                                  rd_kafka_msg_status_t status);
502 
503 void rd_kafka_msgq_move_acked (rd_kafka_msgq_t *dest, rd_kafka_msgq_t *src,
504                                uint64_t last_msgid,
505                                rd_kafka_msg_status_t status);
506 
507 int rd_kafka_msg_partitioner (rd_kafka_topic_t *rkt, rd_kafka_msg_t *rkm,
508                               rd_dolock_t do_lock);
509 
510 
511 rd_kafka_message_t *rd_kafka_message_get (struct rd_kafka_op_s *rko);
512 rd_kafka_message_t *rd_kafka_message_get_from_rkm (struct rd_kafka_op_s *rko,
513                                                    rd_kafka_msg_t *rkm);
514 rd_kafka_message_t *rd_kafka_message_new (void);
515 
516 
517 /**
518  * @returns a (possibly) wrapped Kafka protocol message sequence counter
519  *          for the non-overflowing \p seq.
520  */
rd_kafka_seq_wrap(int64_t seq)521 static RD_INLINE RD_UNUSED int32_t rd_kafka_seq_wrap (int64_t seq) {
522         return (int32_t)(seq & (int64_t)INT32_MAX);
523 }
524 
525 void rd_kafka_msgq_dump (FILE *fp, const char *what, rd_kafka_msgq_t *rkmq);
526 
527 rd_kafka_msg_t *ut_rd_kafka_msg_new (size_t msgsize);
528 void ut_rd_kafka_msgq_purge (rd_kafka_msgq_t *rkmq);
529 int unittest_msg (void);
530 
531 #endif /* _RDKAFKA_MSG_H_ */
532