1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28 #ifndef _RDKAFKA_BUF_H_
29 #define _RDKAFKA_BUF_H_
30
31 #include "rdkafka_int.h"
32 #include "rdcrc32.h"
33 #include "rdlist.h"
34 #include "rdbuf.h"
35 #include "rdkafka_msgbatch.h"
36
37 typedef struct rd_kafka_broker_s rd_kafka_broker_t;
38
39 #define RD_KAFKA_HEADERS_IOV_CNT 2
40
41
/**
 * Temporary buffer with memory aligned writes to accommodate
 * effective and platform safe struct writes.
 */
typedef struct rd_tmpabuf_s {
	size_t size;           /**< Total allocated size of \c buf */
	size_t of;             /**< Current write offset into \c buf,
	                        *   always rounded up to an 8-byte boundary */
	char  *buf;            /**< Heap-allocated backing buffer */
	int    failed;         /**< Non-zero if a previous alloc/write failed */
	int    assert_on_fail; /**< If non-zero: assert() on allocation
	                        *   failure instead of returning NULL */
} rd_tmpabuf_t;
53
54 /**
55 * @brief Allocate new tmpabuf with \p size bytes pre-allocated.
56 */
57 static RD_UNUSED void
rd_tmpabuf_new(rd_tmpabuf_t * tab,size_t size,int assert_on_fail)58 rd_tmpabuf_new (rd_tmpabuf_t *tab, size_t size, int assert_on_fail) {
59 tab->buf = rd_malloc(size);
60 tab->size = size;
61 tab->of = 0;
62 tab->failed = 0;
63 tab->assert_on_fail = assert_on_fail;
64 }
65
66 /**
67 * @brief Free memory allocated by tmpabuf
68 */
69 static RD_UNUSED void
rd_tmpabuf_destroy(rd_tmpabuf_t * tab)70 rd_tmpabuf_destroy (rd_tmpabuf_t *tab) {
71 rd_free(tab->buf);
72 }
73
74 /**
75 * @returns 1 if a previous operation failed.
76 */
77 static RD_UNUSED RD_INLINE int
rd_tmpabuf_failed(rd_tmpabuf_t * tab)78 rd_tmpabuf_failed (rd_tmpabuf_t *tab) {
79 return tab->failed;
80 }
81
82 /**
83 * @brief Allocate \p size bytes for writing, returning an aligned pointer
84 * to the memory.
85 * @returns the allocated pointer (within the tmpabuf) on success or
86 * NULL if the requested number of bytes + alignment is not available
87 * in the tmpabuf.
88 */
89 static RD_UNUSED void *
rd_tmpabuf_alloc0(const char * func,int line,rd_tmpabuf_t * tab,size_t size)90 rd_tmpabuf_alloc0 (const char *func, int line, rd_tmpabuf_t *tab, size_t size) {
91 void *ptr;
92
93 if (unlikely(tab->failed))
94 return NULL;
95
96 if (unlikely(tab->of + size > tab->size)) {
97 if (tab->assert_on_fail) {
98 fprintf(stderr,
99 "%s: %s:%d: requested size %"PRIusz" + %"PRIusz" > %"PRIusz"\n",
100 __FUNCTION__, func, line, tab->of, size,
101 tab->size);
102 assert(!*"rd_tmpabuf_alloc: not enough size in buffer");
103 }
104 return NULL;
105 }
106
107 ptr = (void *)(tab->buf + tab->of);
108 tab->of += RD_ROUNDUP(size, 8);
109
110 return ptr;
111 }
112
113 #define rd_tmpabuf_alloc(tab,size) \
114 rd_tmpabuf_alloc0(__FUNCTION__,__LINE__,tab,size)
115
116 /**
117 * @brief Write \p buf of \p size bytes to tmpabuf memory in an aligned fashion.
118 *
119 * @returns the allocated and written-to pointer (within the tmpabuf) on success
120 * or NULL if the requested number of bytes + alignment is not available
121 * in the tmpabuf.
122 */
123 static RD_UNUSED void *
rd_tmpabuf_write0(const char * func,int line,rd_tmpabuf_t * tab,const void * buf,size_t size)124 rd_tmpabuf_write0 (const char *func, int line,
125 rd_tmpabuf_t *tab, const void *buf, size_t size) {
126 void *ptr = rd_tmpabuf_alloc0(func, line, tab, size);
127
128 if (likely(ptr && size))
129 memcpy(ptr, buf, size);
130
131 return ptr;
132 }
133 #define rd_tmpabuf_write(tab,buf,size) \
134 rd_tmpabuf_write0(__FUNCTION__, __LINE__, tab, buf, size)
135
136
137 /**
138 * @brief Wrapper for rd_tmpabuf_write() that takes a nul-terminated string.
139 */
140 static RD_UNUSED char *
rd_tmpabuf_write_str0(const char * func,int line,rd_tmpabuf_t * tab,const char * str)141 rd_tmpabuf_write_str0 (const char *func, int line,
142 rd_tmpabuf_t *tab, const char *str) {
143 return rd_tmpabuf_write0(func, line, tab, str, strlen(str)+1);
144 }
145 #define rd_tmpabuf_write_str(tab,str) \
146 rd_tmpabuf_write_str0(__FUNCTION__, __LINE__, tab, str)
147
148
149
150
151
/**
 * Response handling callback.
 *
 * NOTE: Callbacks must check for 'err == RD_KAFKA_RESP_ERR__DESTROY'
 *       which indicates that some entity is terminating (rd_kafka_t, broker,
 *       toppar, queue, etc) and the callback may not be called in the
 *       correct thread. In this case the callback must perform just
 *       the most minimal cleanup and don't trigger any other operations.
 *
 * NOTE: rkb, reply and request may be NULL, depending on error situation.
 */
typedef void (rd_kafka_resp_cb_t) (rd_kafka_t *rk,
                                   rd_kafka_broker_t *rkb,
                                   rd_kafka_resp_err_t err,
                                   rd_kafka_buf_t *reply,
                                   rd_kafka_buf_t *request,
                                   void *opaque);
169
170
/**
 * @brief Sender callback. This callback is used to construct and send (enq)
 *        a rkbuf on a particular broker.
 */
typedef rd_kafka_resp_err_t (rd_kafka_send_req_cb_t) (
        rd_kafka_broker_t *rkb,
        rd_kafka_op_t *rko,
        rd_kafka_replyq_t replyq,
        rd_kafka_resp_cb_t *resp_cb,
        void *reply_opaque);
181
182
/**
 * @brief Request maker. A callback that constructs the actual contents
 *        of a request.
 *
 * When constructing a request the ApiVersion typically needs to be selected
 * which requires the broker's supported ApiVersions to be known, which in
 * turn requires the broker connection to be UP.
 *
 * As a buffer constructor you have two choices:
 *   a. acquire the broker handle, wait for it to come up, and then construct
 *      the request buffer, or
 *   b. acquire the broker handle, enqueue an uncrafted/unmade
 *      request on the broker request queue, and when the broker is up
 *      the make_req_cb will be called for you to construct the request.
 *
 * From a code complexity standpoint, the latter option is usually the least
 * complex and avoids the caller having to care about any of the broker state.
 * Any information that is required to construct the request is passed through
 * the make_opaque, which can be automatically freed by the buffer code
 * when it has been used, or handled by the caller (in which case it must
 * outlive the lifetime of the buffer).
 *
 * Usage:
 *
 *  1. Construct an rkbuf with the appropriate ApiKey.
 *  2. Make a copy or reference of any data that is needed to construct the
 *     request, e.g., through rd_kafka_topic_partition_list_copy(). This
 *     data is passed by the make_opaque.
 *  3. Set the make callback by calling rd_kafka_buf_set_maker() and pass
 *     the make_opaque data and a free function, if needed.
 *  4. The callback will eventually be called from the broker thread.
 *  5. In the make callback construct the request on the passed rkbuf.
 *  6. The request is sent to the broker and the make_opaque is freed.
 *
 * See rd_kafka_ListOffsetsRequest() in rdkafka_request.c for an example.
 *
 */
