1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 #ifndef _RDKAFKA_BUF_H_
29 #define _RDKAFKA_BUF_H_
30 
31 #include "rdkafka_int.h"
32 #include "rdcrc32.h"
33 #include "rdlist.h"
34 #include "rdbuf.h"
35 #include "rdkafka_msgbatch.h"
36 
typedef struct rd_kafka_broker_s rd_kafka_broker_t; /* Forward declaration */

/** Number of iovecs reserved for protocol headers
 *  (NOTE(review): presumably length-prefix + header; confirm at usage site) */
#define RD_KAFKA_HEADERS_IOV_CNT   2
40 
41 
42 /**
 * Temporary buffer with memory-aligned writes to allow
 * efficient and platform-safe struct writes.
45  */
typedef struct rd_tmpabuf_s {
	size_t size;           /**< Total allocated size of \c buf */
	size_t of;             /**< Current write offset into \c buf */
	char  *buf;            /**< Backing memory */
	int    failed;         /**< Non-zero if a previous operation failed */
	int    assert_on_fail; /**< Assert (crash) on allocation failure */
} rd_tmpabuf_t;
53 
54 /**
55  * @brief Allocate new tmpabuf with \p size bytes pre-allocated.
56  */
57 static RD_UNUSED void
rd_tmpabuf_new(rd_tmpabuf_t * tab,size_t size,int assert_on_fail)58 rd_tmpabuf_new (rd_tmpabuf_t *tab, size_t size, int assert_on_fail) {
59 	tab->buf = rd_malloc(size);
60 	tab->size = size;
61 	tab->of = 0;
62 	tab->failed = 0;
63 	tab->assert_on_fail = assert_on_fail;
64 }
65 
66 /**
67  * @brief Free memory allocated by tmpabuf
68  */
static RD_UNUSED void
rd_tmpabuf_destroy (rd_tmpabuf_t *tab) {
	/* Frees only the backing buffer; \p tab itself is caller-owned. */
	rd_free(tab->buf);
}
73 
74 /**
75  * @returns 1 if a previous operation failed.
76  */
static RD_UNUSED RD_INLINE int
rd_tmpabuf_failed (rd_tmpabuf_t *tab) {
	/* Sticky failure flag: set by a failed alloc/write, cleared on _new(). */
	return tab->failed;
}
81 
82 /**
83  * @brief Allocate \p size bytes for writing, returning an aligned pointer
84  *        to the memory.
85  * @returns the allocated pointer (within the tmpabuf) on success or
86  *          NULL if the requested number of bytes + alignment is not available
87  *          in the tmpabuf.
88  */
89 static RD_UNUSED void *
rd_tmpabuf_alloc0(const char * func,int line,rd_tmpabuf_t * tab,size_t size)90 rd_tmpabuf_alloc0 (const char *func, int line, rd_tmpabuf_t *tab, size_t size) {
91 	void *ptr;
92 
93 	if (unlikely(tab->failed))
94 		return NULL;
95 
96 	if (unlikely(tab->of + size > tab->size)) {
97 		if (tab->assert_on_fail) {
98 			fprintf(stderr,
99 				"%s: %s:%d: requested size %"PRIusz" + %"PRIusz" > %"PRIusz"\n",
100 				__FUNCTION__, func, line, tab->of, size,
101 				tab->size);
102 			assert(!*"rd_tmpabuf_alloc: not enough size in buffer");
103 		}
104 		return NULL;
105 	}
106 
107         ptr = (void *)(tab->buf + tab->of);
108 	tab->of += RD_ROUNDUP(size, 8);
109 
110 	return ptr;
111 }
112 
113 #define rd_tmpabuf_alloc(tab,size) \
114 	rd_tmpabuf_alloc0(__FUNCTION__,__LINE__,tab,size)
115 
116 /**
117  * @brief Write \p buf of \p size bytes to tmpabuf memory in an aligned fashion.
118  *
119  * @returns the allocated and written-to pointer (within the tmpabuf) on success
120  *          or NULL if the requested number of bytes + alignment is not available
121  *          in the tmpabuf.
122  */
123 static RD_UNUSED void *
rd_tmpabuf_write0(const char * func,int line,rd_tmpabuf_t * tab,const void * buf,size_t size)124 rd_tmpabuf_write0 (const char *func, int line,
125 		   rd_tmpabuf_t *tab, const void *buf, size_t size) {
126 	void *ptr = rd_tmpabuf_alloc0(func, line, tab, size);
127 
128         if (likely(ptr && size))
129                 memcpy(ptr, buf, size);
130 
131 	return ptr;
132 }
133 #define rd_tmpabuf_write(tab,buf,size) \
134 	rd_tmpabuf_write0(__FUNCTION__, __LINE__, tab, buf, size)
135 
136 
137 /**
138  * @brief Wrapper for rd_tmpabuf_write() that takes a nul-terminated string.
139  */
140 static RD_UNUSED char *
rd_tmpabuf_write_str0(const char * func,int line,rd_tmpabuf_t * tab,const char * str)141 rd_tmpabuf_write_str0 (const char *func, int line,
142 		       rd_tmpabuf_t *tab, const char *str) {
143 	return rd_tmpabuf_write0(func, line, tab, str, strlen(str)+1);
144 }
145 #define rd_tmpabuf_write_str(tab,str) \
146 	rd_tmpabuf_write_str0(__FUNCTION__, __LINE__, tab, str)
147 
148 
149 
150 
151 
152 /**
153  * Response handling callback.
154  *
155  * NOTE: Callbacks must check for 'err == RD_KAFKA_RESP_ERR__DESTROY'
156  *       which indicates that some entity is terminating (rd_kafka_t, broker,
157  *       toppar, queue, etc) and the callback may not be called in the
158  *       correct thread. In this case the callback must perform just
 *       the most minimal cleanup and don't trigger any other operations.
160  *
161  * NOTE: rkb, reply and request may be NULL, depending on error situation.
162  */
typedef void (rd_kafka_resp_cb_t) (rd_kafka_t *rk,
				   rd_kafka_broker_t *rkb,    /* May be NULL */
                                   rd_kafka_resp_err_t err,
                                   rd_kafka_buf_t *reply,     /* May be NULL */
                                   rd_kafka_buf_t *request,   /* May be NULL */
                                   void *opaque);
