1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012,2013 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef _RDKAFKA_PROTO_H_
30 #define _RDKAFKA_PROTO_H_
31 
32 
33 #include "rdendian.h"
34 #include "rdvarint.h"
35 
36 /* Protocol defines */
37 #include "rdkafka_protocol.h"
38 
39 
40 
41 /** Default generic retry count for failed requests.
42  *  This may be overriden for specific request types. */
43 #define RD_KAFKA_REQUEST_DEFAULT_RETRIES 2
44 
45 /** Max (practically infinite) retry count */
46 #define RD_KAFKA_REQUEST_MAX_RETRIES  INT_MAX
47 
48 /** Do not retry request */
49 #define RD_KAFKA_REQUEST_NO_RETRIES 0
50 
51 
52 /**
53  * Request types
54  */
55 struct rd_kafkap_reqhdr {
56         int32_t  Size;
57         int16_t  ApiKey;
58         int16_t  ApiVersion;
59         int32_t  CorrId;
60         /* ClientId follows */
61 };
62 
63 #define RD_KAFKAP_REQHDR_SIZE (4+2+2+4)
64 #define RD_KAFKAP_RESHDR_SIZE (4+4)
65 
66 /**
67  * Response header
68  */
69 struct rd_kafkap_reshdr {
70 	int32_t  Size;
71 	int32_t  CorrId;
72 };
73 
74 
75 /**
76  * Request type v1 (flexible version)
77  *
78  * i32            Size
79  * i16            ApiKey
80  * i16            ApiVersion
81  * i32            CorrId
82  * string         ClientId   (2-byte encoding, not compact string)
83  * uvarint        Tags
84  * <Request payload>
85  * uvarint        EndTags
86  *
87  * Any struct-type (non-primitive or array type) field in the request payload
88  * must also have a trailing tags list, this goes for structs in arrays as well.
89  */
90 
91 /**
92  * @brief Protocol request type (ApiKey) to name/string.
93  *
94  * Generate updates to this list with generate_proto.sh.
95  */
96 static RD_UNUSED
rd_kafka_ApiKey2str(int16_t ApiKey)97 const char *rd_kafka_ApiKey2str (int16_t ApiKey) {
98         static const char *names[] = {
99                 [RD_KAFKAP_Produce] = "Produce",
100                 [RD_KAFKAP_Fetch] = "Fetch",
101                 [RD_KAFKAP_ListOffsets] = "ListOffsets",
102                 [RD_KAFKAP_Metadata] = "Metadata",
103                 [RD_KAFKAP_LeaderAndIsr] = "LeaderAndIsr",
104                 [RD_KAFKAP_StopReplica] = "StopReplica",
105                 [RD_KAFKAP_UpdateMetadata] = "UpdateMetadata",
106                 [RD_KAFKAP_ControlledShutdown] = "ControlledShutdown",
107                 [RD_KAFKAP_OffsetCommit] = "OffsetCommit",
108                 [RD_KAFKAP_OffsetFetch] = "OffsetFetch",
109                 [RD_KAFKAP_FindCoordinator] = "FindCoordinator",
110                 [RD_KAFKAP_JoinGroup] = "JoinGroup",
111                 [RD_KAFKAP_Heartbeat] = "Heartbeat",
112                 [RD_KAFKAP_LeaveGroup] = "LeaveGroup",
113                 [RD_KAFKAP_SyncGroup] = "SyncGroup",
114                 [RD_KAFKAP_DescribeGroups] = "DescribeGroups",
115                 [RD_KAFKAP_ListGroups] = "ListGroups",
116                 [RD_KAFKAP_SaslHandshake] = "SaslHandshake",
117                 [RD_KAFKAP_ApiVersion] = "ApiVersion",
118                 [RD_KAFKAP_CreateTopics] = "CreateTopics",
119                 [RD_KAFKAP_DeleteTopics] = "DeleteTopics",
120                 [RD_KAFKAP_DeleteRecords] = "DeleteRecords",
121                 [RD_KAFKAP_InitProducerId] = "InitProducerId",
122                 [RD_KAFKAP_OffsetForLeaderEpoch] = "OffsetForLeaderEpoch",
123                 [RD_KAFKAP_AddPartitionsToTxn] = "AddPartitionsToTxn",
124                 [RD_KAFKAP_AddOffsetsToTxn] = "AddOffsetsToTxn",
125                 [RD_KAFKAP_EndTxn] = "EndTxn",
126                 [RD_KAFKAP_WriteTxnMarkers] = "WriteTxnMarkers",
127                 [RD_KAFKAP_TxnOffsetCommit] = "TxnOffsetCommit",
128                 [RD_KAFKAP_DescribeAcls] = "DescribeAcls",
129                 [RD_KAFKAP_CreateAcls] = "CreateAcls",
130                 [RD_KAFKAP_DeleteAcls] = "DeleteAcls",
131                 [RD_KAFKAP_DescribeConfigs] = "DescribeConfigs",
132                 [RD_KAFKAP_AlterConfigs] = "AlterConfigs",
133                 [RD_KAFKAP_AlterReplicaLogDirs] = "AlterReplicaLogDirs",
134                 [RD_KAFKAP_DescribeLogDirs] = "DescribeLogDirs",
135                 [RD_KAFKAP_SaslAuthenticate] = "SaslAuthenticate",
136                 [RD_KAFKAP_CreatePartitions] = "CreatePartitions",
137                 [RD_KAFKAP_CreateDelegationToken] = "CreateDelegationToken",
138                 [RD_KAFKAP_RenewDelegationToken] = "RenewDelegationToken",
139                 [RD_KAFKAP_ExpireDelegationToken] = "ExpireDelegationToken",
140                 [RD_KAFKAP_DescribeDelegationToken] = "DescribeDelegationToken",
141                 [RD_KAFKAP_DeleteGroups] = "DeleteGroups",
142                 [RD_KAFKAP_ElectLeaders] = "ElectLeadersRequest",
143                 [RD_KAFKAP_IncrementalAlterConfigs] =
144                 "IncrementalAlterConfigsRequest",
145                 [RD_KAFKAP_AlterPartitionReassignments] =
146                 "AlterPartitionReassignmentsRequest",
147                 [RD_KAFKAP_ListPartitionReassignments] =
148                 "ListPartitionReassignmentsRequest",
149                 [RD_KAFKAP_OffsetDelete] = "OffsetDeleteRequest",
150                 [RD_KAFKAP_DescribeClientQuotas] =
151                 "DescribeClientQuotasRequest",
152                 [RD_KAFKAP_AlterClientQuotas] =
153                 "AlterClientQuotasRequest",
154                 [RD_KAFKAP_DescribeUserScramCredentials] =
155                 "DescribeUserScramCredentialsRequest",
156                 [RD_KAFKAP_AlterUserScramCredentials] =
157                 "AlterUserScramCredentialsRequest",
158                 [RD_KAFKAP_Vote] = "VoteRequest",
159                 [RD_KAFKAP_BeginQuorumEpoch] = "BeginQuorumEpochRequest",
160                 [RD_KAFKAP_EndQuorumEpoch] = "EndQuorumEpochRequest",
161                 [RD_KAFKAP_DescribeQuorum] = "DescribeQuorumRequest",
162                 [RD_KAFKAP_AlterIsr] = "AlterIsrRequest",
163                 [RD_KAFKAP_UpdateFeatures] = "UpdateFeaturesRequest",
164                 [RD_KAFKAP_Envelope] = "EnvelopeRequest",
165         };
166         static RD_TLS char ret[64];
167 
168         if (ApiKey < 0 || ApiKey >= (int)RD_ARRAYSIZE(names) ||
169             !names[ApiKey]) {
170                 rd_snprintf(ret, sizeof(ret), "Unknown-%hd?", ApiKey);
171                 return ret;
172         }
173 
174         return names[ApiKey];
175 }
176 
177 
178 
179 
180 
181 
182 
183 
184 /**
185  * @brief ApiKey version support tuple.
186  */
187 struct rd_kafka_ApiVersion {
188 	int16_t ApiKey;
189 	int16_t MinVer;
190 	int16_t MaxVer;
191 };
192 
193 /**
194  * @brief ApiVersion.ApiKey comparator.
195  */
196 static RD_UNUSED
rd_kafka_ApiVersion_key_cmp(const void * _a,const void * _b)197 int rd_kafka_ApiVersion_key_cmp (const void *_a, const void *_b) {
198         const struct rd_kafka_ApiVersion *a =
199                 (const struct rd_kafka_ApiVersion *)_a;
200         const struct rd_kafka_ApiVersion *b =
201                 (const struct rd_kafka_ApiVersion *)_b;
202         return RD_CMP(a->ApiKey, b->ApiKey);
203 }
204 
205 
206 
207 typedef enum {
208         RD_KAFKA_READ_UNCOMMITTED = 0,
209         RD_KAFKA_READ_COMMITTED = 1
210 } rd_kafka_isolation_level_t;
211 
212 
213 
214 #define RD_KAFKA_CTRL_MSG_ABORT 0
215 #define RD_KAFKA_CTRL_MSG_COMMIT 1
216 
217 
218 /**
219  * @enum Coordinator type, used with FindCoordinatorRequest
220  */
221 typedef enum rd_kafka_coordtype_t {
222         RD_KAFKA_COORD_GROUP = 0,
223         RD_KAFKA_COORD_TXN = 1
224 } rd_kafka_coordtype_t;
225 
226 
227 /**
228  *
229  * Kafka protocol string representation prefixed with a convenience header
230  *
231  * Serialized format:
232  *  { uint16, data.. }
233  *
234  */
235 typedef struct rd_kafkap_str_s {
236 	/* convenience header (aligned access, host endian) */
237 	int         len; /* Kafka string length (-1=NULL, 0=empty, >0=string) */
238 	const char *str; /* points into data[] or other memory,
239 			  * not NULL-terminated */
240 } rd_kafkap_str_t;
241 
242 
243 #define RD_KAFKAP_STR_LEN_NULL -1
244 #define RD_KAFKAP_STR_IS_NULL(kstr) ((kstr)->len == RD_KAFKAP_STR_LEN_NULL)
245 
246 /* Returns the length of the string of a kafka protocol string representation */
247 #define RD_KAFKAP_STR_LEN0(len) ((len) == RD_KAFKAP_STR_LEN_NULL ? 0 : (len))
248 #define RD_KAFKAP_STR_LEN(kstr) RD_KAFKAP_STR_LEN0((kstr)->len)
249 
250 /* Returns the actual size of a kafka protocol string representation. */
251 #define RD_KAFKAP_STR_SIZE0(len) (2 + RD_KAFKAP_STR_LEN0(len))
252 #define RD_KAFKAP_STR_SIZE(kstr) RD_KAFKAP_STR_SIZE0((kstr)->len)
253 
254 
255 /** @returns true if kstr is pre-serialized through .._new() */
256 #define RD_KAFKAP_STR_IS_SERIALIZED(kstr)                               \
257         (((const char *)((kstr)+1))+2 == (const char *)((kstr)->str))
258 
259 /* Serialized Kafka string: only works for _new() kstrs.
260  * Check with RD_KAFKAP_STR_IS_SERIALIZED */
261 #define RD_KAFKAP_STR_SER(kstr)  ((kstr)+1)
262 
263 /* Macro suitable for "%.*s" printing. */
264 #define RD_KAFKAP_STR_PR(kstr)						\
265 	(int)((kstr)->len == RD_KAFKAP_STR_LEN_NULL ? 0 : (kstr)->len), \
266 		(kstr)->str
267 
268 /* strndupa() a Kafka string */
269 #define RD_KAFKAP_STR_DUPA(destptr,kstr) \
270 	rd_strndupa((destptr), (kstr)->str, RD_KAFKAP_STR_LEN(kstr))
271 
272 /* strndup() a Kafka string */
273 #define RD_KAFKAP_STR_DUP(kstr) rd_strndup((kstr)->str, RD_KAFKAP_STR_LEN(kstr))
274 
275 #define RD_KAFKAP_STR_INITIALIZER { .len = RD_KAFKAP_STR_LEN_NULL, .str = NULL }
276 
277 /**
278  * Frees a Kafka string previously allocated with `rd_kafkap_str_new()`
279  */
rd_kafkap_str_destroy(rd_kafkap_str_t * kstr)280 static RD_UNUSED void rd_kafkap_str_destroy (rd_kafkap_str_t *kstr) {
281 	rd_free(kstr);
282 }
283 
284 
285 
286 /**
287  * Allocate a new Kafka string and make a copy of 'str'.
288  * If 'len' is -1 the length will be calculated.
289  * Supports Kafka NULL strings.
290  * Nul-terminates the string, but the trailing \0 is not part of
291  * the serialized string.
292  */
293 static RD_INLINE RD_UNUSED
rd_kafkap_str_new(const char * str,int len)294 rd_kafkap_str_t *rd_kafkap_str_new (const char *str, int len) {
295 	rd_kafkap_str_t *kstr;
296 	int16_t klen;
297 
298 	if (!str)
299 		len = RD_KAFKAP_STR_LEN_NULL;
300 	else if (len == -1)
301 		len = (int)strlen(str);
302 
303 	kstr = (rd_kafkap_str_t *)rd_malloc(sizeof(*kstr) + 2 +
304 			(len == RD_KAFKAP_STR_LEN_NULL ? 0 : len + 1));
305 	kstr->len = len;
306 
307 	/* Serialised format: 16-bit string length */
308 	klen = htobe16(len);
309 	memcpy(kstr+1, &klen, 2);
310 
311 	/* Pre-Serialised format: non null-terminated string */
312 	if (len == RD_KAFKAP_STR_LEN_NULL)
313 		kstr->str = NULL;
314 	else {
315 		kstr->str = ((const char *)(kstr+1))+2;
316 		memcpy((void *)kstr->str, str, len);
317 		((char *)kstr->str)[len] = '\0';
318 	}
319 
320 	return kstr;
321 }
322 
323 
324 /**
325  * Makes a copy of `src`. The copy will be fully allocated and should
326  * be freed with rd_kafka_pstr_destroy()
327  */
328 static RD_INLINE RD_UNUSED
rd_kafkap_str_copy(const rd_kafkap_str_t * src)329 rd_kafkap_str_t *rd_kafkap_str_copy (const rd_kafkap_str_t *src) {
330         return rd_kafkap_str_new(src->str, src->len);
331 }
332 
rd_kafkap_str_cmp(const rd_kafkap_str_t * a,const rd_kafkap_str_t * b)333 static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp (const rd_kafkap_str_t *a,
334 						 const rd_kafkap_str_t *b) {
335 	int minlen = RD_MIN(a->len, b->len);
336 	int r = memcmp(a->str, b->str, minlen);
337 	if (r)
338 		return r;
339 	else
340                 return RD_CMP(a->len, b->len);
341 }
342 
rd_kafkap_str_cmp_str(const rd_kafkap_str_t * a,const char * str)343 static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str (const rd_kafkap_str_t *a,
344 						     const char *str) {
345 	int len = (int)strlen(str);
346 	int minlen = RD_MIN(a->len, len);
347 	int r = memcmp(a->str, str, minlen);
348 	if (r)
349 		return r;
350 	else
351                 return RD_CMP(a->len, len);
352 }
353 
rd_kafkap_str_cmp_str2(const char * str,const rd_kafkap_str_t * b)354 static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str2 (const char *str,
355 						      const rd_kafkap_str_t *b){
356 	int len = (int)strlen(str);
357 	int minlen = RD_MIN(b->len, len);
358 	int r = memcmp(str, b->str, minlen);
359 	if (r)
360 		return r;
361 	else
362                 return RD_CMP(len, b->len);
363 }
364 
365 
366 
367 /**
368  *
369  * Kafka protocol bytes array representation prefixed with a convenience header
370  *
371  * Serialized format:
372  *  { uint32, data.. }
373  *
374  */
375 typedef struct rd_kafkap_bytes_s {
376 	/* convenience header (aligned access, host endian) */
377 	int32_t     len;   /* Kafka bytes length (-1=NULL, 0=empty, >0=data) */
378 	const void *data;  /* points just past the struct, or other memory,
379 			    * not NULL-terminated */
380 	const char _data[1]; /* Bytes following struct when new()ed */
381 } rd_kafkap_bytes_t;
382 
383 
384 #define RD_KAFKAP_BYTES_LEN_NULL -1
385 #define RD_KAFKAP_BYTES_IS_NULL(kbytes) \
386 	((kbytes)->len == RD_KAFKAP_BYTES_LEN_NULL)
387 
388 /* Returns the length of the bytes of a kafka protocol bytes representation */
389 #define RD_KAFKAP_BYTES_LEN0(len) ((len) == RD_KAFKAP_BYTES_LEN_NULL ? 0:(len))
390 #define RD_KAFKAP_BYTES_LEN(kbytes) RD_KAFKAP_BYTES_LEN0((kbytes)->len)
391 
392 /* Returns the actual size of a kafka protocol bytes representation. */
393 #define RD_KAFKAP_BYTES_SIZE0(len) (4 + RD_KAFKAP_BYTES_LEN0(len))
394 #define RD_KAFKAP_BYTES_SIZE(kbytes) RD_KAFKAP_BYTES_SIZE0((kbytes)->len)
395 
396 /** @returns true if kbyes is pre-serialized through .._new() */
397 #define RD_KAFKAP_BYTES_IS_SERIALIZED(kstr)                             \
398         (((const char *)((kbytes)+1))+2 == (const char *)((kbytes)->data))
399 
400 /* Serialized Kafka bytes: only works for _new() kbytes */
401 #define RD_KAFKAP_BYTES_SER(kbytes)  ((kbytes)+1)
402 
403 
404 /**
405  * Frees a Kafka bytes previously allocated with `rd_kafkap_bytes_new()`
406  */
rd_kafkap_bytes_destroy(rd_kafkap_bytes_t * kbytes)407 static RD_UNUSED void rd_kafkap_bytes_destroy (rd_kafkap_bytes_t *kbytes) {
408 	rd_free(kbytes);
409 }
410 
411 
412 /**
413  * @brief Allocate a new Kafka bytes and make a copy of 'bytes'.
414  * If \p len > 0 but \p bytes is NULL no copying is performed by
415  * the bytes structure will be allocated to fit \p size bytes.
416  *
417  * Supports:
418  *  - Kafka NULL bytes (bytes==NULL,len==0),
419  *  - Empty bytes (bytes!=NULL,len==0)
420  *  - Copy data (bytes!=NULL,len>0)
421  *  - No-copy, just alloc (bytes==NULL,len>0)
422  */
423 static RD_INLINE RD_UNUSED
rd_kafkap_bytes_new(const char * bytes,int32_t len)424 rd_kafkap_bytes_t *rd_kafkap_bytes_new (const char *bytes, int32_t len) {
425 	rd_kafkap_bytes_t *kbytes;
426 	int32_t klen;
427 
428 	if (!bytes && !len)
429 		len = RD_KAFKAP_BYTES_LEN_NULL;
430 
431 	kbytes = (rd_kafkap_bytes_t *)rd_malloc(sizeof(*kbytes) + 4 +
432 			(len == RD_KAFKAP_BYTES_LEN_NULL ? 0 : len));
433 	kbytes->len = len;
434 
435 	klen = htobe32(len);
436 	memcpy((void *)(kbytes+1), &klen, 4);
437 
438 	if (len == RD_KAFKAP_BYTES_LEN_NULL)
439 		kbytes->data = NULL;
440 	else {
441 		kbytes->data = ((const char *)(kbytes+1))+4;
442                 if (bytes)
443                         memcpy((void *)kbytes->data, bytes, len);
444 	}
445 
446 	return kbytes;
447 }
448 
449 
450 /**
451  * Makes a copy of `src`. The copy will be fully allocated and should
452  * be freed with rd_kafkap_bytes_destroy()
453  */
454 static RD_INLINE RD_UNUSED
rd_kafkap_bytes_copy(const rd_kafkap_bytes_t * src)455 rd_kafkap_bytes_t *rd_kafkap_bytes_copy (const rd_kafkap_bytes_t *src) {
456         return rd_kafkap_bytes_new(
457                 (const char *)src->data, src->len);
458 }
459 
460 
rd_kafkap_bytes_cmp(const rd_kafkap_bytes_t * a,const rd_kafkap_bytes_t * b)461 static RD_INLINE RD_UNUSED int rd_kafkap_bytes_cmp (const rd_kafkap_bytes_t *a,
462 						   const rd_kafkap_bytes_t *b) {
463 	int minlen = RD_MIN(a->len, b->len);
464 	int r = memcmp(a->data, b->data, minlen);
465 	if (r)
466 		return r;
467 	else
468                 return RD_CMP(a->len, b->len);
469 }
470 
471 static RD_INLINE RD_UNUSED
rd_kafkap_bytes_cmp_data(const rd_kafkap_bytes_t * a,const char * data,int len)472 int rd_kafkap_bytes_cmp_data (const rd_kafkap_bytes_t *a,
473 			      const char *data, int len) {
474 	int minlen = RD_MIN(a->len, len);
475 	int r = memcmp(a->data, data, minlen);
476 	if (r)
477 		return r;
478 	else
479                 return RD_CMP(a->len, len);
480 }
481 
482 
483 
484 
485 typedef struct rd_kafka_buf_s rd_kafka_buf_t;
486 
487 
488 #define RD_KAFKA_NODENAME_SIZE  256
489 
490 
491 
492 
493 /**
494  * @brief Message overheads (worst-case)
495  */
496 
497 /**
498  * MsgVersion v0..v1
499  */
500 /* Offset + MessageSize */
501 #define RD_KAFKAP_MESSAGESET_V0_HDR_SIZE (8+4)
502 /* CRC + Magic + Attr + KeyLen + ValueLen */
503 #define RD_KAFKAP_MESSAGE_V0_HDR_SIZE    (4+1+1+4+4)
504 /* CRC + Magic + Attr + Timestamp + KeyLen + ValueLen */
505 #define RD_KAFKAP_MESSAGE_V1_HDR_SIZE    (4+1+1+8+4+4)
506 /* Maximum per-message overhead */
507 #define RD_KAFKAP_MESSAGE_V0_OVERHEAD                                   \
508         (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V0_HDR_SIZE)
509 #define RD_KAFKAP_MESSAGE_V1_OVERHEAD                                   \
510         (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V1_HDR_SIZE)
511 
512 /**
513  * MsgVersion v2
514  */
515 #define RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD                              \
516         (                                                              \
517         /* Length (varint) */                                          \
518         RD_UVARINT_ENC_SIZEOF(int32_t) +                               \
519         /* Attributes */                                               \
520         1 +                                                            \
521         /* TimestampDelta (varint) */                                  \
522         RD_UVARINT_ENC_SIZEOF(int64_t) +                               \
523         /* OffsetDelta (varint) */                                     \
524         RD_UVARINT_ENC_SIZEOF(int32_t) +                               \
525         /* KeyLen (varint) */                                          \
526         RD_UVARINT_ENC_SIZEOF(int32_t) +                               \
527         /* ValueLen (varint) */                                        \
528         RD_UVARINT_ENC_SIZEOF(int32_t) +                               \
529         /* HeaderCnt (varint): */                                      \
530         RD_UVARINT_ENC_SIZEOF(int32_t)                                 \
531         )
532 
533 #define RD_KAFKAP_MESSAGE_V2_MIN_OVERHEAD                              \
534         (                                                              \
535         /* Length (varint) */                                          \
536         RD_UVARINT_ENC_SIZE_0() +                                      \
537         /* Attributes */                                               \
538         1 +                                                            \
539         /* TimestampDelta (varint) */                                  \
540         RD_UVARINT_ENC_SIZE_0() +                                      \
541         /* OffsetDelta (varint) */                                     \
542         RD_UVARINT_ENC_SIZE_0() +                                      \
543         /* KeyLen (varint) */                                          \
544         RD_UVARINT_ENC_SIZE_0() +                                      \
545         /* ValueLen (varint) */                                        \
546         RD_UVARINT_ENC_SIZE_0() +                                      \
547         /* HeaderCnt (varint): */                                      \
548         RD_UVARINT_ENC_SIZE_0()                                        \
549         )
550 
551 
552 /**
553  * @brief MessageSets are not explicitly versioned but depends on the
554  *        Produce/Fetch API version and the encompassed Message versions.
555  *        We use the Message version (MsgVersion, aka MagicByte) to describe
556  *        the MessageSet version, that is, MsgVersion <= 1 uses the old
557  *        MessageSet version (v0?) while MsgVersion 2 uses MessageSet version v2
558  */
559 
560 /* Old MessageSet header: none */
561 #define RD_KAFKAP_MSGSET_V0_SIZE                0
562 
563 /* MessageSet v2 header */
564 #define RD_KAFKAP_MSGSET_V2_SIZE                (8+4+4+1+4+2+4+8+8+8+2+4+4)
565 
566 /* Byte offsets for MessageSet fields */
567 #define RD_KAFKAP_MSGSET_V2_OF_Length           (8)
568 #define RD_KAFKAP_MSGSET_V2_OF_CRC              (8+4+4+1)
569 #define RD_KAFKAP_MSGSET_V2_OF_Attributes       (8+4+4+1+4)
570 #define RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta  (8+4+4+1+4+2)
571 #define RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp    (8+4+4+1+4+2+4)
572 #define RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp     (8+4+4+1+4+2+4+8)
573 #define RD_KAFKAP_MSGSET_V2_OF_BaseSequence     (8+4+4+1+4+2+4+8+8+8+2)
574 #define RD_KAFKAP_MSGSET_V2_OF_RecordCount      (8+4+4+1+4+2+4+8+8+8+2+4)
575 
576 
577 
578 
579 /**
580  * @name Producer ID and Epoch for the Idempotent Producer
581  * @{
582  *
583  */
584 
585 /**
586  * @brief Producer ID and Epoch
587  */
588 typedef struct rd_kafka_pid_s {
589         int64_t id;     /**< Producer Id */
590         int16_t epoch;  /**< Producer Epoch */
591 } rd_kafka_pid_t;
592 
593 #define RD_KAFKA_PID_INITIALIZER {-1,-1}
594 
595 /**
596  * @returns true if \p PID is valid
597  */
598 #define rd_kafka_pid_valid(PID) ((PID).id != -1)
599 
600 /**
601  * @brief Check two pids for equality
602  */
rd_kafka_pid_eq(const rd_kafka_pid_t a,const rd_kafka_pid_t b)603 static RD_UNUSED RD_INLINE int rd_kafka_pid_eq (const rd_kafka_pid_t a,
604                                                 const rd_kafka_pid_t b) {
605         return a.id == b.id && a.epoch == b.epoch;
606 }
607 
608 /**
609  * @brief Pid+epoch comparator
610  */
rd_kafka_pid_cmp(const void * _a,const void * _b)611 static RD_UNUSED int rd_kafka_pid_cmp (const void *_a, const void *_b) {
612         const rd_kafka_pid_t *a = _a, *b = _b;
613 
614         if (a->id < b->id)
615                 return -1;
616         else if (a->id > b->id)
617                 return 1;
618 
619         return (int)a->epoch - (int)b->epoch;
620 }
621 
622 
623 /**
624  * @brief Pid (not epoch) comparator
625  */
rd_kafka_pid_cmp_pid(const void * _a,const void * _b)626 static RD_UNUSED int rd_kafka_pid_cmp_pid (const void *_a, const void *_b) {
627         const rd_kafka_pid_t *a = _a, *b = _b;
628 
629         if (a->id < b->id)
630                 return -1;
631         else if (a->id > b->id)
632                 return 1;
633 
634         return 0;
635 }
636 
637 
638 /**
639  * @returns the string representation of a PID in a thread-safe
640  *          static buffer.
641  */
642 static RD_UNUSED const char *
rd_kafka_pid2str(const rd_kafka_pid_t pid)643 rd_kafka_pid2str (const rd_kafka_pid_t pid) {
644         static RD_TLS char buf[2][64];
645         static RD_TLS int i;
646 
647         if (!rd_kafka_pid_valid(pid))
648                 return "PID{Invalid}";
649 
650         i = (i + 1) % 2;
651 
652         rd_snprintf(buf[i], sizeof(buf[i]),
653                     "PID{Id:%" PRId64",Epoch:%hd}", pid.id, pid.epoch);
654 
655         return buf[i];
656 }
657 
658 /**
659  * @brief Reset the PID to invalid/init state
660  */
rd_kafka_pid_reset(rd_kafka_pid_t * pid)661 static RD_UNUSED RD_INLINE void rd_kafka_pid_reset (rd_kafka_pid_t *pid) {
662         pid->id = -1;
663         pid->epoch = -1;
664 }
665 
666 
667 /**
668  * @brief Bump the epoch of a valid PID
669  */
670 static RD_UNUSED RD_INLINE rd_kafka_pid_t
rd_kafka_pid_bump(const rd_kafka_pid_t old)671 rd_kafka_pid_bump (const rd_kafka_pid_t old) {
672         rd_kafka_pid_t new_pid = {
673                 old.id,
674                 (int16_t)(((int)old.epoch + 1) & (int)INT16_MAX) };
675         return new_pid;
676 }
677 
678 /**@}*/
679 
680 
681 #endif /* _RDKAFKA_PROTO_H_ */
682