typedef rd_kafka_resp_err_t (rd_kafka_make_req_cb_t) (
        rd_kafka_broker_t *rkb,
        rd_kafka_buf_t *rkbuf,
        void *make_opaque);
224
/**
 * @struct Request and response buffer
 *
 * Holds everything related to one protocol request/response round-trip:
 * the wire byte buffer, request/response headers, reply queue, callbacks,
 * timeouts and retry state.
 */
struct rd_kafka_buf_s { /* rd_kafka_buf_t */
	TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link;

	int32_t rkbuf_corrid;          /* Correlation id matching request
	                                * to response. */

	rd_ts_t rkbuf_ts_retry;        /* Absolute send retry time */

	int     rkbuf_flags;           /* RD_KAFKA_OP_F */

	/** What convenience flags to copy from request to response along
	 *  with the reqhdr. */
#define RD_KAFKA_BUF_FLAGS_RESP_COPY_MASK (RD_KAFKA_OP_F_FLEXVER)

	rd_kafka_prio_t rkbuf_prio;    /**< Request priority */

	rd_buf_t   rkbuf_buf;          /**< Send/Recv byte buffer */
	rd_slice_t rkbuf_reader;       /**< Buffer slice reader for rkbuf_buf */

	int     rkbuf_connid;          /* broker connection id (used when buffer
	                                * was partially sent). */
	size_t  rkbuf_totlen;          /* recv: total expected length,
	                                * send: not used */

	rd_crc32_t rkbuf_crc;          /* Current CRC calculation */

	struct rd_kafkap_reqhdr rkbuf_reqhdr;   /* Request header.
	                                         * These fields are encoded
	                                         * and written to output buffer
	                                         * on buffer finalization.
	                                         * Note:
	                                         * The request's
	                                         * reqhdr is copied to the
	                                         * response's reqhdr as a
	                                         * convenience. */
	struct rd_kafkap_reshdr rkbuf_reshdr;   /* Response header.
	                                         * Decoded fields are copied
	                                         * here from the buffer
	                                         * to provide an ease-of-use
	                                         * interface to the header */

	int32_t rkbuf_expected_size;   /* expected size of message */

	rd_kafka_replyq_t rkbuf_replyq;       /* Enqueue response on replyq */
	rd_kafka_replyq_t rkbuf_orig_replyq;  /* Original replyq to be used
	                                       * for retries from inside
	                                       * the rkbuf_cb() callback
	                                       * since rkbuf_replyq will
	                                       * have been reset. */
	rd_kafka_resp_cb_t *rkbuf_cb;         /* Response callback */
	struct rd_kafka_buf_s *rkbuf_response; /* Response buffer */

	rd_kafka_make_req_cb_t *rkbuf_make_req_cb; /**< Callback to construct
	                                            *   the request itself.
	                                            *   Will be used if
	                                            *   RD_KAFKA_OP_F_NEED_MAKE
	                                            *   is set. */
	void *rkbuf_make_opaque;  /**< Opaque passed to rkbuf_make_req_cb.
	                           *   Will be freed automatically after use
	                           *   by the rkbuf code. */
	void (*rkbuf_free_make_opaque_cb) (void *); /**< Free function for
	                                             *   rkbuf_make_opaque. */

	struct rd_kafka_broker_s *rkbuf_rkb;  /* Broker this buffer is
	                                       * enqueued on (if any). */

	rd_refcnt_t rkbuf_refcnt;      /* Reference count, see
	                                * rd_kafka_buf_keep()/_destroy(). */
	void   *rkbuf_opaque;          /* Application/caller opaque. */

	int     rkbuf_max_retries;     /**< Maximum retries to attempt. */
	int     rkbuf_retries;         /**< Retries so far. */


	int     rkbuf_features;        /* Required feature(s) that must be
	                                * supported by broker. */

	rd_ts_t rkbuf_ts_enq;          /* Time buffer was enqueued for send. */
	rd_ts_t rkbuf_ts_sent;         /* Initially: Absolute time of transmission,
	                                * after response: RTT. */

	/* Request timeouts:
	 *  rkbuf_ts_timeout is the effective absolute request timeout used
	 *   by the timeout scanner to see if a request has timed out.
	 *   It is set when a request is enqueued on the broker transmit
	 *   queue based on the relative or absolute timeout:
	 *
	 *  rkbuf_rel_timeout is the per-request-transmit relative timeout,
	 *   this value is reused for each sub-sequent retry of a request.
	 *
	 *  rkbuf_abs_timeout is the absolute request timeout, spanning
	 *   all retries.
	 *   This value is effectively limited by socket.timeout.ms for
	 *   each transmission, but the absolute timeout for a request's
	 *   lifetime is the absolute value.
	 *
	 *  Use rd_kafka_buf_set_timeout() to set a relative timeout
	 *  that will be reused on retry,
	 *  or rd_kafka_buf_set_abs_timeout() to set a fixed absolute timeout
	 *  for the case where the caller knows the request will be
	 *  semantically outdated when that absolute time expires, such as for
	 *  session.timeout.ms-based requests.
	 *
	 * The decision to retry a request is delegated to the rkbuf_cb
	 * response callback, which should use rd_kafka_err_action()
	 * and check the return actions for RD_KAFKA_ERR_ACTION_RETRY to be set
	 * and then call rd_kafka_buf_retry().
	 * rd_kafka_buf_retry() will enqueue the request on the rkb_retrybufs
	 * queue with a backoff time of retry.backoff.ms.
	 * The rkb_retrybufs queue is served by the broker thread's timeout
	 * scanner.
	 * @warning rkb_retrybufs is NOT purged on broker down.
	 */
	rd_ts_t rkbuf_ts_timeout;  /* Request timeout (absolute time). */
	rd_ts_t rkbuf_abs_timeout; /* Absolute timeout for request, including
	                            * retries.
	                            * Mutually exclusive with rkbuf_rel_timeout*/
	int     rkbuf_rel_timeout; /* Relative timeout (ms), used for retries.
	                            * Defaults to socket.timeout.ms.
	                            * Mutually exclusive with rkbuf_abs_timeout*/
	rd_bool_t rkbuf_force_timeout; /**< Force request timeout to be
	                                *   remaining abs_timeout regardless
	                                *   of socket.timeout.ms. */


	int64_t rkbuf_offset;      /* Used by OffsetCommit */

	rd_list_t *rkbuf_rktp_vers; /* Toppar + Op Version map.
	                             * Used by FetchRequest. */

	rd_kafka_resp_err_t rkbuf_err; /* Buffer parsing error code */