169 
170 
171 /**
172  * @brief Sender callback. This callback is used to construct and send (enq)
173  *        a rkbuf on a particular broker.
174  */
175 typedef rd_kafka_resp_err_t (rd_kafka_send_req_cb_t) (
176         rd_kafka_broker_t *rkb,
177         rd_kafka_op_t *rko,
178         rd_kafka_replyq_t replyq,
179         rd_kafka_resp_cb_t *resp_cb,
180         void *reply_opaque);
181 
182 
183 /**
184  * @brief Request maker. A callback that constructs the actual contents
185  *        of a request.
186  *
187  * When constructing a request the ApiVersion typically needs to be selected
188  * which requires the broker's supported ApiVersions to be known, which in
189  * turn requires the broker connection to be UP.
190  *
191  * As a buffer constructor you have two choices:
192  *   a. acquire the broker handle, wait for it to come up, and then construct
193  *      the request buffer, or
 *   b. acquire the broker handle, enqueue an uncrafted/unmade
195  *      request on the broker request queue, and when the broker is up
196  *      the make_req_cb will be called for you to construct the request.
197  *
 * From a code complexity standpoint, the latter option is usually the least
 * complex and saves the caller from having to care about any of the broker
 * state.
200  * Any information that is required to construct the request is passed through
201  * the make_opaque, which can be automatically freed by the buffer code
202  * when it has been used, or handled by the caller (in which case it must
203  * outlive the lifetime of the buffer).
204  *
205  * Usage:
206  *
207  *  1. Construct an rkbuf with the appropriate ApiKey.
208  *  2. Make a copy or reference of any data that is needed to construct the
209  *     request, e.g., through rd_kafka_topic_partition_list_copy(). This
210  *     data is passed by the make_opaque.
211  *  3. Set the make callback by calling rd_kafka_buf_set_maker() and pass
212  *     the make_opaque data and a free function, if needed.
213  *  4. The callback will eventually be called from the broker thread.
214  *  5. In the make callback construct the request on the passed rkbuf.
215  *  6. The request is sent to the broker and the make_opaque is freed.
216  *
217  * See rd_kafka_ListOffsetsRequest() in rdkafka_request.c for an example.
218  *
219  */
typedef rd_kafka_resp_err_t (rd_kafka_make_req_cb_t) (
        rd_kafka_broker_t *rkb,   /* Broker the request will be sent on */
        rd_kafka_buf_t *rkbuf,    /* Buffer to construct the request in */
        void *make_opaque);       /* Opaque set via rd_kafka_buf_set_maker() */
224 
225 /**
226  * @struct Request and response buffer
227  *
228  */
struct rd_kafka_buf_s { /* rd_kafka_buf_t */
        TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link;

        int32_t rkbuf_corrid;      /* Request correlation id */

        rd_ts_t rkbuf_ts_retry;    /* Absolute send retry time */

        int     rkbuf_flags; /* RD_KAFKA_OP_F */

        /** What convenience flags to copy from request to response along
         *  with the reqhdr. */
#define RD_KAFKA_BUF_FLAGS_RESP_COPY_MASK  (RD_KAFKA_OP_F_FLEXVER)

        rd_kafka_prio_t rkbuf_prio; /**< Request priority */

        rd_buf_t rkbuf_buf;        /**< Send/Recv byte buffer */
        rd_slice_t rkbuf_reader;   /**< Buffer slice reader for rkbuf_buf */

        int     rkbuf_connid;      /* broker connection id (used when buffer
                                    * was partially sent). */
        size_t  rkbuf_totlen;      /* recv: total expected length,
                                    * send: not used */

        rd_crc32_t rkbuf_crc;      /* Current CRC calculation */

        struct rd_kafkap_reqhdr rkbuf_reqhdr;   /* Request header.
                                                 * These fields are encoded
                                                 * and written to output buffer
                                                 * on buffer finalization.
                                                 * Note:
                                                 * The request's
                                                 * reqhdr is copied to the
                                                 * response's reqhdr as a
                                                 * convenience. */
        struct rd_kafkap_reshdr rkbuf_reshdr;   /* Response header.
                                                 * Decoded fields are copied
                                                 * here from the buffer
                                                 * to provide an ease-of-use
                                                 * interface to the header */

        int32_t rkbuf_expected_size;  /* expected size of message */

        rd_kafka_replyq_t   rkbuf_replyq;       /* Enqueue response on replyq */
        rd_kafka_replyq_t   rkbuf_orig_replyq;  /* Original replyq to be used
                                                 * for retries from inside
                                                 * the rkbuf_cb() callback
                                                 * since rkbuf_replyq will
                                                 * have been reset. */
        rd_kafka_resp_cb_t *rkbuf_cb;           /* Response callback */
        struct rd_kafka_buf_s *rkbuf_response;  /* Response buffer */

        rd_kafka_make_req_cb_t *rkbuf_make_req_cb; /**< Callback to construct
                                                    *   the request itself.
                                                    *   Will be used if
                                                    *   RD_KAFKA_OP_F_NEED_MAKE
                                                    *   is set. */
        void *rkbuf_make_opaque;  /**< Opaque passed to rkbuf_make_req_cb.
                                   *   Will be freed automatically after use
                                   *   by the rkbuf code. */
        void (*rkbuf_free_make_opaque_cb) (void *); /**< Free function for
                                                     *   rkbuf_make_opaque. */

        struct rd_kafka_broker_s *rkbuf_rkb; /**< Broker this buffer is
                                              *   associated with
                                              *   (used e.g. for logging). */

        rd_refcnt_t rkbuf_refcnt; /**< Reference count */
        void   *rkbuf_opaque;     /**< Opaque for the request issuer */

        int     rkbuf_max_retries;        /**< Maximum retries to attempt. */
        int     rkbuf_retries;            /**< Retries so far. */


        int     rkbuf_features;   /* Required feature(s) that must be
                                   * supported by broker. */

        rd_ts_t rkbuf_ts_enq;     /* Enqueue time */
        rd_ts_t rkbuf_ts_sent;    /* Initially: Absolute time of transmission,
                                   * after response: RTT. */

        /* Request timeouts:
         *  rkbuf_ts_timeout is the effective absolute request timeout used
         *  by the timeout scanner to see if a request has timed out.
         *  It is set when a request is enqueued on the broker transmit
         *  queue based on the relative or absolute timeout:
         *
         *  rkbuf_rel_timeout is the per-request-transmit relative timeout,
         *  this value is reused for each subsequent retry of a request.
         *
         *  rkbuf_abs_timeout is the absolute request timeout, spanning
         *  all retries.
         *  This value is effectively limited by socket.timeout.ms for
         *  each transmission, but the absolute timeout for a request's
         *  lifetime is the absolute value.
         *
         *  Use rd_kafka_buf_set_timeout() to set a relative timeout
         *  that will be reused on retry,
         *  or rd_kafka_buf_set_abs_timeout() to set a fixed absolute timeout
         *  for the case where the caller knows the request will be
         *  semantically outdated when that absolute time expires, such as for
         *  session.timeout.ms-based requests.
         *
         * The decision to retry a request is delegated to the rkbuf_cb
         * response callback, which should use rd_kafka_err_action()
         * and check the return actions for RD_KAFKA_ERR_ACTION_RETRY to be set
         * and then call rd_kafka_buf_retry().
         * rd_kafka_buf_retry() will enqueue the request on the rkb_retrybufs
         * queue with a backoff time of retry.backoff.ms.
         * The rkb_retrybufs queue is served by the broker thread's timeout
         * scanner.
         * @warning rkb_retrybufs is NOT purged on broker down.
         */
        rd_ts_t rkbuf_ts_timeout; /* Request timeout (absolute time). */
        rd_ts_t rkbuf_abs_timeout;/* Absolute timeout for request, including
                                   * retries.
                                   * Mutually exclusive with rkbuf_rel_timeout*/
        int     rkbuf_rel_timeout;/* Relative timeout (ms), used for retries.
                                   * Defaults to socket.timeout.ms.
                                   * Mutually exclusive with rkbuf_abs_timeout*/
        rd_bool_t rkbuf_force_timeout; /**< Force request timeout to be
                                        *   remaining abs_timeout regardless
                                        *   of socket.timeout.ms. */


