1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012,2013 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #ifndef _RDKAFKA_PROTO_H_
30 #define _RDKAFKA_PROTO_H_
31
32
33 #include "rdendian.h"
34 #include "rdvarint.h"
35
36 /* Protocol defines */
37 #include "rdkafka_protocol.h"
38
39
40
/**
 * Default generic retry count for failed requests.
 * This may be overridden for specific request types.
 */
#define RD_KAFKA_REQUEST_DEFAULT_RETRIES 2

/** Maximum retry count (INT_MAX, i.e. practically infinite). */
#define RD_KAFKA_REQUEST_MAX_RETRIES INT_MAX

/** Retry count for requests that must not be retried. */
#define RD_KAFKA_REQUEST_NO_RETRIES 0
50
51
/**
 * Request header: the fixed-size fields that start every request.
 * The ClientId string follows these fields and is not part of the struct.
 */
struct rd_kafkap_reqhdr {
        int32_t Size;       /* i32 */
        int16_t ApiKey;     /* i16 */
        int16_t ApiVersion; /* i16 */
        int32_t CorrId;     /* i32 */
        /* ClientId follows */
};
62
/* Request header size: Size + ApiKey + ApiVersion + CorrId */
#define RD_KAFKAP_REQHDR_SIZE (4 + 2 + 2 + 4)
/* Response header size: Size + CorrId */
#define RD_KAFKAP_RESHDR_SIZE (4 + 4)
65
/**
 * Response header: the fixed-size fields that start every response.
 */