	/* Per-request-type state, keyed by ApiKey. */
	union {
		struct {
			rd_list_t *topics;   /* Requested topics (char *) */
			char *reason;        /* Textual reason */
			rd_kafka_op_t *rko;  /* Originating rko with replyq
			                      * (if any) */
			rd_bool_t all_topics; /**< Full/All topics requested */
			rd_bool_t cgrp_update; /**< Update cgrp with topic
			                        *   status from response. */

			int *decr;           /* Decrement this integer by one
			                      * when request is complete:
			                      * typically points to metadata
			                      * cache's full_.._sent.
			                      * Will be performed with
			                      * decr_lock held. */
			mtx_t *decr_lock;

		} Metadata;
		struct {
			rd_kafka_msgbatch_t batch; /**< MessageSet/batch */
		} Produce;
		struct {
			rd_bool_t commit;    /**< true = txn commit,
			                      *   false = txn abort */
		} EndTxn;
	} rkbuf_u;

#define rkbuf_batch rkbuf_u.Produce.batch

	const char *rkbuf_uflow_mitigation; /**< Buffer read underflow
	                                     *   human readable mitigation
	                                     *   string (const memory).
	                                     *   This is used to hint the
	                                     *   user why the underflow
	                                     *   might have occurred, which
	                                     *   depends on request type. */
};
396
397
398
399
400 /**
401 * @name Read buffer interface
402 *
403 * Memory reading helper macros to be used when parsing network responses.
404 *
405 * Assumptions:
406 * - an 'err_parse:' goto-label must be available for error bailouts,
407 * the error code will be set in rkbuf->rkbuf_err
408 * - local `int log_decode_errors` variable set to the logging level
409 * to log parse errors (or 0 to turn off logging).
410 */
411
/**
 * @brief Fail buffer parsing: optionally log the parse error with buffer
 *        position context, set rkbuf_err to RD_KAFKA_RESP_ERR__BAD_MSG
 *        and jump to the local `err_parse:` label.
 */
#define rd_kafka_buf_parse_fail(rkbuf,...) do {                         \
                if (log_decode_errors > 0) {                            \
                        rd_kafka_assert(NULL, rkbuf->rkbuf_rkb);        \
                        rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
                                   "PROTOERR",                          \
                                   "Protocol parse failure for %s v%hd%s " \
                                   "at %"PRIusz"/%"PRIusz" (%s:%i) "    \
                                   "(incorrect broker.version.fallback?)", \
                                   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr. \
                                                       ApiKey),         \
                                   rkbuf->rkbuf_reqhdr.ApiVersion,      \
                                   (rkbuf->rkbuf_flags&RD_KAFKA_OP_F_FLEXVER? \
                                    "(flex)":""),                       \
                                   rd_slice_offset(&rkbuf->rkbuf_reader), \
                                   rd_slice_size(&rkbuf->rkbuf_reader), \
                                   __FUNCTION__, __LINE__);             \
                        rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
                                   "PROTOERR", __VA_ARGS__);            \
                }                                                       \
                (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG;        \
                goto err_parse;                                         \
        } while (0)
434
/**
 * @name Fail buffer reading due to buffer underflow:
 *       optionally log the underflow (with the optional printf-style
 *       reason appended), set rkbuf_err to RD_KAFKA_RESP_ERR__UNDERFLOW
 *       and jump to the local `err_parse:` label.
 */
#define rd_kafka_buf_underflow_fail(rkbuf,wantedlen,...) do {           \
                if (log_decode_errors > 0) {                            \
                        rd_kafka_assert(NULL, rkbuf->rkbuf_rkb);        \
                        char __tmpstr[256];                             \
                        rd_snprintf(__tmpstr, sizeof(__tmpstr),         \
                                    ": " __VA_ARGS__);                  \
                        /* Empty reason: drop the ": " prefix */        \
                        if (strlen(__tmpstr) == 2) __tmpstr[0] = '\0';  \
                        rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
                                   "PROTOUFLOW",                        \
                                   "Protocol read buffer underflow "    \
                                   "for %s v%hd "                       \
                                   "at %"PRIusz"/%"PRIusz" (%s:%i): "   \
                                   "expected %"PRIusz" bytes > "        \
                                   "%"PRIusz" remaining bytes (%s)%s",  \
                                   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr. \
                                                       ApiKey),         \
                                   rkbuf->rkbuf_reqhdr.ApiVersion,      \
                                   rd_slice_offset(&rkbuf->rkbuf_reader), \
                                   rd_slice_size(&rkbuf->rkbuf_reader), \
                                   __FUNCTION__, __LINE__,              \
                                   wantedlen,                           \
                                   rd_slice_remains(&rkbuf->rkbuf_reader), \
                                   rkbuf->rkbuf_uflow_mitigation ?      \
                                   rkbuf->rkbuf_uflow_mitigation :      \
                                   "incorrect broker.version.fallback?", \
                                   __tmpstr);                           \
                }                                                       \
                (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__UNDERFLOW;      \
                goto err_parse;                                         \
        } while (0)
468
469
/**
 * Returns the number of remaining bytes available to read.
 */
#define rd_kafka_buf_read_remain(rkbuf) \
        rd_slice_remains(&(rkbuf)->rkbuf_reader)

/**
 * Checks that at least 'len' bytes remain to be read in buffer, else fails.
 */
#define rd_kafka_buf_check_len(rkbuf,len) do {                          \
                size_t __len0 = (size_t)(len);                          \
                if (unlikely(__len0 > rd_kafka_buf_read_remain(rkbuf))) { \
                        rd_kafka_buf_underflow_fail(rkbuf, __len0);     \
                }                                                       \
        } while (0)

/**
 * Skip (as in read and ignore) the next 'len' bytes.
 * On a short read, rd_kafka_buf_check_len() reports the underflow and
 * jumps to err_parse.
 */
#define rd_kafka_buf_skip(rkbuf, len) do {                              \
                size_t __len1 = (size_t)(len);                          \
                if (__len1 &&                                           \
                    !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
                        rd_kafka_buf_check_len(rkbuf, __len1);          \
        } while (0)

/**
 * Skip (as in read and ignore) up to fixed position \p pos.
 * NOTE(review): assumes \p pos >= the current reader offset; a smaller
 * \p pos would wrap __len1 — callers must not seek backwards.
 */
#define rd_kafka_buf_skip_to(rkbuf, pos) do {                           \
                size_t __len1 = (size_t)(pos) -                         \
                        rd_slice_offset(&(rkbuf)->rkbuf_reader);        \
                if (__len1 &&                                           \
                    !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
                        rd_kafka_buf_check_len(rkbuf, __len1);          \
        } while (0)



/**
 * Read 'len' bytes and copy to 'dstptr'
 */
#define rd_kafka_buf_read(rkbuf,dstptr,len) do {                        \
                size_t __len2 = (size_t)(len);                          \
                if (!rd_slice_read(&(rkbuf)->rkbuf_reader, dstptr, __len2)) \
                        rd_kafka_buf_check_len(rkbuf, __len2);          \
        } while (0)


/**
 * @brief Read \p len bytes at slice offset \p offset and copy to \p dstptr
 *        without affecting the current reader position.
 */