        int64_t rkbuf_offset;     /* Used by OffsetCommit */

        rd_list_t *rkbuf_rktp_vers;    /* Toppar + Op Version map.
                                        * Used by FetchRequest. */

        rd_kafka_resp_err_t rkbuf_err;      /* Buffer parsing error code */

        /* Per-request-type auxiliary state. */
        union {
                struct {
                        rd_list_t *topics;  /* Requested topics (char *) */
                        char *reason;       /* Textual reason */
                        rd_kafka_op_t *rko; /* Originating rko with replyq
                                             * (if any) */
                        rd_bool_t all_topics; /**< Full/All topics requested */
                        rd_bool_t cgrp_update; /**< Update cgrp with topic
                                                *   status from response. */

                        int *decr;          /* Decrement this integer by one
                                             * when request is complete:
                                             * typically points to metadata
                                             * cache's full_.._sent.
                                             * Will be performed with
                                             * decr_lock held. */
                        mtx_t *decr_lock;   /* Protects *decr */

                } Metadata;
                struct {
                        rd_kafka_msgbatch_t batch; /**< MessageSet/batch */
                } Produce;
                struct {
                        rd_bool_t commit;      /**< true = txn commit,
                                                *   false = txn abort */
                } EndTxn;
        } rkbuf_u;

/** Convenience accessor for the Produce batch. */
#define rkbuf_batch rkbuf_u.Produce.batch

        const char *rkbuf_uflow_mitigation; /**< Buffer read underflow
                                             *   human readable mitigation
                                             *   string (const memory).
                                             *   This is used to hint the
                                             *   user why the underflow
                                             *   might have occurred, which
                                             *   depends on request type. */
};
396 
397 
398 
399 
400 /**
401  * @name Read buffer interface
402  *
403  * Memory reading helper macros to be used when parsing network responses.
404  *
405  * Assumptions:
406  *   - an 'err_parse:' goto-label must be available for error bailouts,
407  *                     the error code will be set in rkbuf->rkbuf_err
408  *   - local `int log_decode_errors` variable set to the logging level
409  *     to log parse errors (or 0 to turn off logging).
410  */
411 
/**
 * @brief Log a protocol parse failure (when \c log_decode_errors > 0),
 *        set \c rkbuf_err to RD_KAFKA_RESP_ERR__BAD_MSG and jump to the
 *        \c err_parse label which must exist in the calling scope.
 */
#define rd_kafka_buf_parse_fail(rkbuf,...) do {				\
                if (log_decode_errors > 0) {                            \
			rd_kafka_assert(NULL, rkbuf->rkbuf_rkb);	\
                        rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
                                   "PROTOERR",                          \
                                   "Protocol parse failure for %s v%hd%s " \
                                   "at %"PRIusz"/%"PRIusz" (%s:%i) "    \
                                   "(incorrect broker.version.fallback?)", \
                                   rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr. \
                                                       ApiKey),         \
                                   rkbuf->rkbuf_reqhdr.ApiVersion,      \
                                   (rkbuf->rkbuf_flags&RD_KAFKA_OP_F_FLEXVER? \
                                    "(flex)":""),                       \
                                   rd_slice_offset(&rkbuf->rkbuf_reader), \
                                   rd_slice_size(&rkbuf->rkbuf_reader), \
                                   __FUNCTION__, __LINE__);             \
                        rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
				   "PROTOERR", __VA_ARGS__);		\
                }                                                       \
                (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG;        \
                goto err_parse;                                         \
	} while (0)