struct rd_kafkap_reshdr {
        int32_t Size;   /* i32 */
        int32_t CorrId; /* i32 */
};
73
74
75 /**
76 * Request type v1 (flexible version)
77 *
78 * i32 Size
79 * i16 ApiKey
80 * i16 ApiVersion
81 * i32 CorrId
82 * string ClientId (2-byte encoding, not compact string)
83 * uvarint Tags
84 * <Request payload>
85 * uvarint EndTags
86 *
87 * Any struct-type (non-primitive or array type) field in the request payload
88 * must also have a trailing tags list, this goes for structs in arrays as well.
89 */
90
91 /**
92 * @brief Protocol request type (ApiKey) to name/string.
93 *
94 * Generate updates to this list with generate_proto.sh.
95 */
96 static RD_UNUSED
rd_kafka_ApiKey2str(int16_t ApiKey)97 const char *rd_kafka_ApiKey2str (int16_t ApiKey) {
98 static const char *names[] = {
99 [RD_KAFKAP_Produce] = "Produce",
100 [RD_KAFKAP_Fetch] = "Fetch",
101 [RD_KAFKAP_ListOffsets] = "ListOffsets",
102 [RD_KAFKAP_Metadata] = "Metadata",
103 [RD_KAFKAP_LeaderAndIsr] = "LeaderAndIsr",
104 [RD_KAFKAP_StopReplica] = "StopReplica",
105 [RD_KAFKAP_UpdateMetadata] = "UpdateMetadata",
106 [RD_KAFKAP_ControlledShutdown] = "ControlledShutdown",
107 [RD_KAFKAP_OffsetCommit] = "OffsetCommit",
108 [RD_KAFKAP_OffsetFetch] = "OffsetFetch",
109 [RD_KAFKAP_FindCoordinator] = "FindCoordinator",
110 [RD_KAFKAP_JoinGroup] = "JoinGroup",
111 [RD_KAFKAP_Heartbeat] = "Heartbeat",
112 [RD_KAFKAP_LeaveGroup] = "LeaveGroup",
113 [RD_KAFKAP_SyncGroup] = "SyncGroup",
114 [RD_KAFKAP_DescribeGroups] = "DescribeGroups",
115 [RD_KAFKAP_ListGroups] = "ListGroups",
116 [RD_KAFKAP_SaslHandshake] = "SaslHandshake",
117 [RD_KAFKAP_ApiVersion] = "ApiVersion",
118 [RD_KAFKAP_CreateTopics] = "CreateTopics",
119 [RD_KAFKAP_DeleteTopics] = "DeleteTopics",
120 [RD_KAFKAP_DeleteRecords] = "DeleteRecords",
121 [RD_KAFKAP_InitProducerId] = "InitProducerId",
122 [RD_KAFKAP_OffsetForLeaderEpoch] = "OffsetForLeaderEpoch",
123 [RD_KAFKAP_AddPartitionsToTxn] = "AddPartitionsToTxn",
124 [RD_KAFKAP_AddOffsetsToTxn] = "AddOffsetsToTxn",
125 [RD_KAFKAP_EndTxn] = "EndTxn",
126 [RD_KAFKAP_WriteTxnMarkers] = "WriteTxnMarkers",
127 [RD_KAFKAP_TxnOffsetCommit] = "TxnOffsetCommit",
128 [RD_KAFKAP_DescribeAcls] = "DescribeAcls",
129 [RD_KAFKAP_CreateAcls] = "CreateAcls",
130 [RD_KAFKAP_DeleteAcls] = "DeleteAcls",
131 [RD_KAFKAP_DescribeConfigs] = "DescribeConfigs",
132 [RD_KAFKAP_AlterConfigs] = "AlterConfigs",
133 [RD_KAFKAP_AlterReplicaLogDirs] = "AlterReplicaLogDirs",
134 [RD_KAFKAP_DescribeLogDirs] = "DescribeLogDirs",
135 [RD_KAFKAP_SaslAuthenticate] = "SaslAuthenticate",
136 [RD_KAFKAP_CreatePartitions] = "CreatePartitions",
137 [RD_KAFKAP_CreateDelegationToken] = "CreateDelegationToken",
138 [RD_KAFKAP_RenewDelegationToken] = "RenewDelegationToken",
139 [RD_KAFKAP_ExpireDelegationToken] = "ExpireDelegationToken",
140 [RD_KAFKAP_DescribeDelegationToken] = "DescribeDelegationToken",
141 [RD_KAFKAP_DeleteGroups] = "DeleteGroups",
142 [RD_KAFKAP_ElectLeaders] = "ElectLeadersRequest",
143 [RD_KAFKAP_IncrementalAlterConfigs] =
144 "IncrementalAlterConfigsRequest",
145 [RD_KAFKAP_AlterPartitionReassignments] =
146 "AlterPartitionReassignmentsRequest",
147 [RD_KAFKAP_ListPartitionReassignments] =
148 "ListPartitionReassignmentsRequest",
149 [RD_KAFKAP_OffsetDelete] = "OffsetDeleteRequest",
150 [RD_KAFKAP_DescribeClientQuotas] =
151 "DescribeClientQuotasRequest",
152 [RD_KAFKAP_AlterClientQuotas] =
153 "AlterClientQuotasRequest",
154 [RD_KAFKAP_DescribeUserScramCredentials] =
155 "DescribeUserScramCredentialsRequest",
156 [RD_KAFKAP_AlterUserScramCredentials] =
157 "AlterUserScramCredentialsRequest",
158 [RD_KAFKAP_Vote] = "VoteRequest",
159 [RD_KAFKAP_BeginQuorumEpoch] = "BeginQuorumEpochRequest",
160 [RD_KAFKAP_EndQuorumEpoch] = "EndQuorumEpochRequest",
161 [RD_KAFKAP_DescribeQuorum] = "DescribeQuorumRequest",
162 [RD_KAFKAP_AlterIsr] = "AlterIsrRequest",
163 [RD_KAFKAP_UpdateFeatures] = "UpdateFeaturesRequest",
164 [RD_KAFKAP_Envelope] = "EnvelopeRequest",
165 };
166 static RD_TLS char ret[64];
167
168 if (ApiKey < 0 || ApiKey >= (int)RD_ARRAYSIZE(names) ||
169 !names[ApiKey]) {
170 rd_snprintf(ret, sizeof(ret), "Unknown-%hd?", ApiKey);
171 return ret;
172 }
173
174 return names[ApiKey];
175 }
176
177
178
179
180
181
182
183
/**
 * @brief ApiKey version support tuple: an ApiKey together with the
 *        minimum and maximum version for that key.
 */