#define rd_kafka_buf_peek(rkbuf,offset,dstptr,len) do {                 \
                size_t __len2 = (size_t)(len);                          \
                if (!rd_slice_peek(&(rkbuf)->rkbuf_reader, offset,      \
                                   dstptr, __len2))                     \
                        rd_kafka_buf_check_len(rkbuf, (offset)+(__len2)); \
        } while (0)
529
530
/**
 * Read a 16,32,64-bit big-endian (network order) integer from the buffer,
 * convert to host order and store it in 'dstptr'.
 */
#define rd_kafka_buf_read_i64(rkbuf,dstptr) do {                        \
                int64_t _v;                                             \
                rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
                *(dstptr) = be64toh(_v);                                \
        } while (0)

#define rd_kafka_buf_peek_i64(rkbuf,of,dstptr) do {                     \
                int64_t _v;                                             \
                rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v));          \
                *(dstptr) = be64toh(_v);                                \
        } while (0)

#define rd_kafka_buf_read_i32(rkbuf,dstptr) do {                        \
                int32_t _v;                                             \
                rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
                *(dstptr) = be32toh(_v);                                \
        } while (0)

#define rd_kafka_buf_peek_i32(rkbuf,of,dstptr) do {                     \
                int32_t _v;                                             \
                rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v));          \
                *(dstptr) = be32toh(_v);                                \
        } while (0)


/* Same as .._read_i32 but does a direct assignment.
 * dst is assumed to be a scalar, not pointer. */
#define rd_kafka_buf_read_i32a(rkbuf, dst) do {                         \
                int32_t _v;                                             \
                rd_kafka_buf_read(rkbuf, &_v, 4);                       \
                dst = (int32_t) be32toh(_v);                            \
        } while (0)

#define rd_kafka_buf_read_i16(rkbuf,dstptr) do {                        \
                int16_t _v;                                             \
                rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
                *(dstptr) = (int16_t)be16toh(_v);                       \
        } while (0)


#define rd_kafka_buf_read_i16a(rkbuf, dst) do {                         \
                int16_t _v;                                             \
                rd_kafka_buf_read(rkbuf, &_v, 2);                       \
                dst = (int16_t)be16toh(_v);                             \
        } while (0)

/* Single-byte reads: no endian conversion needed. */
#define rd_kafka_buf_read_i8(rkbuf, dst) rd_kafka_buf_read(rkbuf, dst, 1)

#define rd_kafka_buf_peek_i8(rkbuf,of,dst) rd_kafka_buf_peek(rkbuf,of,dst,1)

/* Read a protocol BOOLEAN (one byte) into an rd_bool_t. */
#define rd_kafka_buf_read_bool(rkbuf, dstptr) do {                      \
                int8_t _v;                                              \
                rd_bool_t *_dst = dstptr;                               \
                rd_kafka_buf_read(rkbuf, &_v, 1);                       \
                *_dst = (rd_bool_t)_v;                                  \
        } while (0)
590
591
/**
 * @brief Read varint (zig-zag encoded) and store in int64_t \p dst
 */
#define rd_kafka_buf_read_varint(rkbuf,dst) do {                        \
                int64_t _v;                                             \
                size_t _r = rd_slice_read_varint(&(rkbuf)->rkbuf_reader, &_v);\
                if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                 \
                        rd_kafka_buf_underflow_fail(rkbuf, (size_t)0,   \
                                                    "varint parsing failed");\
                *(dst) = _v;                                            \
        } while (0)


/**
 * @brief Read unsigned varint and store in uint64_t \p dst
 */
#define rd_kafka_buf_read_uvarint(rkbuf,dst) do {                       \
                uint64_t _v;                                            \
                size_t _r = rd_slice_read_uvarint(&(rkbuf)->rkbuf_reader, \
                                                  &_v);                 \
                if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                 \
                        rd_kafka_buf_underflow_fail(rkbuf, (size_t)0,   \
                                                    "uvarint parsing failed"); \
                *(dst) = _v;                                            \
        } while (0)
617
618
/**
 * @brief Read Kafka COMPACT_STRING (VARINT+N) or
 *        standard String representation (2+N).
 *
 * In flexible-version (FLEXVER) buffers the length is a uvarint of N+1
 * (0 means null), otherwise a signed int16 (-1 means null).
 *
 * The kstr data will be updated to point to the rkbuf. */
#define rd_kafka_buf_read_str(rkbuf, kstr) do {                         \
                int _klen;                                              \
                if ((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER) {     \
                        uint64_t _uva;                                  \
                        rd_kafka_buf_read_uvarint(rkbuf, &_uva);        \
                        (kstr)->len = ((int32_t)_uva) - 1;              \
                        _klen = (kstr)->len;                            \
                } else {                                                \
                        rd_kafka_buf_read_i16a(rkbuf, (kstr)->len);     \
                        _klen = RD_KAFKAP_STR_LEN(kstr);                \
                }                                                       \
                if (RD_KAFKAP_STR_IS_NULL(kstr))                        \
                        (kstr)->str = NULL;                             \
                else if (RD_KAFKAP_STR_LEN(kstr) == 0)                  \
                        (kstr)->str = "";                               \
                else if (!((kstr)->str =                                \
                           rd_slice_ensure_contig(&rkbuf->rkbuf_reader, \
                                                  _klen)))              \
                        rd_kafka_buf_check_len(rkbuf, _klen);           \
        } while (0)

/* Read Kafka String representation (2+N) and write it to the \p tmpabuf
 * with a trailing nul byte. */
#define rd_kafka_buf_read_str_tmpabuf(rkbuf, tmpabuf, dst) do {         \
                rd_kafkap_str_t _kstr;                                  \
                size_t _slen;                                           \
                char *_dst;                                             \
                rd_kafka_buf_read_str(rkbuf, &_kstr);                   \
                _slen = RD_KAFKAP_STR_LEN(&_kstr);                      \
                if (!(_dst =                                            \
                      rd_tmpabuf_write(tmpabuf, _kstr.str, _slen+1)))   \
                        rd_kafka_buf_parse_fail(                        \
                                rkbuf,                                  \
                                "Not enough room in tmpabuf: "          \
                                "%"PRIusz"+%"PRIusz                     \
                                " > %"PRIusz,                           \
                                (tmpabuf)->of, _slen+1, (tmpabuf)->size); \
                _dst[_slen] = '\0';                                     \
                dst = (void *)_dst;                                     \
        } while (0)
664
/**
 * Skip a string: read the int16 length prefix and skip that many bytes.
 */
#define rd_kafka_buf_skip_str(rkbuf) do {                               \
                int16_t _slen;                                          \
                rd_kafka_buf_read_i16(rkbuf, &_slen);                   \
                rd_kafka_buf_skip(rkbuf, RD_KAFKAP_STR_LEN0(_slen));    \
        } while (0)

/* Read Kafka Bytes representation (4+N).
 *  The 'kbytes' will be updated to point to rkbuf data */
#define rd_kafka_buf_read_bytes(rkbuf, kbytes) do {                     \
                int _klen;                                              \
                rd_kafka_buf_read_i32a(rkbuf, _klen);                   \
                (kbytes)->len = _klen;                                  \
                if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) {                  \
                        (kbytes)->data = NULL;                          \
                        (kbytes)->len = 0;                              \
                } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0)            \
                        (kbytes)->data = "";                            \
                else if (!((kbytes)->data =                             \
                           rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
                                                  _klen)))              \
                        rd_kafka_buf_check_len(rkbuf, _klen);           \
        } while (0)