434 
435 /**
436  * @name Fail buffer reading due to buffer underflow.
437  */
438 #define rd_kafka_buf_underflow_fail(rkbuf,wantedlen,...) do {           \
439                 if (log_decode_errors > 0) {                            \
440                         rd_kafka_assert(NULL, rkbuf->rkbuf_rkb);        \
441                         char __tmpstr[256];                             \
442                         rd_snprintf(__tmpstr, sizeof(__tmpstr),         \
443                                     ": " __VA_ARGS__);                  \
444                         if (strlen(__tmpstr) == 2) __tmpstr[0] = '\0';  \
445                         rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
446                                    "PROTOUFLOW",                        \
447                                    "Protocol read buffer underflow "    \
448                                    "for %s v%hd "                       \
449                                    "at %"PRIusz"/%"PRIusz" (%s:%i): "   \
450                                    "expected %"PRIusz" bytes > "        \
451                                    "%"PRIusz" remaining bytes (%s)%s",  \
452                                    rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr. \
453                                                        ApiKey),         \
454                                    rkbuf->rkbuf_reqhdr.ApiVersion,      \
455                                    rd_slice_offset(&rkbuf->rkbuf_reader), \
456                                    rd_slice_size(&rkbuf->rkbuf_reader), \
457                                    __FUNCTION__, __LINE__,              \
458                                    wantedlen,                           \
459                                    rd_slice_remains(&rkbuf->rkbuf_reader), \
460                                    rkbuf->rkbuf_uflow_mitigation ?      \
461                                    rkbuf->rkbuf_uflow_mitigation :      \
462                                    "incorrect broker.version.fallback?", \
463                                    __tmpstr);                           \
464                 }                                                       \
465                 (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__UNDERFLOW;      \
466                 goto err_parse;                                         \
467         } while (0)
468 
469 
470 /**
471  * Returns the number of remaining bytes available to read.
472  */
473 #define rd_kafka_buf_read_remain(rkbuf) \
474         rd_slice_remains(&(rkbuf)->rkbuf_reader)
475 
476 /**
477  * Checks that at least 'len' bytes remain to be read in buffer, else fails.
478  */
479 #define rd_kafka_buf_check_len(rkbuf,len) do {                          \
480                 size_t __len0 = (size_t)(len);                          \
481                 if (unlikely(__len0 > rd_kafka_buf_read_remain(rkbuf))) { \
482                         rd_kafka_buf_underflow_fail(rkbuf, __len0);     \
483                 }                                                       \
484         } while (0)
485 
486 /**
487  * Skip (as in read and ignore) the next 'len' bytes.
488  */
489 #define rd_kafka_buf_skip(rkbuf, len) do {                              \
490                 size_t __len1 = (size_t)(len);                          \
491                 if (__len1 &&                                           \
492                     !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
493                         rd_kafka_buf_check_len(rkbuf, __len1);           \
494         } while (0)
495 
496 /**
497  * Skip (as in read and ignore) up to fixed position \p pos.
498  */
499 #define rd_kafka_buf_skip_to(rkbuf, pos) do {                           \
500                 size_t __len1 = (size_t)(pos) -                         \
501                         rd_slice_offset(&(rkbuf)->rkbuf_reader);        \
502                 if (__len1 &&                                           \
503                     !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
504                         rd_kafka_buf_check_len(rkbuf, __len1);           \
505         } while (0)
506 
507 
508 
509 /**
510  * Read 'len' bytes and copy to 'dstptr'
511  */
512 #define rd_kafka_buf_read(rkbuf,dstptr,len) do {                        \
513                 size_t __len2 = (size_t)(len);                          \
514                 if (!rd_slice_read(&(rkbuf)->rkbuf_reader, dstptr, __len2))  \
515                         rd_kafka_buf_check_len(rkbuf, __len2);          \
516         } while (0)
517 
518 
519 /**
520  * @brief Read \p len bytes at slice offset \p offset and copy to \p dstptr
521  *        without affecting the current reader position.
522  */
523 #define rd_kafka_buf_peek(rkbuf,offset,dstptr,len) do {                 \
524                 size_t __len2 = (size_t)(len);                          \
525                 if (!rd_slice_peek(&(rkbuf)->rkbuf_reader, offset,      \
526                                    dstptr, __len2))                     \
527                         rd_kafka_buf_check_len(rkbuf, (offset)+(__len2)); \
528         } while (0)
529 
530 
531 /**
532  * Read a 16,32,64-bit integer and store it in 'dstptr'
533  */
534 #define rd_kafka_buf_read_i64(rkbuf,dstptr) do {                        \
535                 int64_t _v;                                             \
536                 rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
537                 *(dstptr) = be64toh(_v);                                \
538         } while (0)
539 
540 #define rd_kafka_buf_peek_i64(rkbuf,of,dstptr) do {                     \
541                 int64_t _v;                                             \
542                 rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v));          \
543                 *(dstptr) = be64toh(_v);                                \
544         } while (0)
545 
546 #define rd_kafka_buf_read_i32(rkbuf,dstptr) do {                        \
547                 int32_t _v;                                             \
548                 rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
549                 *(dstptr) = be32toh(_v);                                \
550         } while (0)
551 
552 #define rd_kafka_buf_peek_i32(rkbuf,of,dstptr) do {                     \
553                 int32_t _v;                                             \
554                 rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v));          \
555                 *(dstptr) = be32toh(_v);                                \
556         } while (0)
557 
558 
559 /* Same as .._read_i32 but does a direct assignment.
560  * dst is assumed to be a scalar, not pointer. */
561 #define rd_kafka_buf_read_i32a(rkbuf, dst) do {				\
562                 int32_t _v;                                             \
563 		rd_kafka_buf_read(rkbuf, &_v, 4);			\
564 		dst = (int32_t) be32toh(_v);				\
565 	} while (0)
566 
567 #define rd_kafka_buf_read_i16(rkbuf,dstptr) do {                        \
568                 int16_t _v;                                             \
569                 rd_kafka_buf_read(rkbuf, &_v, sizeof(_v));              \
570                 *(dstptr) = (int16_t)be16toh(_v);                       \
571         } while (0)
572 
573 
574 #define rd_kafka_buf_read_i16a(rkbuf, dst) do {				\
575                 int16_t _v;                                             \
576 		rd_kafka_buf_read(rkbuf, &_v, 2);			\
577                 dst = (int16_t)be16toh(_v);				\
578 	} while (0)
579 
580 #define rd_kafka_buf_read_i8(rkbuf, dst) rd_kafka_buf_read(rkbuf, dst, 1)
581 
582 #define rd_kafka_buf_peek_i8(rkbuf,of,dst) rd_kafka_buf_peek(rkbuf,of,dst,1)
583 
584 #define rd_kafka_buf_read_bool(rkbuf, dstptr) do {                      \
585                 int8_t _v;                                              \
586                 rd_bool_t *_dst = dstptr;                               \
587                 rd_kafka_buf_read(rkbuf, &_v, 1);                       \
588                 *_dst = (rd_bool_t)_v;                                  \
589         } while (0)
590 
591 
592 /**
593  * @brief Read varint and store in int64_t \p dst
594  */
595 #define rd_kafka_buf_read_varint(rkbuf,dst) do {                        \
596                 int64_t _v;                                             \
597                 size_t _r = rd_slice_read_varint(&(rkbuf)->rkbuf_reader, &_v);\
598                 if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                 \
599                         rd_kafka_buf_underflow_fail(rkbuf, (size_t)0,   \
600                                                     "varint parsing failed");\
601                 *(dst) = _v;                                            \
602         } while (0)
603 
604 
605 /**
606  * @brief Read unsigned varint and store in uint64_t \p dst
607  */
608 #define rd_kafka_buf_read_uvarint(rkbuf,dst) do {                       \
609                 uint64_t _v;                                            \
610                 size_t _r = rd_slice_read_uvarint(&(rkbuf)->rkbuf_reader, \
611                                                   &_v);                 \
612                 if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                 \
613                         rd_kafka_buf_underflow_fail(rkbuf, (size_t)0,   \
614                                                     "uvarint parsing failed"); \
615                 *(dst) = _v;                                            \
616         } while (0)
617 
618 
619 /**
620  * @brief Read Kafka COMPACT_STRING (VARINT+N) or
621  *        standard String representation (2+N).
622  *
623  * The kstr data will be updated to point to the rkbuf. */
624 #define rd_kafka_buf_read_str(rkbuf, kstr) do {                         \
625                 int _klen;                                              \
626                 if ((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER) {     \
627                         uint64_t _uva;                                  \
628                         rd_kafka_buf_read_uvarint(rkbuf, &_uva);        \
629                         (kstr)->len = ((int32_t)_uva) - 1;              \
630                         _klen = (kstr)->len;                            \
631                 } else {                                                \
632                         rd_kafka_buf_read_i16a(rkbuf, (kstr)->len);     \
633                         _klen = RD_KAFKAP_STR_LEN(kstr);                \
634                 }                                                       \
635                 if (RD_KAFKAP_STR_IS_NULL(kstr))                        \
636                         (kstr)->str = NULL;                             \
637                 else if (RD_KAFKAP_STR_LEN(kstr) == 0)                  \
638                         (kstr)->str = "";                               \
639                 else if (!((kstr)->str =                                \
640                            rd_slice_ensure_contig(&rkbuf->rkbuf_reader, \
641                                                   _klen)))              \
642                         rd_kafka_buf_check_len(rkbuf, _klen);           \
643         } while (0)
644 
/* Read Kafka String representation (2+N) and write it to the \p tmpabuf
 * with a trailing nul byte; \p dst is set to point at the copy.
 * Fails (goto err_parse) if the tmpabuf has insufficient room. */
#define rd_kafka_buf_read_str_tmpabuf(rkbuf, tmpabuf, dst) do {         \
                rd_kafkap_str_t _kstr;					\
		size_t _slen;						\
		char *_dst;						\
		rd_kafka_buf_read_str(rkbuf, &_kstr);			\
		_slen = RD_KAFKAP_STR_LEN(&_kstr);			\
		if (!(_dst =						\
		      rd_tmpabuf_write(tmpabuf, _kstr.str, _slen+1)))	\
			rd_kafka_buf_parse_fail(			\
				rkbuf,					\
				"Not enough room in tmpabuf: "		\
				"%"PRIusz"+%"PRIusz			\
				" > %"PRIusz,				\
				(tmpabuf)->of, _slen+1, (tmpabuf)->size); \
		_dst[_slen] = '\0';					\
		dst = (void *)_dst;					\
	} while (0)