struct rd_kafka_ApiVersion {
        int16_t ApiKey; /* Request type */
        int16_t MinVer; /* Minimum version */
        int16_t MaxVer; /* Maximum version */
};
192
193 /**
194 * @brief ApiVersion.ApiKey comparator.
195 */
196 static RD_UNUSED
rd_kafka_ApiVersion_key_cmp(const void * _a,const void * _b)197 int rd_kafka_ApiVersion_key_cmp (const void *_a, const void *_b) {
198 const struct rd_kafka_ApiVersion *a =
199 (const struct rd_kafka_ApiVersion *)_a;
200 const struct rd_kafka_ApiVersion *b =
201 (const struct rd_kafka_ApiVersion *)_b;
202 return RD_CMP(a->ApiKey, b->ApiKey);
203 }
204
205
206
/** Isolation level constants (read uncommitted / read committed). */
typedef enum {
        RD_KAFKA_READ_UNCOMMITTED = 0,
        RD_KAFKA_READ_COMMITTED   = 1
} rd_kafka_isolation_level_t;
211
212
213
/* Control message types */
#define RD_KAFKA_CTRL_MSG_ABORT  0
#define RD_KAFKA_CTRL_MSG_COMMIT 1
216
217
/**
 * @enum Coordinator type, used with FindCoordinatorRequest
 */
typedef enum rd_kafka_coordtype_t {
        RD_KAFKA_COORD_GROUP = 0, /* Group coordinator */
        RD_KAFKA_COORD_TXN   = 1  /* Transaction coordinator */
} rd_kafka_coordtype_t;
225
226
/**
 * Kafka protocol string representation, prefixed with a convenience header.
 *
 * Serialized format:
 *  { uint16, data.. }
 *
 * The struct holds only the convenience header (aligned access, host
 * endian). When allocated through rd_kafkap_str_new() the serialized
 * form is stored directly after the struct.
 */
typedef struct rd_kafkap_str_s {
        /* Kafka string length (-1=NULL, 0=empty, >0=string) */
        int len;
        /* Points into the memory following the struct, or to other memory.
         * Not NULL-terminated. */
        const char *str;
} rd_kafkap_str_t;
241
242
/* Length value denoting a Kafka NULL string. */
#define RD_KAFKAP_STR_LEN_NULL -1

/* True if kstr is a Kafka NULL string. */
#define RD_KAFKAP_STR_IS_NULL(kstr)                                     \
        ((kstr)->len == RD_KAFKAP_STR_LEN_NULL)

/* String length of a Kafka protocol string; a NULL string counts as 0. */
#define RD_KAFKAP_STR_LEN0(len)                                         \
        ((len) == RD_KAFKAP_STR_LEN_NULL ? 0 : (len))
#define RD_KAFKAP_STR_LEN(kstr) RD_KAFKAP_STR_LEN0((kstr)->len)

/* Serialized size of a Kafka protocol string:
 * 2-byte length prefix plus the string bytes. */
#define RD_KAFKAP_STR_SIZE0(len) (2 + RD_KAFKAP_STR_LEN0(len))
#define RD_KAFKAP_STR_SIZE(kstr) RD_KAFKAP_STR_SIZE0((kstr)->len)


/** @returns true if kstr is pre-serialized through .._new(), i.e. its
 *           str pointer refers to the bytes located 2 bytes past the
 *           end of the struct. */
#define RD_KAFKAP_STR_IS_SERIALIZED(kstr)                               \
        (((const char *)((kstr)+1))+2 == (const char *)((kstr)->str))