/**
 * @brief Read \p size bytes from buffer, setting \p *ptr to the start
 *        of the memory region.
 */
#define rd_kafka_buf_read_ptr(rkbuf,ptr,size) do {                      \
                size_t _klen = size;                                    \
                if (!(*(ptr) = (void *)                                 \
                      rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, _klen))) \
                        rd_kafka_buf_check_len(rkbuf, _klen);           \
        } while (0)


/**
 * @brief Read varint-length Kafka Bytes representation
 */
#define rd_kafka_buf_read_bytes_varint(rkbuf,kbytes) do {               \
                int64_t _len2;                                          \
                size_t _r = rd_slice_read_varint(&(rkbuf)->rkbuf_reader, \
                                                 &_len2);               \
                if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                 \
                        rd_kafka_buf_underflow_fail(rkbuf, (size_t)0,   \
                                                    "varint parsing failed"); \
                (kbytes)->len = (int32_t)_len2;                         \
                if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) {                  \
                        (kbytes)->data = NULL;                          \
                        (kbytes)->len = 0;                              \
                } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0)            \
                        (kbytes)->data = "";                            \
                else if (!((kbytes)->data =                             \
                           rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
                                                  (size_t)_len2)))      \
                        rd_kafka_buf_check_len(rkbuf, _len2);           \
        } while (0)
725
726
/**
 * @brief Read throttle_time_ms (i32) from response and pass the value
 *        to the throttle handling code.
 */
#define rd_kafka_buf_read_throttle_time(rkbuf) do {                     \
                int32_t _throttle_time_ms;                              \
                rd_kafka_buf_read_i32(rkbuf, &_throttle_time_ms);       \
                rd_kafka_op_throttle_time((rkbuf)->rkbuf_rkb,           \
                                          (rkbuf)->rkbuf_rkb->rkb_rk->rk_rep, \
                                          _throttle_time_ms);           \
        } while (0)
738
739
/**
 * @brief Discard all KIP-482 Tags at the current position in the buffer.
 *
 * No-op for non-flexible-version buffers.
 */
#define rd_kafka_buf_skip_tags(rkbuf) do {                              \
                uint64_t _tagcnt;                                       \
                if (!((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER))    \
                        break;                                          \
                rd_kafka_buf_read_uvarint(rkbuf, &_tagcnt);             \
                while (_tagcnt-- > 0) {                                 \
                        uint64_t _tagtype, _taglen;                     \
                        rd_kafka_buf_read_uvarint(rkbuf, &_tagtype);    \
                        rd_kafka_buf_read_uvarint(rkbuf, &_taglen);     \
                        /* FIXED: per KIP-482 the tag length is the     \
                         * exact size of the tag value (a plain        \
                         * uvarint, NOT the N+1 "compact" encoding),    \
                         * so skip all _taglen bytes. The previous      \
                         * code skipped _taglen-1 bytes, leaving the    \
                         * reader misaligned whenever a broker sent a   \
                         * tagged field. */                             \
                        if (_taglen > 0)                                \
                                rd_kafka_buf_skip(rkbuf,                \
                                                  (size_t)_taglen);     \
                }                                                       \
        } while (0)
756
/**
 * @brief Write tags at the current position in the buffer.
 * @remark Currently always writes empty tags
 *         (a single 0 byte: uvarint(0) == tag count 0, hence write_i8).
 * @remark Change to ..write_uvarint() when actual tags are supported.
 */
#define rd_kafka_buf_write_tags(rkbuf) do {                             \
                if (!((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER))    \
                        break;                                          \
                rd_kafka_buf_write_i8(rkbuf, 0);                        \
        } while (0)
767
768
/**
 * @brief Reads an ARRAY or COMPACT_ARRAY count depending on buffer type.
 *
 * Pass \p maxval == -1 to disable the upper-bound check.
 *
 * NOTE(review): a null array (count -1 on the wire) is rejected by the
 * `*(arrcnt) < 0` check — confirm no caller needs to distinguish
 * null from empty arrays.
 */
#define rd_kafka_buf_read_arraycnt(rkbuf,arrcnt,maxval) do {            \
                if ((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER) {     \
                        uint64_t _uva;                                  \
                        rd_kafka_buf_read_uvarint(rkbuf, &_uva);        \
                        *(arrcnt) = (int32_t)_uva - 1;                  \
                } else {                                                \
                        rd_kafka_buf_read_i32(rkbuf, arrcnt);           \
                }                                                       \
                if (*(arrcnt) < 0 || ((maxval) != -1 && *(arrcnt) > (maxval))) \
                        rd_kafka_buf_parse_fail(rkbuf,                  \
                                                "ApiArrayCnt %"PRId32" out of range", \
                                                *(arrcnt));             \
        } while (0)
785
786
787
788
/**
 * @returns true if buffer has been sent on wire, else false.
 */