664 
/**
 * @brief Skip a standard (2-byte length prefix) string.
 *
 * @remark Only handles non-flexver (i16-prefixed) strings; COMPACT_STRING
 *         (flexver) encodings are not supported by this macro.
 */
#define rd_kafka_buf_skip_str(rkbuf) do {			\
		int16_t _slen;					\
		rd_kafka_buf_read_i16(rkbuf, &_slen);		\
		rd_kafka_buf_skip(rkbuf, RD_KAFKAP_STR_LEN0(_slen));	\
	} while (0)
673 
/* Read Kafka Bytes representation (4+N).
 * The 'kbytes' will be updated to point into rkbuf data (zero copy),
 * so the rkbuf must outlive any use of 'kbytes'.
 * A length of -1 denotes a Null value: data is set to NULL and len to 0. */
#define rd_kafka_buf_read_bytes(rkbuf, kbytes) do {                     \
                int _klen;                                              \
                rd_kafka_buf_read_i32a(rkbuf, _klen);                   \
                (kbytes)->len = _klen;                                  \
                if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) {                  \
                        (kbytes)->data = NULL;                          \
                        (kbytes)->len = 0;                              \
                } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0)            \
                        (kbytes)->data = "";                            \
                else if (!((kbytes)->data =                             \
                           rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
                                                  _klen)))              \
                        rd_kafka_buf_check_len(rkbuf, _klen);           \
        } while (0)


/**
 * @brief Read \p size bytes from buffer, setting \p *ptr to the start
 *        of the memory region (zero copy, points into the read buffer).
 */
#define rd_kafka_buf_read_ptr(rkbuf,ptr,size) do {                      \
                size_t _klen = size;                                    \
                if (!(*(ptr) = (void *)                                 \
                      rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, _klen))) \
                        rd_kafka_buf_check_len(rkbuf, _klen);           \
        } while (0)


/**
 * @brief Read varint-lengted Kafka Bytes representation.
 *
 * The length is a signed varint; -1 denotes a Null value.
 * NOTE(review): negative lengths other than -1 are not explicitly
 * rejected here and fall through to the contig/check_len path — confirm
 * rd_kafka_buf_check_len() handles them.
 */
#define rd_kafka_buf_read_bytes_varint(rkbuf,kbytes) do {               \
                int64_t _len2;                                          \
                size_t _r = rd_slice_read_varint(&(rkbuf)->rkbuf_reader, \
                                                 &_len2);               \
                if (unlikely(RD_UVARINT_UNDERFLOW(_r)))                 \
                        rd_kafka_buf_underflow_fail(rkbuf, (size_t)0,   \
                                                    "varint parsing failed"); \
                (kbytes)->len = (int32_t)_len2;                         \
                if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) {                  \
                        (kbytes)->data = NULL;                          \
                        (kbytes)->len = 0;                              \
                } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0)            \
                        (kbytes)->data = "";                            \
                else if (!((kbytes)->data =                             \
                           rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
                                                  (size_t)_len2)))      \
                        rd_kafka_buf_check_len(rkbuf, _len2);           \
        } while (0)
725 
726 
/**
 * @brief Read throttle_time_ms (i32) from response and pass the value
 *        to the throttle handling code (rd_kafka_op_throttle_time())
 *        together with the buffer's broker and its rk_rep queue.
 *
 * @remark Requires (rkbuf)->rkbuf_rkb to be set (i.e. a broker response).
 */
#define rd_kafka_buf_read_throttle_time(rkbuf) do {                     \
                int32_t _throttle_time_ms;                              \
                rd_kafka_buf_read_i32(rkbuf, &_throttle_time_ms);       \
                rd_kafka_op_throttle_time((rkbuf)->rkbuf_rkb,           \
                                          (rkbuf)->rkbuf_rkb->rkb_rk->rk_rep, \
                                          _throttle_time_ms);           \
        } while (0)
738 
739 
/**
 * @brief Discard all KIP-482 Tags at the current position in the buffer.
 *
 * Tag sections are only present in flexibleVersions requests/responses;
 * for non-flexver buffers this is a no-op.
 */
#define rd_kafka_buf_skip_tags(rkbuf) do {                              \
        uint64_t _tagcnt;                                               \
        if (!((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER))            \
                break;                                                  \
        rd_kafka_buf_read_uvarint(rkbuf, &_tagcnt);                     \
        while (_tagcnt-- > 0) {                                         \
               uint64_t _tagtype, _taglen;                              \
               rd_kafka_buf_read_uvarint(rkbuf, &_tagtype);             \
               rd_kafka_buf_read_uvarint(rkbuf, &_taglen);              \
               /* The tag length is the actual data length; it is NOT */ \
               /* +1-encoded like COMPACT_STRING/COMPACT_BYTES, so    */ \
               /* skip exactly _taglen bytes.                         */ \
               if (_taglen > 0)                                         \
                       rd_kafka_buf_skip(rkbuf, (size_t)_taglen);       \
        }                                                               \
        } while (0)
756 
/**
 * @brief Write tags at the current position in the buffer.
 * @remark Currently always writes empty tags: a single 0 byte, which is
 *         also the uvarint encoding of a zero tag count.
 * @remark Change to ..write_uvarint() when actual tags are supported.
 */
#define rd_kafka_buf_write_tags(rkbuf) do {                             \
        if (!((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER))            \
                break;                                                  \
        rd_kafka_buf_write_i8(rkbuf, 0);                                \
        } while (0)


/**
 * @brief Reads an ARRAY or COMPACT_ARRAY count depending on buffer type.
 *
 * COMPACT_ARRAY counts are uvarint-encoded with a base of 1 (0 = Null),
 * so 1 is subtracted after decoding; a Null array thus becomes -1 and
 * fails the range check below.
 *
 * @param maxval Maximum allowed count, or -1 to disable the upper-bound
 *               check. A negative count always fails the parse.
 */
#define rd_kafka_buf_read_arraycnt(rkbuf,arrcnt,maxval) do {            \
        if ((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER) {             \
                uint64_t _uva;                                          \
                rd_kafka_buf_read_uvarint(rkbuf, &_uva);                \
                *(arrcnt) = (int32_t)_uva - 1;                          \
        } else {                                                        \
                rd_kafka_buf_read_i32(rkbuf, arrcnt);                   \
        }                                                               \
        if (*(arrcnt) < 0 || ((maxval) != -1 && *(arrcnt) > (maxval)))  \
                rd_kafka_buf_parse_fail(rkbuf,                          \
                                        "ApiArrayCnt %"PRId32" out of range", \
                                        *(arrcnt));                     \
        } while (0)
785 
786 
787 
788 
/**
 * @returns non-zero if the buffer has been sent on the wire, else 0.
 */
#define rd_kafka_buf_was_sent(rkbuf)                    \
        ((rkbuf)->rkbuf_flags & RD_KAFKA_OP_F_SENT)

/**
 * @brief Queue of request/response buffers with atomic counters.
 */
typedef struct rd_kafka_bufq_s {
	TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs;  /* Buffer list */
	rd_atomic32_t  rkbq_cnt;      /* Number of buffers in list */
	rd_atomic32_t  rkbq_msg_cnt;  /* presumably total messages across
	                               * the queued buffers — confirm
	                               * against bufq_enq/deq. */
} rd_kafka_bufq_t;