/* Pointer to the serialized Kafka string: only works for _new() kstrs.
 * Check with RD_KAFKAP_STR_IS_SERIALIZED */
#define RD_KAFKAP_STR_SER(kstr) ((kstr)+1)

/* Expands to the two arguments (length, pointer) for "%.*s" printing. */
#define RD_KAFKAP_STR_PR(kstr)                                          \
        (int)RD_KAFKAP_STR_LEN(kstr),                                   \
        (kstr)->str

/* strndupa() a Kafka string */
#define RD_KAFKAP_STR_DUPA(destptr,kstr)                                \
        rd_strndupa((destptr), (kstr)->str, RD_KAFKAP_STR_LEN(kstr))

/* strndup() a Kafka string */
#define RD_KAFKAP_STR_DUP(kstr)                                         \
        rd_strndup((kstr)->str, RD_KAFKAP_STR_LEN(kstr))

/* Static initializer producing a Kafka NULL string. */
#define RD_KAFKAP_STR_INITIALIZER                                       \
        { .len = RD_KAFKAP_STR_LEN_NULL, .str = NULL }
276
277 /**
278 * Frees a Kafka string previously allocated with `rd_kafkap_str_new()`
279 */
rd_kafkap_str_destroy(rd_kafkap_str_t * kstr)280 static RD_UNUSED void rd_kafkap_str_destroy (rd_kafkap_str_t *kstr) {
281 rd_free(kstr);
282 }
283
284
285
286 /**
287 * Allocate a new Kafka string and make a copy of 'str'.
288 * If 'len' is -1 the length will be calculated.
289 * Supports Kafka NULL strings.
290 * Nul-terminates the string, but the trailing \0 is not part of
291 * the serialized string.
292 */
293 static RD_INLINE RD_UNUSED
rd_kafkap_str_new(const char * str,int len)294 rd_kafkap_str_t *rd_kafkap_str_new (const char *str, int len) {
295 rd_kafkap_str_t *kstr;
296 int16_t klen;
297
298 if (!str)
299 len = RD_KAFKAP_STR_LEN_NULL;
300 else if (len == -1)
301 len = (int)strlen(str);
302
303 kstr = (rd_kafkap_str_t *)rd_malloc(sizeof(*kstr) + 2 +
304 (len == RD_KAFKAP_STR_LEN_NULL ? 0 : len + 1));
305 kstr->len = len;
306
307 /* Serialised format: 16-bit string length */
308 klen = htobe16(len);
309 memcpy(kstr+1, &klen, 2);
310
311 /* Pre-Serialised format: non null-terminated string */
312 if (len == RD_KAFKAP_STR_LEN_NULL)
313 kstr->str = NULL;
314 else {
315 kstr->str = ((const char *)(kstr+1))+2;
316 memcpy((void *)kstr->str, str, len);
317 ((char *)kstr->str)[len] = '\0';
318 }
319
320 return kstr;
321 }
322
323
324 /**
325 * Makes a copy of `src`. The copy will be fully allocated and should
326 * be freed with rd_kafka_pstr_destroy()
327 */
328 static RD_INLINE RD_UNUSED
rd_kafkap_str_copy(const rd_kafkap_str_t * src)329 rd_kafkap_str_t *rd_kafkap_str_copy (const rd_kafkap_str_t *src) {
330 return rd_kafkap_str_new(src->str, src->len);
331 }
332
rd_kafkap_str_cmp(const rd_kafkap_str_t * a,const rd_kafkap_str_t * b)333 static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp (const rd_kafkap_str_t *a,
334 const rd_kafkap_str_t *b) {
335 int minlen = RD_MIN(a->len, b->len);
336 int r = memcmp(a->str, b->str, minlen);
337 if (r)
338 return r;
339 else
340 return RD_CMP(a->len, b->len);
341 }
342
rd_kafkap_str_cmp_str(const rd_kafkap_str_t * a,const char * str)343 static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str (const rd_kafkap_str_t *a,
344 const char *str) {
345 int len = (int)strlen(str);
346 int minlen = RD_MIN(a->len, len);
347 int r = memcmp(a->str, str, minlen);
348 if (r)
349 return r;
350 else
351 return RD_CMP(a->len, len);
352 }
353
rd_kafkap_str_cmp_str2(const char * str,const rd_kafkap_str_t * b)354 static RD_INLINE RD_UNUSED int rd_kafkap_str_cmp_str2 (const char *str,
355 const rd_kafkap_str_t *b){
356 int len = (int)strlen(str);
357 int minlen = RD_MIN(b->len, len);
358 int r = memcmp(str, b->str, minlen);
359 if (r)
360 return r;
361 else
362 return RD_CMP(len, b->len);
363 }
364
365
366
/**
 * Kafka protocol bytes array representation, prefixed with a convenience
 * header.
 *
 * Serialized format:
 *  { uint32, data.. }
 *
 * The first two fields form the convenience header (aligned access,
 * host endian).
 */