#define rd_kafka_buf_was_sent(rkbuf) \
        ((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_SENT)

/**
 * @struct Queue of rd_kafka_buf_t's with atomic buffer and message counters.
 */
typedef struct rd_kafka_bufq_s {
	TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs;
	rd_atomic32_t  rkbq_cnt;     /* Number of buffers in queue */
	rd_atomic32_t  rkbq_msg_cnt; /* Number of messages in queued buffers */
} rd_kafka_bufq_t;

/* Current buffer count of the queue. */
#define rd_kafka_bufq_cnt(rkbq) rd_atomic32_get(&(rkbq)->rkbq_cnt)
802
803 /**
804 * @brief Set buffer's request timeout to relative \p timeout_ms measured
805 * from the time the buffer is sent on the underlying socket.
806 *
807 * @param now Reuse current time from existing rd_clock() var, else 0.
808 *
809 * The relative timeout value is reused upon request retry.
810 */
811 static RD_INLINE void
rd_kafka_buf_set_timeout(rd_kafka_buf_t * rkbuf,int timeout_ms,rd_ts_t now)812 rd_kafka_buf_set_timeout (rd_kafka_buf_t *rkbuf, int timeout_ms, rd_ts_t now) {
813 if (!now)
814 now = rd_clock();
815 rkbuf->rkbuf_rel_timeout = timeout_ms;
816 rkbuf->rkbuf_abs_timeout = 0;
817 }
818
819
820 /**
821 * @brief Calculate the effective timeout for a request attempt
822 */
823 void rd_kafka_buf_calc_timeout (const rd_kafka_t *rk, rd_kafka_buf_t *rkbuf,
824 rd_ts_t now);
825
826
827 /**
828 * @brief Set buffer's request timeout to relative \p timeout_ms measured
829 * from \p now.
830 *
831 * @param now Reuse current time from existing rd_clock() var, else 0.
832 * @param force If true: force request timeout to be same as remaining
833 * abs timeout, regardless of socket.timeout.ms.
834 * If false: cap each request timeout to socket.timeout.ms.
835 *
836 * The remaining time is used as timeout for request retries.
837 */
838 static RD_INLINE void
rd_kafka_buf_set_abs_timeout0(rd_kafka_buf_t * rkbuf,int timeout_ms,rd_ts_t now,rd_bool_t force)839 rd_kafka_buf_set_abs_timeout0 (rd_kafka_buf_t *rkbuf, int timeout_ms,
840 rd_ts_t now, rd_bool_t force) {
841 if (!now)
842 now = rd_clock();
843 rkbuf->rkbuf_rel_timeout = 0;
844 rkbuf->rkbuf_abs_timeout = now + ((rd_ts_t)timeout_ms * 1000);
845 rkbuf->rkbuf_force_timeout = force;
846 }
847
/* Absolute timeout capped per-transmit by socket.timeout.ms. */
#define rd_kafka_buf_set_abs_timeout(rkbuf,timeout_ms,now)              \
        rd_kafka_buf_set_abs_timeout0(rkbuf,timeout_ms,now,rd_false)

/* Absolute timeout that overrides socket.timeout.ms per transmit. */
#define rd_kafka_buf_set_abs_timeout_force(rkbuf,timeout_ms,now)        \
        rd_kafka_buf_set_abs_timeout0(rkbuf,timeout_ms,now,rd_true)
854
855
/* Increase buffer reference count. */
#define rd_kafka_buf_keep(rkbuf) rd_refcnt_add(&(rkbuf)->rkbuf_refcnt)
/* Decrease reference count; frees the buffer when it reaches zero. */
#define rd_kafka_buf_destroy(rkbuf)                                     \
        rd_refcnt_destroywrapper(&(rkbuf)->rkbuf_refcnt,                \
                                 rd_kafka_buf_destroy_final(rkbuf))
860
861 void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf);
862 void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
863 int allow_crc_calc, void (*free_cb) (void *));
864 #define rd_kafka_buf_push(rkbuf,buf,len,free_cb) \
865 rd_kafka_buf_push0(rkbuf,buf,len,1/*allow_crc*/,free_cb)
866 rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags);
867 #define rd_kafka_buf_new(segcnt,size) \
868 rd_kafka_buf_new0(segcnt,size,0)
869 rd_kafka_buf_t *rd_kafka_buf_new_request0 (rd_kafka_broker_t *rkb,
870 int16_t ApiKey,
871 int segcnt, size_t size,
872 rd_bool_t is_flexver);
873 #define rd_kafka_buf_new_request(rkb,ApiKey,segcnt,size) \
874 rd_kafka_buf_new_request0(rkb,ApiKey,segcnt,size,rd_false) \
875
876 #define rd_kafka_buf_new_flexver_request(rkb,ApiKey,segcnt,size,is_flexver) \
877 rd_kafka_buf_new_request0(rkb,ApiKey,segcnt,size,is_flexver) \
878
879 rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size,
880 void (*free_cb) (void *));
881 void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
882 void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
883 void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq);
884 void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src);
885 void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
886 rd_kafka_bufq_t *rkbufq,
887 rd_kafka_resp_err_t err);
888 void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb,
889 rd_kafka_bufq_t *rkbufq);
890 void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac,
891 rd_kafka_bufq_t *rkbq);
892
893 int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
894
895 void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err);
896 void rd_kafka_buf_callback (rd_kafka_t *rk,
897 rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
898 rd_kafka_buf_t *response, rd_kafka_buf_t *request);
899
900
901
902 /**
903 *
904 * Write buffer interface
905 *
906 */
907
908 /**
909 * Set request API type version
910 */
911 static RD_UNUSED RD_INLINE void
rd_kafka_buf_ApiVersion_set(rd_kafka_buf_t * rkbuf,int16_t version,int features)912 rd_kafka_buf_ApiVersion_set (rd_kafka_buf_t *rkbuf,
913 int16_t version, int features) {
914 rkbuf->rkbuf_reqhdr.ApiVersion = version;
915 rkbuf->rkbuf_features = features;
916 }
917
918
/**
 * @returns the ApiVersion for a request (as set by
 *          rd_kafka_buf_ApiVersion_set()).
 */
#define rd_kafka_buf_ApiVersion(rkbuf) ((rkbuf)->rkbuf_reqhdr.ApiVersion)
923
924
925
926 /**
927 * Write (copy) data to buffer at current write-buffer position.
928 * There must be enough space allocated in the rkbuf.
929 * Returns offset to written destination buffer.
930 */
rd_kafka_buf_write(rd_kafka_buf_t * rkbuf,const void * data,size_t len)931 static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
932 const void *data, size_t len) {
933 size_t r;
934
935 r = rd_buf_write(&rkbuf->rkbuf_buf, data, len);
936
937 if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC)
938 rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, data, len);
939
940 return r;
941 }
942
943
944
/**
 * Write (copy) 'data' to buffer at offset 'of'.
 * There must be enough space to fit 'len'.
 * This will overwrite the buffer at given location and length.
 *
 * NOTE: rd_kafka_buf_update() MUST NOT be called when a CRC calculation
 * is in progress (between rd_kafka_buf_crc_init() & .._crc_finalize())
 */
static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
                                           const void *data, size_t len) {
        /* Overwriting already-written bytes would corrupt an in-progress
         * CRC: the old bytes were already folded into the checksum. */
        rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
        rd_buf_write_update(&rkbuf->rkbuf_buf, of, data, len);
}
958
959 /**
960 * Write int8_t to buffer.
961 */
rd_kafka_buf_write_i8(rd_kafka_buf_t * rkbuf,int8_t v)962 static RD_INLINE size_t rd_kafka_buf_write_i8 (rd_kafka_buf_t *rkbuf,
963 int8_t v) {
964 return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
965 }
966
967 /**
968 * Update int8_t in buffer at offset 'of'.
969 * 'of' should have been previously returned by `.._buf_write_i8()`.
970 */
rd_kafka_buf_update_i8(rd_kafka_buf_t * rkbuf,size_t of,int8_t v)971 static RD_INLINE void rd_kafka_buf_update_i8 (rd_kafka_buf_t *rkbuf,
972 size_t of, int8_t v) {
973 rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
974 }
975
976 /**
977 * Write int16_t to buffer.
978 * The value will be endian-swapped before write.
979 */
rd_kafka_buf_write_i16(rd_kafka_buf_t * rkbuf,int16_t v)980 static RD_INLINE size_t rd_kafka_buf_write_i16 (rd_kafka_buf_t *rkbuf,
981 int16_t v) {
982 v = htobe16(v);
983 return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
984 }
985
986 /**
987 * Update int16_t in buffer at offset 'of'.
988 * 'of' should have been previously returned by `.._buf_write_i16()`.
989 */
rd_kafka_buf_update_i16(rd_kafka_buf_t * rkbuf,size_t of,int16_t v)990 static RD_INLINE void rd_kafka_buf_update_i16 (rd_kafka_buf_t *rkbuf,
991 size_t of, int16_t v) {
992 v = htobe16(v);
993 rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
994 }
995
996 /**
997 * Write int32_t to buffer.
998 * The value will be endian-swapped before write.
999 */
rd_kafka_buf_write_i32(rd_kafka_buf_t * rkbuf,int32_t v)1000 static RD_INLINE size_t rd_kafka_buf_write_i32 (rd_kafka_buf_t *rkbuf,
1001 int32_t v) {
1002 v = (int32_t)htobe32(v);
1003 return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
1004 }
1005
1006 /**
1007 * Update int32_t in buffer at offset 'of'.
1008 * 'of' should have been previously returned by `.._buf_write_i32()`.
1009 */
rd_kafka_buf_update_i32(rd_kafka_buf_t * rkbuf,size_t of,int32_t v)1010 static RD_INLINE void rd_kafka_buf_update_i32 (rd_kafka_buf_t *rkbuf,
1011 size_t of, int32_t v) {
1012 v = htobe32(v);
1013 rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
1014 }
1015
/**
 * Update uint32_t in buffer at offset 'of'.
 * 'of' should have been previously returned by `.._buf_write_i32()`.
 * The value is endian-swapped to big-endian before writing.
 */