/** @returns the current number of buffers in the queue. */
#define rd_kafka_bufq_cnt(rkbq) rd_atomic32_get(&(rkbq)->rkbq_cnt)
802 
803 /**
804  * @brief Set buffer's request timeout to relative \p timeout_ms measured
805  *        from the time the buffer is sent on the underlying socket.
806  *
807  * @param now Reuse current time from existing rd_clock() var, else 0.
808  *
809  * The relative timeout value is reused upon request retry.
810  */
811 static RD_INLINE void
rd_kafka_buf_set_timeout(rd_kafka_buf_t * rkbuf,int timeout_ms,rd_ts_t now)812 rd_kafka_buf_set_timeout (rd_kafka_buf_t *rkbuf, int timeout_ms, rd_ts_t now) {
813         if (!now)
814                 now = rd_clock();
815         rkbuf->rkbuf_rel_timeout = timeout_ms;
816         rkbuf->rkbuf_abs_timeout = 0;
817 }
818 
819 
820 /**
821  * @brief Calculate the effective timeout for a request attempt
822  */
823 void rd_kafka_buf_calc_timeout (const rd_kafka_t *rk, rd_kafka_buf_t *rkbuf,
824                                 rd_ts_t now);
825 
826 
827 /**
828  * @brief Set buffer's request timeout to relative \p timeout_ms measured
829  *        from \p now.
830  *
831  * @param now Reuse current time from existing rd_clock() var, else 0.
832  * @param force If true: force request timeout to be same as remaining
833  *                       abs timeout, regardless of socket.timeout.ms.
834  *              If false: cap each request timeout to socket.timeout.ms.
835  *
836  * The remaining time is used as timeout for request retries.
837  */
838 static RD_INLINE void
rd_kafka_buf_set_abs_timeout0(rd_kafka_buf_t * rkbuf,int timeout_ms,rd_ts_t now,rd_bool_t force)839 rd_kafka_buf_set_abs_timeout0 (rd_kafka_buf_t *rkbuf, int timeout_ms,
840                                rd_ts_t now, rd_bool_t force) {
841         if (!now)
842                 now = rd_clock();
843         rkbuf->rkbuf_rel_timeout = 0;
844         rkbuf->rkbuf_abs_timeout = now + ((rd_ts_t)timeout_ms * 1000);
845         rkbuf->rkbuf_force_timeout = force;
846 }
847 
/** @brief Set absolute timeout; each request attempt is still capped by
 *         socket.timeout.ms. */
#define rd_kafka_buf_set_abs_timeout(rkbuf,timeout_ms,now) \
        rd_kafka_buf_set_abs_timeout0(rkbuf,timeout_ms,now,rd_false)


/** @brief Set absolute timeout that overrides socket.timeout.ms. */
#define rd_kafka_buf_set_abs_timeout_force(rkbuf,timeout_ms,now) \
        rd_kafka_buf_set_abs_timeout0(rkbuf,timeout_ms,now,rd_true)
854 
855 
/** @brief Increase the buffer's reference count. */
#define rd_kafka_buf_keep(rkbuf) rd_refcnt_add(&(rkbuf)->rkbuf_refcnt)
/** @brief Decrease the reference count, destroying the buffer when it
 *         reaches zero (via rd_kafka_buf_destroy_final()). */
#define rd_kafka_buf_destroy(rkbuf)                                     \
        rd_refcnt_destroywrapper(&(rkbuf)->rkbuf_refcnt,                \
                                 rd_kafka_buf_destroy_final(rkbuf))

void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf);
void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
                         int allow_crc_calc, void (*free_cb) (void *));
/** @brief Push (no copy) \p buf to the buffer, allowing CRC accumulation. */
#define rd_kafka_buf_push(rkbuf,buf,len,free_cb)                        \
        rd_kafka_buf_push0(rkbuf,buf,len,1/*allow_crc*/,free_cb)
866 rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags);
867 #define rd_kafka_buf_new(segcnt,size) \
868         rd_kafka_buf_new0(segcnt,size,0)
869 rd_kafka_buf_t *rd_kafka_buf_new_request0 (rd_kafka_broker_t *rkb,
870                                            int16_t ApiKey,
871                                            int segcnt, size_t size,
872                                            rd_bool_t is_flexver);
873 #define rd_kafka_buf_new_request(rkb,ApiKey,segcnt,size)                \
874         rd_kafka_buf_new_request0(rkb,ApiKey,segcnt,size,rd_false)      \
875 
876 #define rd_kafka_buf_new_flexver_request(rkb,ApiKey,segcnt,size,is_flexver) \
877         rd_kafka_buf_new_request0(rkb,ApiKey,segcnt,size,is_flexver)    \
878 
879 rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size,
880                                          void (*free_cb) (void *));
881 void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
882 void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
883 void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq);
884 void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src);
885 void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
886                           rd_kafka_bufq_t *rkbufq,
887                           rd_kafka_resp_err_t err);
888 void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb,
889 				     rd_kafka_bufq_t *rkbufq);
890 void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac,
891 			 rd_kafka_bufq_t *rkbq);
892 
893 int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
894 
895 void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err);
896 void rd_kafka_buf_callback (rd_kafka_t *rk,
897 			    rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
898                             rd_kafka_buf_t *response, rd_kafka_buf_t *request);
899 
900 
901 
/**
 *
 * Write buffer interface
 *
 */

/**
 * @brief Set the request's API type version and remember the feature
 *        flags the version was selected for.
 */
static RD_UNUSED RD_INLINE void
rd_kafka_buf_ApiVersion_set (rd_kafka_buf_t *rkbuf,
                             int16_t version, int features) {
        rkbuf->rkbuf_reqhdr.ApiVersion = version;
        rkbuf->rkbuf_features = features;
}


/**
 * @returns the ApiVersion for a request
 */
#define rd_kafka_buf_ApiVersion(rkbuf) ((rkbuf)->rkbuf_reqhdr.ApiVersion)
923 
924 
925 
/**
 * Write (copy) data to buffer at current write-buffer position.
 * There must be enough space allocated in the rkbuf.
 *
 * If a CRC calculation is in progress (RD_KAFKA_OP_F_CRC set) the
 * written bytes are also folded into the running CRC.
 *
 * Returns offset to written destination buffer.
 */
static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
                                        const void *data, size_t len) {
        size_t r;

        r = rd_buf_write(&rkbuf->rkbuf_buf, data, len);

        if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC)
                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, data, len);

        return r;
}
942 
943 
944 
/**
 * Write (copy) 'data' to buffer at offset 'of'.
 * There must be enough space to fit 'len'.
 * This will overwrite the buffer at given location and length.
 *
 * NOTE: rd_kafka_buf_update() MUST NOT be called when a CRC calculation
 *       is in progress (between rd_kafka_buf_crc_init() & .._crc_finalize());
 *       the CRC accumulates bytes in write order and would not reflect
 *       an in-place update, which is why this asserts.
 */
static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
                                          const void *data, size_t len) {
        rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
        rd_buf_write_update(&rkbuf->rkbuf_buf, of, data, len);
}
958 
/**
 * Write int8_t to buffer.
 * @returns the buffer offset of the written value.
 */