typedef struct rd_kafkap_bytes_s {
        /* Kafka bytes length (-1=NULL, 0=empty, >0=data) */
        int32_t len;
        /* Points just past the struct, or to other memory.
         * Not NULL-terminated. */
        const void *data;
        /* Bytes following struct when new()ed */
        const char _data[1];
} rd_kafkap_bytes_t;
382
383
/* Length value denoting Kafka NULL bytes. */
#define RD_KAFKAP_BYTES_LEN_NULL -1

/* True if kbytes is a Kafka NULL bytes. */
#define RD_KAFKAP_BYTES_IS_NULL(kbytes)                                 \
        ((kbytes)->len == RD_KAFKAP_BYTES_LEN_NULL)

/* Data length of a Kafka protocol bytes; NULL bytes count as 0. */
#define RD_KAFKAP_BYTES_LEN0(len)                                       \
        ((len) == RD_KAFKAP_BYTES_LEN_NULL ? 0:(len))
#define RD_KAFKAP_BYTES_LEN(kbytes) RD_KAFKAP_BYTES_LEN0((kbytes)->len)

/* Serialized size of a Kafka protocol bytes:
 * 4-byte length prefix plus the data bytes. */
#define RD_KAFKAP_BYTES_SIZE0(len) (4 + RD_KAFKAP_BYTES_LEN0(len))
#define RD_KAFKAP_BYTES_SIZE(kbytes) RD_KAFKAP_BYTES_SIZE0((kbytes)->len)
395
/** @returns true if kbytes is pre-serialized through .._new(), i.e. its
 *           data pointer refers to the bytes located directly after the
 *           4-byte serialized length that follows the struct
 *           (the layout written by rd_kafkap_bytes_new()). */
#define RD_KAFKAP_BYTES_IS_SERIALIZED(kbytes)                           \
        (((const char *)((kbytes)+1))+4 == (const char *)((kbytes)->data))
399
/* Pointer to the serialized Kafka bytes (the memory directly after the
 * struct): only works for _new() kbytes */