static RD_INLINE void rd_kafka_buf_update_u32 (rd_kafka_buf_t *rkbuf,
                                               size_t of, uint32_t v) {
        v = htobe32(v);
        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}
1025
1026
/**
 * @brief Reserve space in the buffer for an array count field (i32)
 *        for later update with rd_kafka_buf_finalize_arraycnt().
 *
 * @returns the buffer offset of the reserved field.
 */
#define rd_kafka_buf_write_arraycnt_pos(rkbuf) rd_kafka_buf_write_i32(rkbuf, 0)
1032
1033
1034 /**
1035 * @brief Write the final array count to the position returned from
1036 * rd_kafka_buf_write_arraycnt_pos().
1037 *
1038 * Update int32_t in buffer at offset 'of' but serialize it as
1039 * compact uvarint (that must not exceed 4 bytes storage)
1040 * if the \p rkbuf is marked as FLEXVER, else just update it as
1041 * as a standard update_i32().
1042 *
1043 * @remark For flexibleVersions this will shrink the buffer and move data
1044 * and may thus be costly.
1045 */
rd_kafka_buf_finalize_arraycnt(rd_kafka_buf_t * rkbuf,size_t of,int cnt)1046 static RD_INLINE void rd_kafka_buf_finalize_arraycnt (rd_kafka_buf_t *rkbuf,
1047 size_t of, int cnt) {
1048 char buf[sizeof(int32_t)];
1049 size_t sz, r;
1050
1051 rd_assert(cnt >= 0);
1052
1053 if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER)) {
1054 rd_kafka_buf_update_i32(rkbuf, of, (int32_t)cnt);
1055 return;
1056 }
1057
1058 /* CompactArray has a base of 1, 0 is for Null arrays */
1059 cnt += 1;
1060
1061 sz = rd_uvarint_enc_u64(buf, sizeof(buf), (uint64_t)cnt);
1062 rd_assert(!RD_UVARINT_OVERFLOW(sz));
1063
1064 rd_buf_write_update(&rkbuf->rkbuf_buf, of, buf, sz);
1065
1066 if (sz < sizeof(int32_t)) {
1067 /* Varint occupies less space than the allotted 4 bytes, erase
1068 * the remaining bytes. */
1069 r = rd_buf_erase(&rkbuf->rkbuf_buf, of+sz, sizeof(int32_t)-sz);
1070 rd_assert(r == sizeof(int32_t) - sz);
1071 }
1072 }
1073
1074
1075 /**
1076 * Write int64_t to buffer.
1077 * The value will be endian-swapped before write.
1078 */
rd_kafka_buf_write_i64(rd_kafka_buf_t * rkbuf,int64_t v)1079 static RD_INLINE size_t rd_kafka_buf_write_i64 (rd_kafka_buf_t *rkbuf,
1080 int64_t v) {
1081 v = htobe64(v);
1082 return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
1083 }
1084
/**
 * Update int64_t in buffer at offset 'of'.
 * 'of' should have been previously returned by `.._buf_write_i64()`.
 * The value is endian-swapped to big-endian before writing.
 */
static RD_INLINE void rd_kafka_buf_update_i64 (rd_kafka_buf_t *rkbuf,
                                               size_t of, int64_t v) {
        v = htobe64(v);
        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}
