/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012,2013 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
#ifndef _RDKAFKA_MSG_H_
#define _RDKAFKA_MSG_H_

#include "rdsysqueue.h"

#include "rdkafka_proto.h"
#include "rdkafka_header.h"


/**
 * @brief Internal RD_KAFKA_MSG_F_.. flags
 */
#define RD_KAFKA_MSG_F_RKT_RDLOCKED    0x100000 /* rkt is rdlock():ed */


/**
 * @brief Message.MsgAttributes for MsgVersion v0..v1,
 *        also used for MessageSet.Attributes for MsgVersion v2.
 */
#define RD_KAFKA_MSG_ATTR_GZIP             (1 << 0)
#define RD_KAFKA_MSG_ATTR_SNAPPY           (1 << 1)
#define RD_KAFKA_MSG_ATTR_LZ4              (3)
#define RD_KAFKA_MSG_ATTR_ZSTD             (4)
#define RD_KAFKA_MSG_ATTR_COMPRESSION_MASK 0x7
#define RD_KAFKA_MSG_ATTR_CREATE_TIME      (0 << 3)
#define RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME  (1 << 3)


/**
 * @brief MessageSet.Attributes for MsgVersion v2
 *
 * Attributes:
 *  -------------------------------------------------------------------------------------------------
 *  | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
 *  -------------------------------------------------------------------------------------------------
 */
/* Compression types same as MsgVersion 0 above */
/* Timestamp type same as MsgVersion 0 above */
#define RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL (1 << 4)
#define RD_KAFKA_MSGSET_V2_ATTR_CONTROL       (1 << 5)
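

/**
 * Illustrative sketch, not part of the librdkafka API: how the bits above
 * combine into a MsgVersion v2 MessageSet.Attributes value for an
 * LZ4-compressed, broker-timestamped, transactional MessageSet.
 * The variable names are hypothetical.
 * @code
 *   int16_t attrs = RD_KAFKA_MSG_ATTR_LZ4 |
 *                   RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME |
 *                   RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL;
 *   int compression = attrs & RD_KAFKA_MSG_ATTR_COMPRESSION_MASK;
 * @endcode
 * Here attrs is 0x1b and compression is 3 (LZ4).
 */
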

typedef struct rd_kafka_msg_s {
        rd_kafka_message_t rkm_rkmessage;  /* MUST be first field */
#define rkm_len               rkm_rkmessage.len
#define rkm_payload           rkm_rkmessage.payload
#define rkm_opaque            rkm_rkmessage._private
#define rkm_partition         rkm_rkmessage.partition
#define rkm_offset            rkm_rkmessage.offset
#define rkm_key               rkm_rkmessage.key
#define rkm_key_len           rkm_rkmessage.key_len
#define rkm_err               rkm_rkmessage.err

        TAILQ_ENTRY(rd_kafka_msg_s)  rkm_link;

        int        rkm_flags;
        /* @remark These additional flags must not collide with
         *         the RD_KAFKA_MSG_F_* flags in rdkafka.h */
#define RD_KAFKA_MSG_F_FREE_RKM     0x10000 /* msg_t is allocated */
#define RD_KAFKA_MSG_F_ACCOUNT      0x20000 /* accounted for in curr_msgs */
#define RD_KAFKA_MSG_F_PRODUCER     0x40000 /* Producer message */
#define RD_KAFKA_MSG_F_CONTROL      0x80000 /* Control message */

        rd_kafka_timestamp_type_t rkm_tstype; /* rkm_timestamp type */
        int64_t    rkm_timestamp;  /* Message format V1.
                                    * Meaning of timestamp depends on
                                    * message Attribute LogAppendTime (broker)
                                    * or CreateTime (producer).
                                    * Unit is milliseconds since epoch (UTC). */

        rd_kafka_headers_t *rkm_headers;   /**< Parsed headers list, if any. */

        rd_kafka_msg_status_t rkm_status;  /**< Persistence status. Updated in
                                            *   the ProduceResponse handler:
                                            *   this value is always up to date.
                                            */
        int32_t rkm_broker_id;             /**< Broker message was produced to
                                            *   or fetched from. */

        union {
                struct {
                        rd_ts_t ts_timeout;  /* Message timeout */
                        rd_ts_t ts_enq;      /* Enqueue/Produce time */
                        rd_ts_t ts_backoff;  /* Backoff next Produce until
                                              * this time. */
                        uint64_t msgid;      /**< Message sequential id,
                                              *   used to maintain ordering.
                                              *   Starts at 1. */
                        uint64_t last_msgid; /**< On retry this is set
                                              *   on the first message
                                              *   in a batch to point
                                              *   out the last message
                                              *   of the batch so that
                                              *   the batch can be
                                              *   identically reconstructed.
                                              */
                        int     retries;     /* Number of retries so far */
                } producer;
#define rkm_ts_timeout rkm_u.producer.ts_timeout
#define rkm_ts_enq     rkm_u.producer.ts_enq
#define rkm_msgid      rkm_u.producer.msgid

                struct {
                        rd_kafkap_bytes_t binhdrs; /**< Unparsed
                                                    *   binary headers in
                                                    *   protocol msg */
                } consumer;
        } rkm_u;
} rd_kafka_msg_t;

TAILQ_HEAD(rd_kafka_msg_head_s, rd_kafka_msg_s);


/** @returns the absolute time a message was enqueued (producer) */
#define rd_kafka_msg_enq_time(rkm) ((rkm)->rkm_ts_enq)


/**
 * @returns the message's total maximum on-wire size.
 * @remark Depending on message version (MagicByte) the actual size
 *         may be smaller.
 */
static RD_INLINE RD_UNUSED
size_t rd_kafka_msg_wire_size (const rd_kafka_msg_t *rkm, int MsgVersion) {
        static const size_t overheads[] = {
                [0] = RD_KAFKAP_MESSAGE_V0_OVERHEAD,
                [1] = RD_KAFKAP_MESSAGE_V1_OVERHEAD,
                [2] = RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD
        };
        size_t size;
        rd_dassert(MsgVersion >= 0 && MsgVersion <= 2);

        size = overheads[MsgVersion] + rkm->rkm_len + rkm->rkm_key_len;
        if (MsgVersion == 2 && rkm->rkm_headers)
                size += rd_kafka_headers_serialized_size(rkm->rkm_headers);

        return size;
}


/**
 * @returns the maximum total on-wire message size regardless of MsgVersion.
 *
 * @remark This does not account for the ProduceRequest, et al., just the
 *         per-message overhead.
 */
static RD_INLINE RD_UNUSED
size_t rd_kafka_msg_max_wire_size (size_t keylen, size_t valuelen,
                                   size_t hdrslen) {
        return RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD +
                keylen + valuelen + hdrslen;
}
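

/**
 * Illustrative sketch, not part of the librdkafka API: a pre-flight size
 * check of a message against a configured maximum using the worst-case
 * (MsgVersion v2) per-message overhead. 'keylen', 'valuelen', 'hdrslen'
 * and 'max_msg_size' are hypothetical variables.
 * @code
 *   if (rd_kafka_msg_max_wire_size(keylen, valuelen, hdrslen) > max_msg_size)
 *           return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
 * @endcode
 */
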

/**
 * @returns the enveloping rd_kafka_msg_t pointer for a rd_kafka_msg_t-wrapped
 *          rd_kafka_message_t.
 */
static RD_INLINE RD_UNUSED
rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) {
        return (rd_kafka_msg_t *)rkmessage;
}



/**
 * @brief Message queue with message and byte counters.
 */
TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s);
typedef struct rd_kafka_msgq_s {
        struct rd_kafka_msgs_head_s rkmq_msgs;  /* TAILQ_HEAD */
        int32_t rkmq_msg_cnt;
        int64_t rkmq_msg_bytes;
} rd_kafka_msgq_t;

#define RD_KAFKA_MSGQ_INITIALIZER(rkmq) \
        { .rkmq_msgs = TAILQ_HEAD_INITIALIZER((rkmq).rkmq_msgs) }

#define RD_KAFKA_MSGQ_FOREACH(elm,head) \
        TAILQ_FOREACH(elm, &(head)->rkmq_msgs, rkm_link)

/* @brief Check if queue is empty. Proper locks must be held. */
#define RD_KAFKA_MSGQ_EMPTY(rkmq) TAILQ_EMPTY(&(rkmq)->rkmq_msgs)


/**
 * Returns the number of messages in the specified queue.
 */
static RD_INLINE RD_UNUSED
int rd_kafka_msgq_len (const rd_kafka_msgq_t *rkmq) {
        return (int)rkmq->rkmq_msg_cnt;
}

/**
 * Returns the total number of bytes in the specified queue.
 */
static RD_INLINE RD_UNUSED
size_t rd_kafka_msgq_size (const rd_kafka_msgq_t *rkmq) {
        return (size_t)rkmq->rkmq_msg_bytes;
}


void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm);

int rd_kafka_msg_new (rd_kafka_topic_t *rkt, int32_t force_partition,
                      int msgflags,
                      char *payload, size_t len,
                      const void *keydata, size_t keylen,
                      void *msg_opaque);

static RD_INLINE RD_UNUSED void rd_kafka_msgq_init (rd_kafka_msgq_t *rkmq) {
        TAILQ_INIT(&rkmq->rkmq_msgs);
        rkmq->rkmq_msg_cnt   = 0;
        rkmq->rkmq_msg_bytes = 0;
}

#if ENABLE_DEVEL
#define rd_kafka_msgq_verify_order(rktp,rkmq,exp_first_msgid,gapless)   \
        rd_kafka_msgq_verify_order0(__FUNCTION__, __LINE__,             \
                                    rktp, rkmq, exp_first_msgid, gapless)
#else
#define rd_kafka_msgq_verify_order(rktp,rkmq,exp_first_msgid,gapless)   \
        do { } while (0)
#endif

void rd_kafka_msgq_verify_order0 (const char *function, int line,
                                  const struct rd_kafka_toppar_s *rktp,
                                  const rd_kafka_msgq_t *rkmq,
                                  uint64_t exp_first_msgid,
                                  rd_bool_t gapless);


/**
 * Concat all elements of 'src' onto tail of 'dst'.
 * 'src' will be cleared.
 * Proper locks for 'src' and 'dst' must be held.
 */
static RD_INLINE RD_UNUSED void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst,
                                                      rd_kafka_msgq_t *src) {
        TAILQ_CONCAT(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
        dst->rkmq_msg_cnt   += src->rkmq_msg_cnt;
        dst->rkmq_msg_bytes += src->rkmq_msg_bytes;
        rd_kafka_msgq_init(src);
        rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
}

/**
 * Move queue 'src' to 'dst' (overwrites dst)
 * Source will be cleared.
 */
static RD_INLINE RD_UNUSED void rd_kafka_msgq_move (rd_kafka_msgq_t *dst,
                                                    rd_kafka_msgq_t *src) {
        TAILQ_MOVE(&dst->rkmq_msgs, &src->rkmq_msgs, rkm_link);
        dst->rkmq_msg_cnt   = src->rkmq_msg_cnt;
        dst->rkmq_msg_bytes = src->rkmq_msg_bytes;
        rd_kafka_msgq_init(src);
        rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
}


/**
 * @brief Prepend all elements of \p src onto head of \p dst.
 *        \p src will be cleared/re-initialized.
 *
 * @locks proper locks for \p src and \p dst MUST be held.
 */
static RD_INLINE RD_UNUSED void rd_kafka_msgq_prepend (rd_kafka_msgq_t *dst,
                                                       rd_kafka_msgq_t *src) {
        rd_kafka_msgq_concat(src, dst);
        rd_kafka_msgq_move(dst, src);
        rd_kafka_msgq_verify_order(NULL, dst, 0, rd_false);
}


/**
 * rd_free all msgs in msgq and reinitialize the msgq.
 */
static RD_INLINE RD_UNUSED void rd_kafka_msgq_purge (rd_kafka_t *rk,
                                                     rd_kafka_msgq_t *rkmq) {
        rd_kafka_msg_t *rkm, *next;

        next = TAILQ_FIRST(&rkmq->rkmq_msgs);
        while (next) {
                rkm = next;
                next = TAILQ_NEXT(next, rkm_link);

                rd_kafka_msg_destroy(rk, rkm);
        }

        rd_kafka_msgq_init(rkmq);
}


/**
 * Remove message from message queue
 */
static RD_INLINE RD_UNUSED
rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq,
                                   rd_kafka_msg_t *rkm,
                                   int do_count) {
        if (likely(do_count)) {
                rd_kafka_assert(NULL, rkmq->rkmq_msg_cnt > 0);
                rd_kafka_assert(NULL, rkmq->rkmq_msg_bytes >=
                                (int64_t)(rkm->rkm_len+rkm->rkm_key_len));
                rkmq->rkmq_msg_cnt--;
                rkmq->rkmq_msg_bytes -= rkm->rkm_len+rkm->rkm_key_len;
        }

        TAILQ_REMOVE(&rkmq->rkmq_msgs, rkm, rkm_link);

        return rkm;
}

static RD_INLINE RD_UNUSED
rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq) {
        rd_kafka_msg_t *rkm;

        if (((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs))))
                rd_kafka_msgq_deq(rkmq, rkm, 1);

        return rkm;
}
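

/**
 * Illustrative sketch, not part of the librdkafka API: snapshotting a queue
 * with rd_kafka_msgq_move() so its messages can be worked on without holding
 * the source queue's lock, then returning the remaining messages to the head
 * of the source with rd_kafka_msgq_prepend(). 'src_msgq' is a hypothetical
 * queue; the locks documented above are assumed to be held around the move
 * and prepend calls.
 * @code
 *   rd_kafka_msgq_t tmp = RD_KAFKA_MSGQ_INITIALIZER(tmp);
 *   rd_kafka_msgq_move(&tmp, &src_msgq);
 *   ... process messages in 'tmp' ...
 *   rd_kafka_msgq_prepend(&src_msgq, &tmp);
 * @endcode
 */
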

/**
 * @returns the first message in the queue, or NULL if empty.
 *
 * @locks caller's responsibility
 */
static RD_INLINE RD_UNUSED
rd_kafka_msg_t *rd_kafka_msgq_first (const rd_kafka_msgq_t *rkmq) {
        return TAILQ_FIRST(&rkmq->rkmq_msgs);
}

/**
 * @returns the last message in the queue, or NULL if empty.
 *
 * @locks caller's responsibility
 */
static RD_INLINE RD_UNUSED
rd_kafka_msg_t *rd_kafka_msgq_last (const rd_kafka_msgq_t *rkmq) {
        return TAILQ_LAST(&rkmq->rkmq_msgs, rd_kafka_msgs_head_s);
}


/**
 * @returns the MsgId of the first message in the queue, or 0 if empty.
 *
 * @locks caller's responsibility
 */
static RD_INLINE RD_UNUSED
uint64_t rd_kafka_msgq_first_msgid (const rd_kafka_msgq_t *rkmq) {
        const rd_kafka_msg_t *rkm = TAILQ_FIRST(&rkmq->rkmq_msgs);
        if (rkm)
                return rkm->rkm_u.producer.msgid;
        else
                return 0;
}


/**
 * @brief Message ordering comparator using the message id
 *        number to order messages in ascending order (FIFO).
 */
static RD_INLINE
int rd_kafka_msg_cmp_msgid (const void *_a, const void *_b) {
        const rd_kafka_msg_t *a = _a, *b = _b;

        rd_dassert(a->rkm_u.producer.msgid);

        return RD_CMP(a->rkm_u.producer.msgid,
                      b->rkm_u.producer.msgid);
}

/**
 * @brief Message ordering comparator using the message id
 *        number to order messages in descending order (LIFO).
 */
static RD_INLINE
int rd_kafka_msg_cmp_msgid_lifo (const void *_a, const void *_b) {
        const rd_kafka_msg_t *a = _a, *b = _b;

        rd_dassert(a->rkm_u.producer.msgid);

        return RD_CMP(b->rkm_u.producer.msgid,
                      a->rkm_u.producer.msgid);
}


/**
 * @brief Insert message at its sorted position using the msgid.
 * @remark This is an O(n) operation.
 * @warning The message must have a msgid set.
 * @returns the message count of the queue after enqueuing the message.
 */
int rd_kafka_msgq_enq_sorted0 (rd_kafka_msgq_t *rkmq,
                               rd_kafka_msg_t *rkm,
                               int (*order_cmp) (const void *, const void *));

/**
 * @brief Insert message at its sorted position using the msgid.
 * @remark This is an O(n) operation.
 * @warning The message must have a msgid set.
 * @returns the message count of the queue after enqueuing the message.
 */
int rd_kafka_msgq_enq_sorted (const rd_kafka_topic_t *rkt,
                              rd_kafka_msgq_t *rkmq,
                              rd_kafka_msg_t *rkm);

/**
 * Insert message at head of message queue.
 */
static RD_INLINE RD_UNUSED void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq,
                                                      rd_kafka_msg_t *rkm) {
        TAILQ_INSERT_HEAD(&rkmq->rkmq_msgs, rkm, rkm_link);
        rkmq->rkmq_msg_cnt++;
        rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len;
}

/**
 * Append message to tail of message queue.
 */
static RD_INLINE RD_UNUSED int rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,
                                                  rd_kafka_msg_t *rkm) {
        TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);
        rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len;
        return (int)++rkmq->rkmq_msg_cnt;
}


/**
 * @returns true if the MsgId extents (first, last) in the two queues overlap.
 */
static RD_INLINE RD_UNUSED rd_bool_t
rd_kafka_msgq_overlap (const rd_kafka_msgq_t *a, const rd_kafka_msgq_t *b) {
        const rd_kafka_msg_t *fa, *la, *fb, *lb;

        if (RD_KAFKA_MSGQ_EMPTY(a) || RD_KAFKA_MSGQ_EMPTY(b))
                return rd_false;

        fa = rd_kafka_msgq_first(a);
        fb = rd_kafka_msgq_first(b);
        la = rd_kafka_msgq_last(a);
        lb = rd_kafka_msgq_last(b);

        return (rd_bool_t)
                (fa->rkm_u.producer.msgid <= lb->rkm_u.producer.msgid &&
                 fb->rkm_u.producer.msgid <= la->rkm_u.producer.msgid);
}
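

/**
 * Illustrative sketch, not part of the librdkafka API: re-inserting a
 * message at its msgid-ordered (FIFO) position, e.g. when a message is
 * returned to a queue after a failed produce attempt. 'rkmq' and 'rkm' are
 * hypothetical; the message must already have rkm_u.producer.msgid set.
 * @code
 *   int cnt = rd_kafka_msgq_enq_sorted0(&rkmq, rkm, rd_kafka_msg_cmp_msgid);
 * @endcode
 */
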

/**
 * Scans a message queue for timed out messages and removes them from
 * 'rkmq' and adds them to 'timedout', returning the number of timed out
 * messages.
 * 'timedout' must be initialized.
 */
int rd_kafka_msgq_age_scan (struct rd_kafka_toppar_s *rktp,
                            rd_kafka_msgq_t *rkmq,
                            rd_kafka_msgq_t *timedout,
                            rd_ts_t now,
                            rd_ts_t *abs_next_timeout);

void rd_kafka_msgq_split (rd_kafka_msgq_t *leftq, rd_kafka_msgq_t *rightq,
                          rd_kafka_msg_t *first_right,
                          int cnt, int64_t bytes);

rd_kafka_msg_t *rd_kafka_msgq_find_pos (const rd_kafka_msgq_t *rkmq,
                                        const rd_kafka_msg_t *start_pos,
                                        const rd_kafka_msg_t *rkm,
                                        int (*cmp) (const void *,
                                                    const void *),
                                        int *cntp, int64_t *bytesp);

void rd_kafka_msgq_set_metadata (rd_kafka_msgq_t *rkmq, int32_t broker_id,
                                 int64_t base_offset, int64_t timestamp,
                                 rd_kafka_msg_status_t status);

void rd_kafka_msgq_move_acked (rd_kafka_msgq_t *dest, rd_kafka_msgq_t *src,
                               uint64_t last_msgid,
                               rd_kafka_msg_status_t status);

int rd_kafka_msg_partitioner (rd_kafka_topic_t *rkt, rd_kafka_msg_t *rkm,
                              rd_dolock_t do_lock);


rd_kafka_message_t *rd_kafka_message_get (struct rd_kafka_op_s *rko);
rd_kafka_message_t *rd_kafka_message_get_from_rkm (struct rd_kafka_op_s *rko,
                                                   rd_kafka_msg_t *rkm);
rd_kafka_message_t *rd_kafka_message_new (void);


/**
 * @returns a (possibly) wrapped Kafka protocol message sequence counter
 *          for the non-overflowing \p seq.
 */
static RD_INLINE RD_UNUSED int32_t
rd_kafka_seq_wrap (int64_t seq) {
        return (int32_t)(seq & (int64_t)INT32_MAX);
}

void rd_kafka_msgq_dump (FILE *fp, const char *what, rd_kafka_msgq_t *rkmq);

rd_kafka_msg_t *ut_rd_kafka_msg_new (size_t msgsize);
void ut_rd_kafka_msgq_purge (rd_kafka_msgq_t *rkmq);
int unittest_msg (void);

#endif /* _RDKAFKA_MSG_H_ */