#define RD_KAFKAP_BYTES_SER(kbytes) ((kbytes)+1)
402
403
404 /**
405 * Frees a Kafka bytes previously allocated with `rd_kafkap_bytes_new()`
406 */
rd_kafkap_bytes_destroy(rd_kafkap_bytes_t * kbytes)407 static RD_UNUSED void rd_kafkap_bytes_destroy (rd_kafkap_bytes_t *kbytes) {
408 rd_free(kbytes);
409 }
410
411
412 /**
413 * @brief Allocate a new Kafka bytes and make a copy of 'bytes'.
414 * If \p len > 0 but \p bytes is NULL no copying is performed by
415 * the bytes structure will be allocated to fit \p size bytes.
416 *
417 * Supports:
418 * - Kafka NULL bytes (bytes==NULL,len==0),
419 * - Empty bytes (bytes!=NULL,len==0)
420 * - Copy data (bytes!=NULL,len>0)
421 * - No-copy, just alloc (bytes==NULL,len>0)
422 */
423 static RD_INLINE RD_UNUSED
rd_kafkap_bytes_new(const char * bytes,int32_t len)424 rd_kafkap_bytes_t *rd_kafkap_bytes_new (const char *bytes, int32_t len) {
425 rd_kafkap_bytes_t *kbytes;
426 int32_t klen;
427
428 if (!bytes && !len)
429 len = RD_KAFKAP_BYTES_LEN_NULL;
430
431 kbytes = (rd_kafkap_bytes_t *)rd_malloc(sizeof(*kbytes) + 4 +
432 (len == RD_KAFKAP_BYTES_LEN_NULL ? 0 : len));
433 kbytes->len = len;
434
435 klen = htobe32(len);
436 memcpy((void *)(kbytes+1), &klen, 4);
437
438 if (len == RD_KAFKAP_BYTES_LEN_NULL)
439 kbytes->data = NULL;
440 else {
441 kbytes->data = ((const char *)(kbytes+1))+4;
442 if (bytes)
443 memcpy((void *)kbytes->data, bytes, len);
444 }
445
446 return kbytes;
447 }
448
449
450 /**
451 * Makes a copy of `src`. The copy will be fully allocated and should
452 * be freed with rd_kafkap_bytes_destroy()
453 */
454 static RD_INLINE RD_UNUSED
rd_kafkap_bytes_copy(const rd_kafkap_bytes_t * src)455 rd_kafkap_bytes_t *rd_kafkap_bytes_copy (const rd_kafkap_bytes_t *src) {
456 return rd_kafkap_bytes_new(
457 (const char *)src->data, src->len);
458 }
459
460
rd_kafkap_bytes_cmp(const rd_kafkap_bytes_t * a,const rd_kafkap_bytes_t * b)461 static RD_INLINE RD_UNUSED int rd_kafkap_bytes_cmp (const rd_kafkap_bytes_t *a,
462 const rd_kafkap_bytes_t *b) {
463 int minlen = RD_MIN(a->len, b->len);
464 int r = memcmp(a->data, b->data, minlen);
465 if (r)
466 return r;
467 else
468 return RD_CMP(a->len, b->len);
469 }
470
471 static RD_INLINE RD_UNUSED
rd_kafkap_bytes_cmp_data(const rd_kafkap_bytes_t * a,const char * data,int len)472 int rd_kafkap_bytes_cmp_data (const rd_kafkap_bytes_t *a,
473 const char *data, int len) {
474 int minlen = RD_MIN(a->len, len);
475 int r = memcmp(a->data, data, minlen);
476 if (r)
477 return r;
478 else
479 return RD_CMP(a->len, len);
480 }
481
482
483
484
/* Forward declaration; struct rd_kafka_buf_s is not defined in this file. */
typedef struct rd_kafka_buf_s rd_kafka_buf_t;


/* Node name size (presumably a buffer size in bytes; confirm at the
 * places where it is used). */
#define RD_KAFKA_NODENAME_SIZE 256
489
490
491
492
/**
 * @brief Message overheads (worst-case)
 */

/**
 * MsgVersion v0..v1
 */

/* MessageSet entry header: Offset + MessageSize */
#define RD_KAFKAP_MESSAGESET_V0_HDR_SIZE (8 + 4)

/* v0 message header: CRC + Magic + Attr + KeyLen + ValueLen */
#define RD_KAFKAP_MESSAGE_V0_HDR_SIZE (4 + 1 + 1 + 4 + 4)

/* v1 message header: CRC + Magic + Attr + Timestamp + KeyLen + ValueLen */
#define RD_KAFKAP_MESSAGE_V1_HDR_SIZE (4 + 1 + 1 + 8 + 4 + 4)