static RD_INLINE size_t rd_kafka_buf_write_i8 (rd_kafka_buf_t *rkbuf,
					      int8_t v) {
        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}

/**
 * Update int8_t in buffer at offset 'of'.
 * 'of' should have been previously returned by `.._buf_write_i8()`.
 */
static RD_INLINE void rd_kafka_buf_update_i8 (rd_kafka_buf_t *rkbuf,
					     size_t of, int8_t v) {
        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}

/**
 * Write int16_t to buffer.
 * The value will be endian-swapped before write.
 * @returns the buffer offset of the written value.
 */
static RD_INLINE size_t rd_kafka_buf_write_i16 (rd_kafka_buf_t *rkbuf,
					       int16_t v) {
        v = htobe16(v);
        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}

/**
 * Update int16_t in buffer at offset 'of'.
 * 'of' should have been previously returned by `.._buf_write_i16()`.
 */
static RD_INLINE void rd_kafka_buf_update_i16 (rd_kafka_buf_t *rkbuf,
                                              size_t of, int16_t v) {
        v = htobe16(v);
        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}

/**
 * Write int32_t to buffer.
 * The value will be endian-swapped before write.
 * @returns the buffer offset of the written value.
 */
static RD_INLINE size_t rd_kafka_buf_write_i32 (rd_kafka_buf_t *rkbuf,
                                               int32_t v) {
        v = (int32_t)htobe32(v);
        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}

/**
 * Update int32_t in buffer at offset 'of'.
 * 'of' should have been previously returned by `.._buf_write_i32()`.
 */
static RD_INLINE void rd_kafka_buf_update_i32 (rd_kafka_buf_t *rkbuf,
                                              size_t of, int32_t v) {
        v = htobe32(v);
        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}

/**
 * Update uint32_t in buffer at offset 'of'.
 * 'of' should have been previously returned by `.._buf_write_i32()`.
 * (Same wire format as the signed variant; unsigned convenience type.)
 */
static RD_INLINE void rd_kafka_buf_update_u32 (rd_kafka_buf_t *rkbuf,
                                              size_t of, uint32_t v) {
        v = htobe32(v);
        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}


/**
 * @brief Write array count field to buffer (i32) for later update with
 *        rd_kafka_buf_finalize_arraycnt().
 */
#define rd_kafka_buf_write_arraycnt_pos(rkbuf) rd_kafka_buf_write_i32(rkbuf, 0)
1032 
1033 
/**
 * @brief Write the final array count to the position returned from
 *        rd_kafka_buf_write_arraycnt_pos().
 *
 * Update int32_t in buffer at offset 'of' but serialize it as
 * compact uvarint (that must not exceed 4 bytes storage)
 * if the \p rkbuf is marked as FLEXVER, else just update it as
 * a standard update_i32().
 *
 * @remark For flexibleVersions this will shrink the buffer and move data
 *         and may thus be costly.
 */
static RD_INLINE void rd_kafka_buf_finalize_arraycnt (rd_kafka_buf_t *rkbuf,
                                                      size_t of, int cnt) {
        char buf[sizeof(int32_t)];
        size_t sz, r;

        rd_assert(cnt >= 0);

        if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER)) {
                /* Non-flexver: plain big-endian i32 in-place update. */
                rd_kafka_buf_update_i32(rkbuf, of, (int32_t)cnt);
                return;
        }

        /* CompactArray has a base of 1, 0 is for Null arrays */
        cnt += 1;

        sz = rd_uvarint_enc_u64(buf, sizeof(buf), (uint64_t)cnt);
        rd_assert(!RD_UVARINT_OVERFLOW(sz));

        rd_buf_write_update(&rkbuf->rkbuf_buf, of, buf, sz);

        if (sz < sizeof(int32_t)) {
                /* Varint occupies less space than the allotted 4 bytes, erase
                 * the remaining bytes. */
                r = rd_buf_erase(&rkbuf->rkbuf_buf, of+sz, sizeof(int32_t)-sz);
                rd_assert(r == sizeof(int32_t) - sz);
        }
}
1073 
1074 
/**
 * Write int64_t to buffer.
 * The value will be endian-swapped before write.
 * @returns the buffer offset of the written value.
 */
static RD_INLINE size_t rd_kafka_buf_write_i64 (rd_kafka_buf_t *rkbuf,
                                                int64_t v) {
        v = htobe64(v);
        return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}

/**
 * Update int64_t in buffer at offset 'of'.
 * 'of' should have been previously returned by `.._buf_write_i64()`.
 */
static RD_INLINE void rd_kafka_buf_update_i64 (rd_kafka_buf_t *rkbuf,
                                              size_t of, int64_t v) {
        v = htobe64(v);
        rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}


/**
 * @brief Write varint-encoded signed value to buffer
 *        (encoded via rd_uvarint_enc_i64()).
 * @returns the buffer offset of the written value.
 */
static RD_INLINE size_t
rd_kafka_buf_write_varint (rd_kafka_buf_t *rkbuf, int64_t v) {
        char varint[RD_UVARINT_ENC_SIZEOF(v)];
        size_t sz;

        sz = rd_uvarint_enc_i64(varint, sizeof(varint), v);

        return rd_kafka_buf_write(rkbuf, varint, sz);
}

/**
 * @brief Write varint-encoded unsigned value to buffer.
 * @returns the buffer offset of the written value.
 */
static RD_INLINE size_t
rd_kafka_buf_write_uvarint (rd_kafka_buf_t *rkbuf, uint64_t v) {
        char varint[RD_UVARINT_ENC_SIZEOF(v)];
        size_t sz;

        sz = rd_uvarint_enc_u64(varint, sizeof(varint), v);

        return rd_kafka_buf_write(rkbuf, varint, sz);
}
1121 
1122 
/**
 * @brief Write standard (2-byte header) or KIP-482 COMPACT_STRING to buffer.
 *
 * @remark Copies the string.
 *
 * @returns the offset in \p rkbuf where the string (length prefix) was
 *          written.
 */
static RD_INLINE size_t rd_kafka_buf_write_kstr (rd_kafka_buf_t *rkbuf,
                                                const rd_kafkap_str_t *kstr) {
        size_t len, r;

        if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER)) {
                /* Standard string: Null is encoded as a -1 length prefix. */
                if (!kstr || RD_KAFKAP_STR_IS_NULL(kstr))
                        return rd_kafka_buf_write_i16(rkbuf, -1);

                /* Pre-serialized strings already carry their length prefix. */
                if (RD_KAFKAP_STR_IS_SERIALIZED(kstr))
                        return rd_kafka_buf_write(rkbuf,
                                                  RD_KAFKAP_STR_SER(kstr),
                                                  RD_KAFKAP_STR_SIZE(kstr));

                len = RD_KAFKAP_STR_LEN(kstr);
                r = rd_kafka_buf_write_i16(rkbuf, (int16_t)len);
                rd_kafka_buf_write(rkbuf, kstr->str, len);

                return r;
        }

        /* COMPACT_STRING lengths are:
         *  0   = NULL,
         *  1   = empty
         *  N.. = length + 1
         */
        if (!kstr || RD_KAFKAP_STR_IS_NULL(kstr))
                len = 0;
        else
                len = RD_KAFKAP_STR_LEN(kstr) + 1;

        r = rd_kafka_buf_write_uvarint(rkbuf, (uint64_t)len);
        if (len > 1)
                rd_kafka_buf_write(rkbuf, kstr->str, len-1);
        return r;
}
1166 
1167 
1168 
/**
 * @brief Write standard (2-byte header) or KIP-482 COMPACT_STRING to buffer.
 *
 * @param len String length, or (size_t)-1 to use strlen(str).
 *            A NULL \p str is written as a Null string.
 *
 * @remark Copies the string.
 *
 * @returns the offset in \p rkbuf where the length prefix was written.
 */