1094
1095
1096 /**
1097 * @brief Write varint-encoded signed value to buffer.
1098 */
1099 static RD_INLINE size_t
rd_kafka_buf_write_varint(rd_kafka_buf_t * rkbuf,int64_t v)1100 rd_kafka_buf_write_varint (rd_kafka_buf_t *rkbuf, int64_t v) {
1101 char varint[RD_UVARINT_ENC_SIZEOF(v)];
1102 size_t sz;
1103
1104 sz = rd_uvarint_enc_i64(varint, sizeof(varint), v);
1105
1106 return rd_kafka_buf_write(rkbuf, varint, sz);
1107 }
1108
1109 /**
1110 * @brief Write varint-encoded unsigned value to buffer.
1111 */
1112 static RD_INLINE size_t
rd_kafka_buf_write_uvarint(rd_kafka_buf_t * rkbuf,uint64_t v)1113 rd_kafka_buf_write_uvarint (rd_kafka_buf_t *rkbuf, uint64_t v) {
1114 char varint[RD_UVARINT_ENC_SIZEOF(v)];
1115 size_t sz;
1116
1117 sz = rd_uvarint_enc_u64(varint, sizeof(varint), v);
1118
1119 return rd_kafka_buf_write(rkbuf, varint, sz);
1120 }
1121
1122
1123 /**
1124 * @brief Write standard (2-byte header) or KIP-482 COMPACT_STRING to buffer.
1125 *
1126 * @remark Copies the string.
1127 *
1128 * @returns the offset in \p rkbuf where the string was written.
1129 */
rd_kafka_buf_write_kstr(rd_kafka_buf_t * rkbuf,const rd_kafkap_str_t * kstr)1130 static RD_INLINE size_t rd_kafka_buf_write_kstr (rd_kafka_buf_t *rkbuf,
1131 const rd_kafkap_str_t *kstr) {
1132 size_t len, r;
1133
1134 if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER)) {
1135 /* Standard string */
1136 if (!kstr || RD_KAFKAP_STR_IS_NULL(kstr))
1137 return rd_kafka_buf_write_i16(rkbuf, -1);
1138
1139 if (RD_KAFKAP_STR_IS_SERIALIZED(kstr))
1140 return rd_kafka_buf_write(rkbuf,
1141 RD_KAFKAP_STR_SER(kstr),
1142 RD_KAFKAP_STR_SIZE(kstr));
1143
1144 len = RD_KAFKAP_STR_LEN(kstr);
1145 r = rd_kafka_buf_write_i16(rkbuf, (int16_t)len);
1146 rd_kafka_buf_write(rkbuf, kstr->str, len);
1147
1148 return r;
1149 }
1150
1151 /* COMPACT_STRING lengths are:
1152 * 0 = NULL,
1153 * 1 = empty
1154 * N.. = length + 1
1155 */
1156 if (!kstr || RD_KAFKAP_STR_IS_NULL(kstr))
1157 len = 0;
1158 else
1159 len = RD_KAFKAP_STR_LEN(kstr) + 1;
1160
1161 r = rd_kafka_buf_write_uvarint(rkbuf, (uint64_t)len);
1162 if (len > 1)
1163 rd_kafka_buf_write(rkbuf, kstr->str, len-1);
1164 return r;
1165 }
1166
1167
1168
1169 /**
1170 * @brief Write standard (2-byte header) or KIP-482 COMPACT_STRING to buffer.
1171 *
1172 * @remark Copies the string.
1173 */
1174 static RD_INLINE size_t
rd_kafka_buf_write_str(rd_kafka_buf_t * rkbuf,const char * str,size_t len)1175 rd_kafka_buf_write_str (rd_kafka_buf_t *rkbuf,
1176 const char *str, size_t len) {
1177 size_t r;
1178
1179 if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER)) {
1180 /* Standard string */
1181 if (!str)
1182 len = RD_KAFKAP_STR_LEN_NULL;
1183 else if (len == (size_t)-1)
1184 len = strlen(str);
1185 r = rd_kafka_buf_write_i16(rkbuf, (int16_t) len);
1186 if (str)
1187 rd_kafka_buf_write(rkbuf, str, len);
1188 return r;
1189 }
1190
1191 /* COMPACT_STRING lengths are:
1192 * 0 = NULL,
1193 * 1 = empty
1194 * N.. = length + 1
1195 */
1196 if (!str)
1197 len = 0;
1198 else if (len == (size_t)-1)
1199 len = strlen(str) + 1;
1200 else
1201 len++;
1202
1203 r = rd_kafka_buf_write_uvarint(rkbuf, (uint64_t)len);
1204 if (len > 1)
1205 rd_kafka_buf_write(rkbuf, str, len-1);
1206 return r;
1207 }
1208
1209
1210
1211 /**
1212 * Push (i.e., no copy) Kafka string to buffer iovec
1213 */
rd_kafka_buf_push_kstr(rd_kafka_buf_t * rkbuf,const rd_kafkap_str_t * kstr)1214 static RD_INLINE void rd_kafka_buf_push_kstr (rd_kafka_buf_t *rkbuf,
1215 const rd_kafkap_str_t *kstr) {
1216 rd_kafka_buf_push(rkbuf, RD_KAFKAP_STR_SER(kstr),
1217 RD_KAFKAP_STR_SIZE(kstr), NULL);
1218 }
1219
1220
1221
1222 /**
1223 * Write (copy) Kafka bytes to buffer.
1224 */
1225 static RD_INLINE size_t
rd_kafka_buf_write_kbytes(rd_kafka_buf_t * rkbuf,const rd_kafkap_bytes_t * kbytes)1226 rd_kafka_buf_write_kbytes (rd_kafka_buf_t *rkbuf,
1227 const rd_kafkap_bytes_t *kbytes) {
1228 size_t len;
1229
1230 if (!kbytes || RD_KAFKAP_BYTES_IS_NULL(kbytes))
1231 return rd_kafka_buf_write_i32(rkbuf, -1);
1232
1233 if (RD_KAFKAP_BYTES_IS_SERIALIZED(kbytes))
1234 return rd_kafka_buf_write(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
1235 RD_KAFKAP_BYTES_SIZE(kbytes));
1236
1237 len = RD_KAFKAP_BYTES_LEN(kbytes);
1238 rd_kafka_buf_write_i32(rkbuf, (int32_t)len);
1239 rd_kafka_buf_write(rkbuf, kbytes->data, len);
1240
1241 return 4 + len;
1242 }
1243
1244 /**
1245 * Push (i.e., no copy) Kafka bytes to buffer iovec
1246 */
rd_kafka_buf_push_kbytes(rd_kafka_buf_t * rkbuf,const rd_kafkap_bytes_t * kbytes)1247 static RD_INLINE void rd_kafka_buf_push_kbytes (rd_kafka_buf_t *rkbuf,
1248 const rd_kafkap_bytes_t *kbytes){
1249 rd_kafka_buf_push(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
1250 RD_KAFKAP_BYTES_SIZE(kbytes), NULL);
1251 }
1252
1253 /**
1254 * Write (copy) binary bytes to buffer as Kafka bytes encapsulate data.
1255 */
rd_kafka_buf_write_bytes(rd_kafka_buf_t * rkbuf,const void * payload,size_t size)1256 static RD_INLINE size_t rd_kafka_buf_write_bytes (rd_kafka_buf_t *rkbuf,
1257 const void *payload, size_t size) {
1258 size_t r;
1259 if (!payload)
1260 size = RD_KAFKAP_BYTES_LEN_NULL;
1261 r = rd_kafka_buf_write_i32(rkbuf, (int32_t) size);
1262 if (payload)
1263 rd_kafka_buf_write(rkbuf, payload, size);
1264 return r;
1265 }
1266
1267
1268 /**
1269 * @brief Write bool to buffer.
1270 */
rd_kafka_buf_write_bool(rd_kafka_buf_t * rkbuf,rd_bool_t v)1271 static RD_INLINE size_t rd_kafka_buf_write_bool (rd_kafka_buf_t *rkbuf,
1272 rd_bool_t v) {
1273 return rd_kafka_buf_write_i8(rkbuf, (int8_t)v);
1274 }
1275
1276
/**
 * Write Kafka Message to buffer.
 * The number of bytes written is returned in '*outlenp'.
 *
 * Returns the buffer offset of the first byte.
 */
size_t rd_kafka_buf_write_Message (rd_kafka_broker_t *rkb,
                                   rd_kafka_buf_t *rkbuf,
                                   int64_t Offset, int8_t MagicByte,
                                   int8_t Attributes, int64_t Timestamp,
                                   const void *key, int32_t key_len,
                                   const void *payload, int32_t len,
                                   int *outlenp);
1290
1291 /**
1292 * Start calculating CRC from now and track it in '*crcp'.
1293 */
rd_kafka_buf_crc_init(rd_kafka_buf_t * rkbuf)1294 static RD_INLINE RD_UNUSED void rd_kafka_buf_crc_init (rd_kafka_buf_t *rkbuf) {
1295 rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
1296 rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_CRC;
1297 rkbuf->rkbuf_crc = rd_crc32_init();
1298 }
1299
1300 /**
1301 * Finalizes CRC calculation and returns the calculated checksum.
1302 */
1303 static RD_INLINE RD_UNUSED
rd_kafka_buf_crc_finalize(rd_kafka_buf_t * rkbuf)1304 rd_crc32_t rd_kafka_buf_crc_finalize (rd_kafka_buf_t *rkbuf) {
1305 rkbuf->rkbuf_flags &= ~RD_KAFKA_OP_F_CRC;
1306 return rd_crc32_finalize(rkbuf->rkbuf_crc);
1307 }
1308
1309
1310
1311
1312
1313 /**
1314 * @brief Check if buffer's replyq.version is outdated.
1315 * @param rkbuf: may be NULL, for convenience.
1316 *
1317 * @returns 1 if this is an outdated buffer, else 0.
1318 */
1319 static RD_UNUSED RD_INLINE int
rd_kafka_buf_version_outdated(const rd_kafka_buf_t * rkbuf,int version)1320 rd_kafka_buf_version_outdated (const rd_kafka_buf_t *rkbuf, int version) {
1321 return rkbuf && rkbuf->rkbuf_replyq.version &&
1322 rkbuf->rkbuf_replyq.version < version;
1323 }
1324
1325
/**
 * @brief Set the maker callback and its opaque on \p rkbuf.
 *
 * NOTE(review): make_cb semantics (e.g. deferred request construction)
 * are defined at the implementation — confirm there.
 */
void rd_kafka_buf_set_maker (rd_kafka_buf_t *rkbuf,
                             rd_kafka_make_req_cb_t *make_cb,
                             void *make_opaque,
                             void (*free_make_opaque_cb) (void *make_opaque));
1330
1331 #endif /* _RDKAFKA_BUF_H_ */
1332