/* Maximum per-message overhead: MessageSet entry header + message header */
#define RD_KAFKAP_MESSAGE_V0_OVERHEAD                                   \
        (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V0_HDR_SIZE)
#define RD_KAFKAP_MESSAGE_V1_OVERHEAD                                   \
        (RD_KAFKAP_MESSAGESET_V0_HDR_SIZE + RD_KAFKAP_MESSAGE_V1_HDR_SIZE)
511
/**
 * MsgVersion v2
 *
 * MAX_OVERHEAD sums RD_UVARINT_ENC_SIZEOF() of each varint field's type,
 * MIN_OVERHEAD sums RD_UVARINT_ENC_SIZE_0() for each varint field.
 * Both add one byte for Attributes.
 */
#define RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD                               \
        (RD_UVARINT_ENC_SIZEOF(int32_t) /* Length (varint) */ +         \
         1 /* Attributes */ +                                           \
         RD_UVARINT_ENC_SIZEOF(int64_t) /* TimestampDelta (varint) */ + \
         RD_UVARINT_ENC_SIZEOF(int32_t) /* OffsetDelta (varint) */ +    \
         RD_UVARINT_ENC_SIZEOF(int32_t) /* KeyLen (varint) */ +         \
         RD_UVARINT_ENC_SIZEOF(int32_t) /* ValueLen (varint) */ +       \
         RD_UVARINT_ENC_SIZEOF(int32_t) /* HeaderCnt (varint) */)

#define RD_KAFKAP_MESSAGE_V2_MIN_OVERHEAD                               \
        (RD_UVARINT_ENC_SIZE_0() /* Length (varint) */ +                \
         1 /* Attributes */ +                                           \
         RD_UVARINT_ENC_SIZE_0() /* TimestampDelta (varint) */ +        \
         RD_UVARINT_ENC_SIZE_0() /* OffsetDelta (varint) */ +           \
         RD_UVARINT_ENC_SIZE_0() /* KeyLen (varint) */ +                \
         RD_UVARINT_ENC_SIZE_0() /* ValueLen (varint) */ +              \
         RD_UVARINT_ENC_SIZE_0() /* HeaderCnt (varint) */)
550
551
/**
 * @brief MessageSets are not explicitly versioned but depend on the
 *        Produce/Fetch API version and the encompassed Message versions.
 *        We use the Message version (MsgVersion, aka MagicByte) to describe
 *        the MessageSet version, that is, MsgVersion <= 1 uses the old
 *        MessageSet version (v0?) while MsgVersion 2 uses MessageSet version v2
 */

/* Old MessageSet header: none */
#define RD_KAFKAP_MSGSET_V0_SIZE 0

/* MessageSet v2 header: RecordCount is the last (4-byte) field */
#define RD_KAFKAP_MSGSET_V2_SIZE (RD_KAFKAP_MSGSET_V2_OF_RecordCount + 4)

/* Byte offsets for MessageSet fields.
 * Each offset is the previous named offset plus the sizes of the
 * fields in between. */
#define RD_KAFKAP_MSGSET_V2_OF_Length (8)
#define RD_KAFKAP_MSGSET_V2_OF_CRC                                      \
        (RD_KAFKAP_MSGSET_V2_OF_Length + 4 + 4 + 1)
#define RD_KAFKAP_MSGSET_V2_OF_Attributes                               \
        (RD_KAFKAP_MSGSET_V2_OF_CRC + 4)
#define RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta                          \
        (RD_KAFKAP_MSGSET_V2_OF_Attributes + 2)
#define RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp                            \
        (RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta + 4)
#define RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp                             \
        (RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp + 8)
#define RD_KAFKAP_MSGSET_V2_OF_BaseSequence                             \
        (RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp + 8 + 8 + 2)
#define RD_KAFKAP_MSGSET_V2_OF_RecordCount                              \
        (RD_KAFKAP_MSGSET_V2_OF_BaseSequence + 4)