static RD_INLINE size_t
rd_kafka_buf_write_str (rd_kafka_buf_t *rkbuf,
                        const char *str, size_t len) {
        size_t r;

        if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER)) {
                /* Standard string */
                if (!str)
                        len = RD_KAFKAP_STR_LEN_NULL;
                else if (len == (size_t)-1)
                        len = strlen(str);
                r = rd_kafka_buf_write_i16(rkbuf, (int16_t) len);
                if (str)
                        rd_kafka_buf_write(rkbuf, str, len);
                return r;
        }

        /* COMPACT_STRING lengths are:
         *  0   = NULL,
         *  1   = empty
         *  N.. = length + 1
         */
        if (!str)
                len = 0;
        else if (len == (size_t)-1)
                len = strlen(str) + 1;
        else
                len++;

        r = rd_kafka_buf_write_uvarint(rkbuf, (uint64_t)len);
        if (len > 1)
                rd_kafka_buf_write(rkbuf, str, len-1);
        return r;
}
1208 
1209 
1210 
/**
 * Push (i.e., no copy) Kafka string to buffer iovec.
 * The kstr's serialized data must remain valid for the buffer's lifetime.
 */
static RD_INLINE void rd_kafka_buf_push_kstr (rd_kafka_buf_t *rkbuf,
                                             const rd_kafkap_str_t *kstr) {
	rd_kafka_buf_push(rkbuf, RD_KAFKAP_STR_SER(kstr),
			  RD_KAFKAP_STR_SIZE(kstr), NULL);
}



/**
 * Write (copy) Kafka bytes to buffer.
 *
 * NOTE(review): the final branch returns the written size (4 + len)
 * while the Null/serialized branches return a buffer offset from
 * rd_kafka_buf_write*() -- verify that callers do not depend on one
 * meaning or the other.
 */
static RD_INLINE size_t
rd_kafka_buf_write_kbytes (rd_kafka_buf_t *rkbuf,
                           const rd_kafkap_bytes_t *kbytes) {
        size_t len;

        if (!kbytes || RD_KAFKAP_BYTES_IS_NULL(kbytes))
                return rd_kafka_buf_write_i32(rkbuf, -1);

        if (RD_KAFKAP_BYTES_IS_SERIALIZED(kbytes))
                return rd_kafka_buf_write(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
                                          RD_KAFKAP_BYTES_SIZE(kbytes));

        len = RD_KAFKAP_BYTES_LEN(kbytes);
        rd_kafka_buf_write_i32(rkbuf, (int32_t)len);
        rd_kafka_buf_write(rkbuf, kbytes->data, len);

        return 4 + len;
}

/**
 * Push (i.e., no copy) Kafka bytes to buffer iovec.
 * The kbytes' serialized data must remain valid for the buffer's lifetime.
 */
static RD_INLINE void rd_kafka_buf_push_kbytes (rd_kafka_buf_t *rkbuf,
					       const rd_kafkap_bytes_t *kbytes){
	rd_kafka_buf_push(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
			  RD_KAFKAP_BYTES_SIZE(kbytes), NULL);
}

/**
 * Write (copy) binary bytes to buffer as Kafka bytes encapsulated data.
 * A NULL \p payload is written as a Null value (length -1).
 * @returns the buffer offset of the length field.
 */
static RD_INLINE size_t rd_kafka_buf_write_bytes (rd_kafka_buf_t *rkbuf,
                                                 const void *payload, size_t size) {
        size_t r;
        if (!payload)
                size = RD_KAFKAP_BYTES_LEN_NULL;
        r = rd_kafka_buf_write_i32(rkbuf, (int32_t) size);
        if (payload)
                rd_kafka_buf_write(rkbuf, payload, size);
        return r;
}


/**
 * @brief Write bool to buffer (encoded as a single int8).
 */
static RD_INLINE size_t rd_kafka_buf_write_bool (rd_kafka_buf_t *rkbuf,
                                                 rd_bool_t v) {
        return rd_kafka_buf_write_i8(rkbuf, (int8_t)v);
}
1275 
1276 
1277 /**
1278  * Write Kafka Message to buffer
1279  * The number of bytes written is returned in '*outlenp'.
1280  *
1281  * Returns the buffer offset of the first byte.
1282  */
1283 size_t rd_kafka_buf_write_Message (rd_kafka_broker_t *rkb,
1284 				   rd_kafka_buf_t *rkbuf,
1285 				   int64_t Offset, int8_t MagicByte,
1286 				   int8_t Attributes, int64_t Timestamp,
1287 				   const void *key, int32_t key_len,
1288 				   const void *payload, int32_t len,
1289 				   int *outlenp);
1290 
/**
 * Start calculating CRC from now and track it in 'rkbuf_crc'.
 * Must not already be in progress, and must be paired with a later
 * rd_kafka_buf_crc_finalize().
 */
static RD_INLINE RD_UNUSED void rd_kafka_buf_crc_init (rd_kafka_buf_t *rkbuf) {
	rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
	rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_CRC;
	rkbuf->rkbuf_crc = rd_crc32_init();
}

/**
 * Finalizes CRC calculation and returns the calculated checksum.
 */
static RD_INLINE RD_UNUSED
rd_crc32_t rd_kafka_buf_crc_finalize (rd_kafka_buf_t *rkbuf) {
	rkbuf->rkbuf_flags &= ~RD_KAFKA_OP_F_CRC;
	return rd_crc32_finalize(rkbuf->rkbuf_crc);
}
1308 
1309 
1310 
1311 
1312 
1313 /**
1314  * @brief Check if buffer's replyq.version is outdated.
1315  * @param rkbuf: may be NULL, for convenience.
1316  *
1317  * @returns 1 if this is an outdated buffer, else 0.
1318  */
1319 static RD_UNUSED RD_INLINE int
rd_kafka_buf_version_outdated(const rd_kafka_buf_t * rkbuf,int version)1320 rd_kafka_buf_version_outdated (const rd_kafka_buf_t *rkbuf, int version) {
1321         return rkbuf && rkbuf->rkbuf_replyq.version &&
1322                 rkbuf->rkbuf_replyq.version < version;
1323 }
1324 
1325 
1326 void rd_kafka_buf_set_maker (rd_kafka_buf_t *rkbuf,
1327                              rd_kafka_make_req_cb_t *make_cb,
1328                              void *make_opaque,
1329                              void (*free_make_opaque_cb) (void *make_opaque));
1330 
1331 #endif /* _RDKAFKA_BUF_H_ */
1332