575
576
577
578
579 /**
580 * @name Producer ID and Epoch for the Idempotent Producer
581 * @{
582 *
583 */
584
/**
 * @brief Producer ID and Epoch
 *
 * An id of -1 denotes an invalid PID, see rd_kafka_pid_valid().
 */
typedef struct rd_kafka_pid_s {
        int64_t id;    /**< Producer Id */
        int16_t epoch; /**< Producer Epoch */
} rd_kafka_pid_t;
592
/** Static initializer for an invalid PID: id and epoch are both -1. */
#define RD_KAFKA_PID_INITIALIZER {-1,-1}

/**
 * @returns true if \p PID is valid, i.e. its id is not -1.
 *          The epoch is not inspected.
 */
#define rd_kafka_pid_valid(PID) ((PID).id != -1)
599
600 /**
601 * @brief Check two pids for equality
602 */
rd_kafka_pid_eq(const rd_kafka_pid_t a,const rd_kafka_pid_t b)603 static RD_UNUSED RD_INLINE int rd_kafka_pid_eq (const rd_kafka_pid_t a,
604 const rd_kafka_pid_t b) {
605 return a.id == b.id && a.epoch == b.epoch;
606 }
607
608 /**
609 * @brief Pid+epoch comparator
610 */
rd_kafka_pid_cmp(const void * _a,const void * _b)611 static RD_UNUSED int rd_kafka_pid_cmp (const void *_a, const void *_b) {
612 const rd_kafka_pid_t *a = _a, *b = _b;
613
614 if (a->id < b->id)
615 return -1;
616 else if (a->id > b->id)
617 return 1;
618
619 return (int)a->epoch - (int)b->epoch;
620 }
621
622
623 /**
624 * @brief Pid (not epoch) comparator
625 */
rd_kafka_pid_cmp_pid(const void * _a,const void * _b)626 static RD_UNUSED int rd_kafka_pid_cmp_pid (const void *_a, const void *_b) {
627 const rd_kafka_pid_t *a = _a, *b = _b;
628
629 if (a->id < b->id)
630 return -1;
631 else if (a->id > b->id)
632 return 1;
633
634 return 0;
635 }
636
637
638 /**
639 * @returns the string representation of a PID in a thread-safe
640 * static buffer.
641 */
642 static RD_UNUSED const char *
rd_kafka_pid2str(const rd_kafka_pid_t pid)643 rd_kafka_pid2str (const rd_kafka_pid_t pid) {
644 static RD_TLS char buf[2][64];
645 static RD_TLS int i;
646
647 if (!rd_kafka_pid_valid(pid))
648 return "PID{Invalid}";
649
650 i = (i + 1) % 2;
651
652 rd_snprintf(buf[i], sizeof(buf[i]),
653 "PID{Id:%" PRId64",Epoch:%hd}", pid.id, pid.epoch);
654
655 return buf[i];
656 }
657
658 /**
659 * @brief Reset the PID to invalid/init state
660 */
rd_kafka_pid_reset(rd_kafka_pid_t * pid)661 static RD_UNUSED RD_INLINE void rd_kafka_pid_reset (rd_kafka_pid_t *pid) {
662 pid->id = -1;
663 pid->epoch = -1;
664 }
665
666
667 /**
668 * @brief Bump the epoch of a valid PID
669 */
670 static RD_UNUSED RD_INLINE rd_kafka_pid_t
rd_kafka_pid_bump(const rd_kafka_pid_t old)671 rd_kafka_pid_bump (const rd_kafka_pid_t old) {
672 rd_kafka_pid_t new_pid = {
673 old.id,
674 (int16_t)(((int)old.epoch + 1) & (int)INT16_MAX) };
675 return new_pid;
676 }
677
678 /**@}*/
679
680
681 #endif /* _RDKAFKA_PROTO_H_ */
682