/*
 * kcat - Apache Kafka consumer and producer
 *
 * Copyright (c) 2014-2021, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _MSC_VER
#include <unistd.h>
#include <syslog.h>
#include <sys/time.h>
#include <sys/mman.h>
#else
#pragma comment(lib, "ws2_32.lib")
#include "win32/wingetopt.h"
#include <io.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <signal.h>
#include <ctype.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>



#include "kcat.h"
#include "input.h"

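/* Feature gates: librdkafka's RD_KAFKA_VERSION is hex-encoded as 0xMMmmrrpp
 * (major, minor, revision, pre-release), so 0x01040000 is v1.4.0
 * (transactions) and 0x01060000 is v1.6.0 (incremental rebalancing). */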
#if RD_KAFKA_VERSION >= 0x01040000
#define ENABLE_TXNS 1
#endif

#if RD_KAFKA_VERSION >= 0x01060000
#define ENABLE_INCREMENTAL_ASSIGN 1
#endif


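/* Global configuration with defaults, overridden by command-line parsing. */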
struct conf conf = {
        .run = 1,
        .verbosity = 1,
        .exitonerror = 1,
        .partition = RD_KAFKA_PARTITION_UA,
        .msg_size = 1024*1024,
        .null_str = "NULL",
        .fixed_key = NULL,
        .metadata_timeout = 5,
        .offset = RD_KAFKA_OFFSET_INVALID,
};

static struct stats {
        uint64_t tx;
        uint64_t tx_err_q;
        uint64_t tx_err_dr;
        uint64_t tx_delivered;

        uint64_t rx;
} stats;


/* Partition's stopped state array */
int *part_stop = NULL;
/* Number of partitions that are stopped */
int part_stop_cnt = 0;
/* Threshold level (partitions stopped) before exiting */
int part_stop_thres = 0;



/**
 * Fatal error: print error and exit
 */
void RD_NORETURN fatal0 (const char *func, int line,
                         const char *fmt, ...) {
        va_list ap;
        char buf[1024];

        va_start(ap, fmt);
        vsnprintf(buf, sizeof(buf), fmt, ap);
        va_end(ap);

        KC_INFO(2, "Fatal error at %s:%i:\n", func, line);
        fprintf(stderr, "%% ERROR: %s\n", buf);
        exit(1);
}

/**
 * Print error and exit if needed
 */
void error0 (int exitonerror, const char *func, int line,
             const char *fmt, ...) {
        va_list ap;
        char buf[1024];

        va_start(ap, fmt);
        vsnprintf(buf, sizeof(buf), fmt, ap);
        va_end(ap);

        if (exitonerror)
                KC_INFO(2, "Error at %s:%i:\n", func, line);

        fprintf(stderr, "%% ERROR: %s%s\n",
                buf, exitonerror ? ": terminating":"");

        if (exitonerror)
                exit(1);
}



/**
 * The delivery report callback is called once per message to
 * report delivery success or failure.
 */
static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
                       void *opaque) {
        static int say_once = 1;
        int32_t broker_id = -1;
        struct buf *b = rkmessage->_private;

        if (b)
                buf_destroy(b);

        if (rkmessage->err) {
                KC_INFO(1, "Delivery failed for message: %s\n",
                        rd_kafka_err2str(rkmessage->err));
                stats.tx_err_dr++;
                return;
        }

#if RD_KAFKA_VERSION >= 0x010500ff
        broker_id = rd_kafka_message_broker_id(rkmessage);
#endif

        KC_INFO(3,
                "Message delivered to partition %"PRId32" (offset %"PRId64") "
                "on broker %"PRId32"\n",
                rkmessage->partition, rkmessage->offset, broker_id);

        if (rkmessage->offset == 0 && say_once) {
                KC_INFO(3, "Enable message offset reporting "
                        "with '-X topic.produce.offset.report=true'\n");
                say_once = 0;
        }
        stats.tx_delivered++;
}


/**
 * Produces a single message, retries on queue congestion, and
 * exits hard on error.
 */
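/* Illustrative call (hypothetical buffer): send a 1 KiB payload with no key,
 * letting librdkafka copy the payload rather than take ownership of it:
 *   produce(buf, 1024, NULL, 0, RD_KAFKA_MSG_F_COPY, NULL);
 */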
static void produce (void *buf, size_t len,
                     const void *key, size_t key_len, int msgflags,
                     void *msg_opaque) {
        rd_kafka_headers_t *hdrs = NULL;

        /* Headers are freed on successful producev(), pass a copy. */
        if (conf.headers)
                hdrs = rd_kafka_headers_copy(conf.headers);

        /* Produce message: keep trying until it succeeds. */
        do {
                rd_kafka_resp_err_t err;

                if (!conf.run)
                        KC_FATAL("Program terminated while "
                                 "producing message of %zd bytes", len);

                err = rd_kafka_producev(
                        conf.rk,
                        RD_KAFKA_V_RKT(conf.rkt),
                        RD_KAFKA_V_PARTITION(conf.partition),
                        RD_KAFKA_V_MSGFLAGS(msgflags),
                        RD_KAFKA_V_VALUE(buf, len),
                        RD_KAFKA_V_KEY(key, key_len),
                        RD_KAFKA_V_HEADERS(hdrs),
                        RD_KAFKA_V_OPAQUE(msg_opaque),
                        RD_KAFKA_V_END);

                if (!err) {
                        stats.tx++;
                        break;
                }

                if (err != RD_KAFKA_RESP_ERR__QUEUE_FULL)
                        KC_FATAL("Failed to produce message (%zd bytes): %s",
                                 len, rd_kafka_err2str(err));

                stats.tx_err_q++;

                /* Internal queue full, sleep to allow
                 * messages to be produced/time out
                 * before trying again. */
                rd_kafka_poll(conf.rk, 5);
        } while (1);

        /* Poll for delivery reports, errors, etc. */
        rd_kafka_poll(conf.rk, 0);
}


/**
 * Produce contents of file as a single message.
 * Returns the file length on success, else -1.
 */
static ssize_t produce_file (const char *path) {
        int fd;
        void *ptr;
        struct stat st;
        ssize_t sz;
        int msgflags = 0;

        if ((fd = _COMPAT(open)(path, O_RDONLY)) == -1) {
                KC_INFO(1, "Failed to open %s: %s\n", path, strerror(errno));
                return -1;
        }

        if (fstat(fd, &st) == -1) {
                KC_INFO(1, "Failed to stat %s: %s\n", path, strerror(errno));
                _COMPAT(close)(fd);
                return -1;
        }

        if (st.st_size == 0) {
                KC_INFO(3, "Skipping empty file %s\n", path);
                _COMPAT(close)(fd);
                return 0;
        }

#ifndef _MSC_VER
        ptr = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
        if (ptr == MAP_FAILED) {
                KC_INFO(1, "Failed to mmap %s: %s\n", path, strerror(errno));
                _COMPAT(close)(fd);
                return -1;
        }
        sz = st.st_size;
        msgflags = RD_KAFKA_MSG_F_COPY;
#else
        ptr = malloc(st.st_size);
        if (!ptr) {
                KC_INFO(1, "Failed to allocate message for %s: %s\n",
                        path, strerror(errno));
                _COMPAT(close)(fd);
                return -1;
        }

        sz = _read(fd, ptr, st.st_size);
        if (sz < st.st_size) {
                KC_INFO(1, "Read failed for %s (%zd/%zd): %s\n",
                        path, sz, (size_t)st.st_size, sz == -1 ? strerror(errno) :
                        "incomplete read");
                free(ptr);
                _COMPAT(close)(fd);
                return -1;
        }
        msgflags = RD_KAFKA_MSG_F_FREE;
#endif

        KC_INFO(4, "Producing file %s (%"PRIdMAX" bytes)\n",
                path, (intmax_t)st.st_size);
        produce(ptr, sz, conf.fixed_key, conf.fixed_key_len, msgflags, NULL);

        _COMPAT(close)(fd);

        if (!(msgflags & RD_KAFKA_MSG_F_FREE)) {
#ifndef _MSC_VER
                munmap(ptr, st.st_size);
#else
                free(ptr);
#endif
        }
        return sz;
}


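/**
 * @brief Find the first occurrence of \p needle (of \p needle_len bytes)
 *        within the first \p size bytes of \p haystack; neither buffer
 *        needs to be NUL-terminated.
 *
 * Uses memchr() to locate the last needle byte, then matches backwards.
 *
 * @returns a pointer to the start of the match, or NULL if not found.
 */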
static char *rd_strnstr (const char *haystack, size_t size,
                         const char *needle, size_t needle_len) {
        const char *nend = needle + needle_len - 1;
        const char *t;
        size_t of = needle_len - 1;

        while (of < size &&
               (t = (const char *)memchr((void *)(haystack + of),
                                         (int)*nend,
                                         size - of))) {
                const char *n = nend;
                const char *p = t;

                do {
                        n--;
                        p--;
                } while (n >= needle && *n == *p);

                if (n < needle)
                        return (char *)p+1;

                of = (size_t)(t - haystack) + 1;
        }

        return NULL;
}


/**
 * Run producer, reading messages from 'fp' and producing to kafka.
 * Or if 'pathcnt' is > 0, read messages from files in 'paths' instead.
 */
static void producer_run (FILE *fp, char **paths, int pathcnt) {
        char    errstr[512];
        char    tmp[16];
        size_t  tsize = sizeof(tmp);

        if (rd_kafka_conf_get(conf.rk_conf, "transactional.id",
                              tmp, &tsize) == RD_KAFKA_CONF_OK && tsize > 1) {
                KC_INFO(1, "Using transactional producer\n");
                conf.txn = 1;
        }

        tsize = sizeof(tmp);
        if (rd_kafka_conf_get(conf.rk_conf, "message.max.bytes",
                              tmp, &tsize) == RD_KAFKA_CONF_OK && tsize > 1) {
                int msg_max_bytes = atoi(tmp);
                KC_INFO(3, "Setting producer input buffer max size to "
                        "message.max.bytes value %d\n", msg_max_bytes);
                conf.msg_size = msg_max_bytes;
        }

        /* Assign per-message delivery report callback. */
        rd_kafka_conf_set_dr_msg_cb(conf.rk_conf, dr_msg_cb);

        /* Create producer */
        if (!(conf.rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf.rk_conf,
                                     errstr, sizeof(errstr))))
                KC_FATAL("Failed to create producer: %s", errstr);

        if (!conf.debug && conf.verbosity == 0)
                rd_kafka_set_log_level(conf.rk, 0);

#if ENABLE_TXNS
        if (conf.txn) {
                rd_kafka_error_t *error;

                error = rd_kafka_init_transactions(conf.rk,
                                                   conf.metadata_timeout*1000);
                if (error)
                        KC_FATAL("init_transactions(): %s",
                                 rd_kafka_error_string(error));

                error = rd_kafka_begin_transaction(conf.rk);
                if (error)
                        KC_FATAL("begin_transaction(): %s",
                                 rd_kafka_error_string(error));
        }
#endif

        /* Create topic */
        if (!(conf.rkt = rd_kafka_topic_new(conf.rk, conf.topic,
                                            conf.rkt_conf)))
                KC_FATAL("Failed to create topic %s: %s", conf.topic,
                         rd_kafka_err2str(rd_kafka_last_error()));

        conf.rk_conf  = NULL;
        conf.rkt_conf = NULL;


        if (pathcnt > 0 && !(conf.flags & CONF_F_LINE)) {
                int i;
                int good = 0;
                /* Read messages from files, each file is its own message. */

                for (i = 0 ; i < pathcnt ; i++)
                        if (produce_file(paths[i]) != -1)
                                good++;

                if (!good)
                        conf.exitcode = 1;
                else if (good < pathcnt)
                        KC_INFO(1, "Failed to produce from %i/%i files\n",
                                pathcnt - good, pathcnt);

        } else {
                struct inbuf inbuf;
                struct buf *b;

                inbuf_init(&inbuf, conf.msg_size, conf.delim, conf.delim_size);

                /* Read messages from input, delimited by conf.delim */
                while (conf.run &&
                       inbuf_read_to_delimeter(&inbuf, fp, &b)) {
                        int msgflags = 0;
                        char *buf = b->buf;
                        char *key = NULL;
                        size_t key_len = 0;
                        size_t len = b->size;

                        if (len == 0) {
                                buf_destroy(b);
                                continue;
                        }

                        /* Extract key, if desired and found. */
                        if (conf.flags & CONF_F_KEY_DELIM) {
                                char *t;
                                if ((t = rd_strnstr(buf, len,
                                                    conf.key_delim,
                                                    conf.key_delim_size))) {
                                        key_len = (size_t)(t-buf);
                                        key     = buf;
                                        buf     = t + conf.key_delim_size;
                                        len    -= key_len + conf.key_delim_size;

                                        if (conf.flags & CONF_F_NULL) {
                                                if (len == 0)
                                                        buf = NULL;
                                                if (key_len == 0)
                                                        key = NULL;
                                        }
                                }
                        }

                        if (!key && conf.fixed_key) {
                                key = conf.fixed_key;
                                key_len = conf.fixed_key_len;
                        }

                        if (len < 1024) {
                                /* If message is smaller than this arbitrary
                                 * threshold it will be more effective to
                                 * copy the data in librdkafka. */
                                msgflags |= RD_KAFKA_MSG_F_COPY;
                        }

                        /* Produce message */
                        produce(buf, len, key, key_len, msgflags,
                                (msgflags & RD_KAFKA_MSG_F_COPY) ?
                                NULL : b);

                        if (conf.flags & CONF_F_TEE &&
                            fwrite(b->buf, b->size, 1, stdout) != 1)
                                KC_FATAL("Tee write error for message "
                                         "of %zd bytes: %s",
                                         b->size, strerror(errno));

                        if (msgflags & RD_KAFKA_MSG_F_COPY) {
                                /* librdkafka made a copy of the input. */
                                buf_destroy(b);
                        }

                        /* Enforce -c <cnt> */
                        if (stats.tx == (uint64_t)conf.msg_cnt)
                                conf.run = 0;
                }

                if (conf.run) {
                        if (!feof(fp))
                                KC_FATAL("Unable to read message: %s",
                                         strerror(errno));
                }
        }

#if ENABLE_TXNS
        if (conf.txn) {
                rd_kafka_error_t *error;
                const char *what;

                if (conf.term_sig) {
                        KC_INFO(0,
                                "Aborting transaction due to "
                                "termination signal\n");
                        what = "abort_transaction()";
                        error = rd_kafka_abort_transaction(
                                conf.rk, conf.metadata_timeout * 1000);
                } else {
                        KC_INFO(1, "Committing transaction\n");
                        what = "commit_transaction()";
                        error = rd_kafka_commit_transaction(
                                conf.rk, conf.metadata_timeout * 1000);
                        if (!error)
                                KC_INFO(1,
                                        "Transaction successfully committed\n");
                }

                if (error)
                        KC_FATAL("%s: %s", what, rd_kafka_error_string(error));
        }
#endif


        /* Wait for all messages to be transmitted */
        conf.run = 1;
        while (conf.run && rd_kafka_outq_len(conf.rk))
                rd_kafka_poll(conf.rk, 50);

        rd_kafka_topic_destroy(conf.rkt);
        rd_kafka_destroy(conf.rk);

        if (stats.tx_err_dr)
                conf.exitcode = 1;
}

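/**
 * Stop consuming a partition (simple consumer mode), and stop the
 * program once the stopped-partition count reaches the threshold.
 */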
static void stop_partition (rd_kafka_message_t *rkmessage) {
        if (!part_stop[rkmessage->partition]) {
                /* Stop consuming this partition */
                rd_kafka_consume_stop(rkmessage->rkt,
                                      rkmessage->partition);
                part_stop[rkmessage->partition] = 1;
                part_stop_cnt++;
                if (part_stop_cnt >= part_stop_thres)
                        conf.run = 0;
        }
}


/**
 * @brief Mark partition as not at EOF
 */
static void partition_not_eof (const rd_kafka_message_t *rkmessage) {
        rd_kafka_topic_partition_t *rktpar;

        if (conf.mode != 'G' || !conf.exit_eof ||
            !conf.assignment || conf.eof_cnt == 0)
                return;

        /* Find partition in assignment */
        rktpar = rd_kafka_topic_partition_list_find(
                conf.assignment,
                rd_kafka_topic_name(rkmessage->rkt),
                rkmessage->partition);

        if (!rktpar || rktpar->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
                return;

        rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
        conf.eof_cnt--;
}


/**
 * @brief Mark partition as at EOF
 */
static void partition_at_eof (rd_kafka_message_t *rkmessage) {

        if (conf.mode == 'C') {
                /* Store EOF offset.
                 * If partition is empty and at offset 0,
                 * store future first message (0). */
                rd_kafka_offset_store(rkmessage->rkt,
                                      rkmessage->partition,
                                      rkmessage->offset == 0 ?
                                      0 : rkmessage->offset-1);
                if (conf.exit_eof) {
                        stop_partition(rkmessage);
                }

        } else if (conf.mode == 'G' && conf.exit_eof && conf.assignment) {
                /* Find partition in assignment */
                rd_kafka_topic_partition_t *rktpar;

                rktpar = rd_kafka_topic_partition_list_find(
                        conf.assignment,
                        rd_kafka_topic_name(rkmessage->rkt),
                        rkmessage->partition);

                if (rktpar && rktpar->err != RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        rktpar->err = RD_KAFKA_RESP_ERR__PARTITION_EOF;
                        conf.eof_cnt++;

                        if (conf.eof_cnt == conf.assignment->cnt)
                                conf.run = 0;
                }
        }

        KC_INFO(1, "Reached end of topic %s [%"PRId32"] "
                "at offset %"PRId64"%s\n",
                rd_kafka_topic_name(rkmessage->rkt),
                rkmessage->partition,
                rkmessage->offset,
                !conf.run ? ": exiting" : "");
}


/**
 * Consume callback, called for each message consumed.
 */
static void consume_cb (rd_kafka_message_t *rkmessage, void *opaque) {
        FILE *fp = opaque;

        if (!conf.run)
                return;

        if (rkmessage->err) {
                if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        partition_at_eof(rkmessage);
                        return;
                }

                if (rkmessage->rkt)
                        KC_FATAL("Topic %s [%"PRId32"] error: %s",
                                 rd_kafka_topic_name(rkmessage->rkt),
                                 rkmessage->partition,
                                 rd_kafka_message_errstr(rkmessage));
                else
                        KC_FATAL("Consumer error: %s",
                                 rd_kafka_message_errstr(rkmessage));

        } else {
                partition_not_eof(rkmessage);
        }

        if (conf.stopts) {
                int64_t ts = rd_kafka_message_timestamp(rkmessage, NULL);
                if (ts >= conf.stopts) {
                        stop_partition(rkmessage);
                        KC_INFO(1, "Reached stop timestamp for topic "
                                "%s [%"PRId32"] "
                                "at offset %"PRId64"%s\n",
                                rd_kafka_topic_name(rkmessage->rkt),
                                rkmessage->partition,
                                rkmessage->offset,
                                !conf.run ? ": exiting" : "");
                        return;
                }
        }

        /* Print message */
        fmt_msg_output(fp, rkmessage);

        if (conf.mode == 'C') {
                rd_kafka_offset_store(rkmessage->rkt,
                                      rkmessage->partition,
                                      rkmessage->offset);
        }

        if (++stats.rx == (uint64_t)conf.msg_cnt) {
                conf.run = 0;
                rd_kafka_yield(conf.rk);
        }
}


#if RD_KAFKA_VERSION >= 0x00090000
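/**
 * Throttle callback: logs when a broker throttled (delayed) a request,
 * e.g., due to quota enforcement.
 */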
static void throttle_cb (rd_kafka_t *rk, const char *broker_name,
                         int32_t broker_id, int throttle_time_ms, void *opaque){
        KC_INFO(1, "Broker %s (%"PRId32") throttled request for %dms\n",
                broker_name, broker_id, throttle_time_ms);
}
#endif

#if ENABLE_KAFKACONSUMER
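/**
 * Print a comma-separated partition list to stderr.
 * (The is_assigned argument is currently unused.)
 */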
static void print_partition_list (int is_assigned,
                                  const rd_kafka_topic_partition_list_t
                                  *partitions) {
        int i;
        for (i = 0 ; i < partitions->cnt ; i++) {
                fprintf(stderr, "%s%s [%"PRId32"]",
                        i > 0 ? ", ":"",
                        partitions->elems[i].topic,
                        partitions->elems[i].partition);
        }
        fprintf(stderr, "\n");
}


#if ENABLE_INCREMENTAL_ASSIGN
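/**
 * Rebalance callback for the COOPERATIVE protocol: partitions are
 * assigned and revoked incrementally, so conf.assignment is updated
 * in place rather than replaced wholesale.
 */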
static void
incremental_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *partitions,
                          void *opaque) {
        rd_kafka_error_t *error = NULL;
        int i;

        KC_INFO(1, "Group %s rebalanced: incremental %s of %d partition(s) "
                "(memberid %s%s, %s rebalance protocol): ",
                conf.group,
                err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
                "assignment" : "revoke",
                partitions->cnt,
                rd_kafka_memberid(rk),
                rd_kafka_assignment_lost(rk) ? ", assignment lost" : "",
                rd_kafka_rebalance_protocol(rk));

        switch (err)
        {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                if (conf.verbosity >= 1)
                        print_partition_list(1, partitions);

                if (!conf.assignment)
                        conf.assignment =
                                rd_kafka_topic_partition_list_new(
                                        partitions->cnt);

                for (i = 0 ; i < partitions->cnt ; i++) {
                        rd_kafka_topic_partition_t *rktpar =
                                &partitions->elems[i];

                        /* Set start offset from -o .. */
                        if (conf.offset != RD_KAFKA_OFFSET_INVALID)
                                rktpar->offset = conf.offset;

                        rd_kafka_topic_partition_list_add(conf.assignment,
                                                          rktpar->topic,
                                                          rktpar->partition);
                }
                error = rd_kafka_incremental_assign(rk, partitions);
                break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                if (conf.verbosity >= 1)
                        print_partition_list(1, partitions);

                /* Remove partitions from conf.assignment in reverse order
                 * to minimize array shrink sizes. */
                for (i = partitions->cnt - 1 ;
                     conf.assignment && i >= 0 ;
                     i--) {
                        rd_kafka_topic_partition_t *rktpar =
                                &partitions->elems[i];

                        rd_kafka_topic_partition_list_del(conf.assignment,
                                                          rktpar->topic,
                                                          rktpar->partition);
                }

                error = rd_kafka_incremental_unassign(rk, partitions);
                break;

        default:
                KC_INFO(0, "failed: %s\n", rd_kafka_err2str(err));
                break;
        }

        if (error) {
                KC_ERROR("Incremental rebalance %s of %d partition(s) "
                         "failed: %s\n",
                         err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
                         "assign" : "unassign",
                         partitions->cnt, rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);
        }
}
#endif

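/**
 * Group rebalance callback (EAGER protocol): the whole assignment is
 * replaced on assign and cleared on revoke. Dispatches to the
 * incremental handler when the COOPERATIVE protocol is in use.
 */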
static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *partitions,
                          void *opaque) {
        rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;

#if ENABLE_INCREMENTAL_ASSIGN
        if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
                incremental_rebalance_cb(rk, err, partitions, opaque);
                return;
        }
#endif

        KC_INFO(1, "Group %s rebalanced (memberid %s): ",
                conf.group, rd_kafka_memberid(rk));

        switch (err)
        {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                if (conf.verbosity >= 1) {
                        fprintf(stderr, "assigned: ");
                        print_partition_list(1, partitions);
                }
                if (conf.offset != RD_KAFKA_OFFSET_INVALID) {
                        int i;
                        for (i = 0 ; i < partitions->cnt ; i++)
                                partitions->elems[i].offset = conf.offset;
                }

                if (conf.assignment)
                        rd_kafka_topic_partition_list_destroy(conf.assignment);
                conf.assignment =
                        rd_kafka_topic_partition_list_copy(partitions);

                ret_err = rd_kafka_assign(rk, partitions);
                break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                if (conf.verbosity >= 1) {
                        fprintf(stderr, "revoked: ");
                        print_partition_list(1, partitions);
                }

                if (conf.assignment) {
                        rd_kafka_topic_partition_list_destroy(conf.assignment);
                        conf.assignment = NULL;
                }

                ret_err = rd_kafka_assign(rk, NULL);
                break;

        default:
                KC_INFO(0, "failed: %s\n", rd_kafka_err2str(err));
                break;
        }

        if (ret_err)
                KC_ERROR("Rebalance %s of %d partition(s) failed: %s\n",
                         err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
                         "assign" : "unassign",
                         partitions->cnt, rd_kafka_err2str(ret_err));
}

/**
 * Run high-level KafkaConsumer, write messages to 'fp'
 */
static void kafkaconsumer_run (FILE *fp, char *const *topics, int topic_cnt) {
        char    errstr[512];
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *topiclist;
        int i;

        rd_kafka_conf_set_rebalance_cb(conf.rk_conf, rebalance_cb);
        rd_kafka_conf_set_default_topic_conf(conf.rk_conf, conf.rkt_conf);
        conf.rkt_conf = NULL;

        /* Create consumer */
        if (!(conf.rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf.rk_conf,
                                     errstr, sizeof(errstr))))
                KC_FATAL("Failed to create consumer: %s", errstr);
        conf.rk_conf  = NULL;

        /* Forward main event queue to consumer queue so we can
         * serve both queues with a single consumer_poll() call. */
        rd_kafka_poll_set_consumer(conf.rk);

        if (conf.debug)
                rd_kafka_set_log_level(conf.rk, LOG_DEBUG);
        else if (conf.verbosity == 0)
                rd_kafka_set_log_level(conf.rk, 0);

        /* Build subscription set */
        topiclist = rd_kafka_topic_partition_list_new(topic_cnt);
        for (i = 0 ; i < topic_cnt ; i++)
                rd_kafka_topic_partition_list_add(topiclist, topics[i], -1);

        /* Subscribe */
        if ((err = rd_kafka_subscribe(conf.rk, topiclist)))
                KC_FATAL("Failed to subscribe to %d topics: %s\n",
                         topiclist->cnt, rd_kafka_err2str(err));

        KC_INFO(1, "Waiting for group rebalance\n");

        rd_kafka_topic_partition_list_destroy(topiclist);

        /* Read messages from Kafka, write to 'fp'. */
        while (conf.run) {
                rd_kafka_message_t *rkmessage;

                rkmessage = rd_kafka_consumer_poll(conf.rk, 100);
                if (!rkmessage)
                        continue;

                consume_cb(rkmessage, fp);

                rd_kafka_message_destroy(rkmessage);
        }

        if ((err = rd_kafka_consumer_close(conf.rk)))
                KC_FATAL("Failed to close consumer: %s\n",
                         rd_kafka_err2str(err));

        /* Wait for outstanding requests to finish. */
        conf.run = 1;
        while (conf.run && rd_kafka_outq_len(conf.rk) > 0)
                rd_kafka_poll(conf.rk, 50);

        if (conf.assignment)
                rd_kafka_topic_partition_list_destroy(conf.assignment);

        rd_kafka_destroy(conf.rk);
}
#endif

/**
 * Get offsets from conf.startts for consumer_run
 */
static int64_t *get_offsets (rd_kafka_metadata_topic_t *topic) {
        int i;
        int64_t *offsets;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *rktparlistp =
                rd_kafka_topic_partition_list_new(1);

        for (i = 0 ; i < topic->partition_cnt ; i++) {
                int32_t partition = topic->partitions[i].id;

                /* If -p <part> was specified: skip unwanted partitions */
                if (conf.partition != RD_KAFKA_PARTITION_UA &&
                    conf.partition != partition)
                        continue;

                rd_kafka_topic_partition_list_add(
                        rktparlistp,
                        rd_kafka_topic_name(conf.rkt),
                        partition)->offset = conf.startts;

                if (conf.partition != RD_KAFKA_PARTITION_UA)
                        break;
        }
        err = rd_kafka_offsets_for_times(conf.rk, rktparlistp,
                                         conf.metadata_timeout * 1000);
        if (err)
                KC_FATAL("offsets_for_times failed: %s", rd_kafka_err2str(err));

        offsets = calloc(topic->partition_cnt, sizeof(int64_t));
        for (i = 0 ; i < rktparlistp->cnt ; i++) {
                const rd_kafka_topic_partition_t *p = &rktparlistp->elems[i];
                offsets[p->partition] = p->offset;
        }
        rd_kafka_topic_partition_list_destroy(rktparlistp);

        return offsets;
}

/**
 * Run consumer, consuming messages from Kafka and writing to 'fp'.
 */
static void consumer_run (FILE *fp) {
        char    errstr[512];
        rd_kafka_resp_err_t err;
        const rd_kafka_metadata_t *metadata;
        int i;
        int64_t *offsets = NULL;
        rd_kafka_queue_t *rkqu;

        /* Create consumer */
        if (!(conf.rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf.rk_conf,
                                     errstr, sizeof(errstr))))
                KC_FATAL("Failed to create consumer: %s", errstr);

        if (!conf.debug && conf.verbosity == 0)
                rd_kafka_set_log_level(conf.rk, 0);

        /* The callback-based consumer API's offset store granularity is
         * not good enough for us, disable automatic offset store
         * and do it explicitly per-message in the consume callback instead. */
        if (rd_kafka_topic_conf_set(conf.rkt_conf,
                                    "auto.commit.enable", "false",
                                    errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
                KC_FATAL("%s", errstr);

        /* Create topic */
        if (!(conf.rkt = rd_kafka_topic_new(conf.rk, conf.topic,
                                            conf.rkt_conf)))
                KC_FATAL("Failed to create topic %s: %s", conf.topic,
                         rd_kafka_err2str(rd_kafka_last_error()));

        conf.rk_conf  = NULL;
        conf.rkt_conf = NULL;


        /* Query broker for topic + partition information. */
        if ((err = rd_kafka_metadata(conf.rk, 0, conf.rkt, &metadata,
                                     conf.metadata_timeout * 1000)))
                KC_FATAL("Failed to query metadata for topic %s: %s",
                         rd_kafka_topic_name(conf.rkt), rd_kafka_err2str(err));

        /* Error handling */
        if (metadata->topic_cnt == 0)
                KC_FATAL("No such topic in cluster: %s",
                         rd_kafka_topic_name(conf.rkt));

        if ((err = metadata->topics[0].err))
                KC_FATAL("Topic %s error: %s",
                         rd_kafka_topic_name(conf.rkt), rd_kafka_err2str(err));

        if (metadata->topics[0].partition_cnt == 0)
                KC_FATAL("Topic %s has no partitions",
                         rd_kafka_topic_name(conf.rkt));

        /* If Exit-at-EOF is enabled, set up array to track EOF
         * state for each partition. */
        if (conf.exit_eof || conf.stopts) {
                part_stop = calloc(metadata->topics[0].partition_cnt,
                                   sizeof(*part_stop));

                if (conf.partition != RD_KAFKA_PARTITION_UA)
                        part_stop_thres = 1;
                else
                        part_stop_thres = metadata->topics[0].partition_cnt;
        }

#if RD_KAFKA_VERSION >= 0x00090300
        if (conf.startts) {
                offsets = get_offsets(&metadata->topics[0]);
        }
#endif

        /* Create a shared queue that combines messages from
         * all wanted partitions. */
        rkqu = rd_kafka_queue_new(conf.rk);

        /* Start consuming from all wanted partitions. */
        for (i = 0 ; i < metadata->topics[0].partition_cnt ; i++) {
                int32_t partition = metadata->topics[0].partitions[i].id;

                /* If -p <part> was specified: skip unwanted partitions */
                if (conf.partition != RD_KAFKA_PARTITION_UA &&
                    conf.partition != partition)
                        continue;

                /* Start consumer for this partition */
                if (rd_kafka_consume_start_queue(conf.rkt, partition,
                                                 offsets ? offsets[i] :
                                                 (conf.offset ==
                                                  RD_KAFKA_OFFSET_INVALID ?
                                                  RD_KAFKA_OFFSET_BEGINNING :
                                                  conf.offset),
                                                 rkqu) == -1)
                        KC_FATAL("Failed to start consuming "
                                 "topic %s [%"PRId32"]: %s",
                                 conf.topic, partition,
                                 rd_kafka_err2str(rd_kafka_last_error()));

                if (conf.partition != RD_KAFKA_PARTITION_UA)
                        break;
        }
        free(offsets);

        if (conf.partition != RD_KAFKA_PARTITION_UA &&
            i == metadata->topics[0].partition_cnt)
                KC_FATAL("Topic %s (with partitions 0..%i): "
                         "partition %i does not exist",
                         rd_kafka_topic_name(conf.rkt),
                         metadata->topics[0].partition_cnt-1,
                         conf.partition);


        /* Read messages from Kafka, write to 'fp'. */
        while (conf.run) {
                rd_kafka_consume_callback_queue(rkqu, 100,
                                                consume_cb, fp);

                /* Poll for errors, etc */
                rd_kafka_poll(conf.rk, 0);
        }

        /* Stop consuming */
        for (i = 0 ; i < metadata->topics[0].partition_cnt ; i++) {
                int32_t partition = metadata->topics[0].partitions[i].id;

                /* If -p <part> was specified: skip unwanted partitions */
                if (conf.partition != RD_KAFKA_PARTITION_UA &&
                    conf.partition != partition)
                        continue;

                /* Don't stop already stopped partitions */
                if (!part_stop || !part_stop[partition])
                        rd_kafka_consume_stop(conf.rkt, partition);
        }

        /* Destroy shared queue */
        rd_kafka_queue_destroy(rkqu);

        /* Wait for outstanding requests to finish. */
        conf.run = 1;
        while (conf.run && rd_kafka_outq_len(conf.rk) > 0)
                rd_kafka_poll(conf.rk, 50);

        if (conf.assignment)
                rd_kafka_topic_partition_list_destroy(conf.assignment);

        rd_kafka_metadata_destroy(metadata);
        rd_kafka_topic_destroy(conf.rkt);
        rd_kafka_destroy(conf.rk);
}


/**
 * Print metadata information
 */
static void metadata_print (const rd_kafka_metadata_t *metadata,
                            int32_t controllerid) {
        int i, j, k;

        printf("Metadata for %s (from broker %"PRId32": %s):\n",
               conf.topic ? conf.topic : "all topics",
               metadata->orig_broker_id, metadata->orig_broker_name);

        /* Iterate brokers */
        printf(" %i brokers:\n", metadata->broker_cnt);
        for (i = 0 ; i < metadata->broker_cnt ; i++)
                printf("  broker %"PRId32" at %s:%i%s\n",
                       metadata->brokers[i].id,
                       metadata->brokers[i].host,
                       metadata->brokers[i].port,
                       controllerid == metadata->brokers[i].id ?
                       " (controller)" : "");

        /* Iterate topics */
        printf(" %i topics:\n", metadata->topic_cnt);
        for (i = 0 ; i < metadata->topic_cnt ; i++) {
                const rd_kafka_metadata_topic_t *t = &metadata->topics[i];
                printf("  topic \"%s\" with %i partitions:",
                       t->topic,
                       t->partition_cnt);
                if (t->err) {
                        printf(" %s", rd_kafka_err2str(t->err));
                        if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
                                printf(" (try again)");
                }
                printf("\n");

                /* Iterate topic's partitions */
                for (j = 0 ; j < t->partition_cnt ; j++) {
                        const rd_kafka_metadata_partition_t *p;
                        p = &t->partitions[j];
                        printf("    partition %"PRId32", "
                               "leader %"PRId32", replicas: ",
                               p->id, p->leader);

                        /* Iterate partition's replicas */
                        for (k = 0 ; k < p->replica_cnt ; k++)
                                printf("%s%"PRId32,
                                       k > 0 ? ",":"", p->replicas[k]);

                        /* Iterate partition's ISRs */
                        printf(", isrs: ");
                        for (k = 0 ; k < p->isr_cnt ; k++)
                                printf("%s%"PRId32,
                                       k > 0 ? ",":"", p->isrs[k]);
                        if (p->err)
                                printf(", %s\n", rd_kafka_err2str(p->err));
                        else
                                printf("\n");
                }
        }
}


/**
 * Lists metadata
 */
static void metadata_list (void) {
        char    errstr[512];
        rd_kafka_resp_err_t err;
        const rd_kafka_metadata_t *metadata;
        int32_t controllerid = -1;

        /* Create handle */
        if (!(conf.rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf.rk_conf,
                                     errstr, sizeof(errstr))))
                KC_FATAL("Failed to create producer: %s", errstr);

        if (!conf.debug && conf.verbosity == 0)
                rd_kafka_set_log_level(conf.rk, 0);

        /* Create topic, if specified */
        if (conf.topic &&
            !(conf.rkt = rd_kafka_topic_new(conf.rk, conf.topic,
                                            conf.rkt_conf)))
                KC_FATAL("Failed to create topic %s: %s", conf.topic,
                         rd_kafka_err2str(rd_kafka_last_error()));

        conf.rk_conf  = NULL;
        conf.rkt_conf = NULL;


        /* Fetch metadata */
        err = rd_kafka_metadata(conf.rk, conf.rkt ? 0 : 1, conf.rkt,
                                &metadata, conf.metadata_timeout * 1000);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
                KC_FATAL("Failed to acquire metadata: %s%s",
                         rd_kafka_err2str(err),
                         err == RD_KAFKA_RESP_ERR__TRANSPORT ?
                         " (Are the brokers reachable? "
                         "Also try increasing the metadata timeout with "
                         "-m <timeout>?)" : "");

#if HAVE_CONTROLLERID
        controllerid = rd_kafka_controllerid(conf.rk, 0);
#endif

        /* Print metadata */
#if ENABLE_JSON
        if (conf.flags & CONF_F_FMT_JSON)
                metadata_print_json(metadata, controllerid);
        else
#endif
                metadata_print(metadata, controllerid);

        rd_kafka_metadata_destroy(metadata);

        if (conf.rkt)
                rd_kafka_topic_destroy(conf.rkt);
        rd_kafka_destroy(conf.rk);
}


/**
 * Print usage and exit.
 */
static void RD_NORETURN usage (const char *argv0, int exitcode,
                               const char *reason,
                               int version_only) {

        FILE *out = stdout;
        char features[256];
        size_t flen;
        rd_kafka_conf_t *tmpconf;

        if (reason) {
                out = stderr;
                fprintf(out, "Error: %s\n\n", reason);
        }

        if (!version_only)
                fprintf(out, "Usage: %s <options> [file1 file2 .. | topic1 topic2 ..]\n",
                        argv0);

        /* Create a temporary config object to extract builtin.features */
        tmpconf = rd_kafka_conf_new();
        flen = sizeof(features);
        if (rd_kafka_conf_get(tmpconf, "builtin.features",
                              features, &flen) != RD_KAFKA_CONF_OK)
                strncpy(features, "n/a", sizeof(features));
        rd_kafka_conf_destroy(tmpconf);

        fprintf(out,
                "kcat - Apache Kafka producer and consumer tool\n"
                "https://github.com/edenhill/kcat\n"
                "Copyright (c) 2014-2021, Magnus Edenhill\n"
                "Version %s (%s%slibrdkafka %s builtin.features=%s)\n"
                "\n",
                KCAT_VERSION,
                ""
#if ENABLE_JSON
                "JSON, "
#endif
#if ENABLE_AVRO
                "Avro, "
#endif
#if ENABLE_TXNS
                "Transactions, "
#endif
#if ENABLE_INCREMENTAL_ASSIGN
                "IncrementalAssign, "
#endif
                ,
#if ENABLE_JSON
                json_can_emit_verbatim() ? "JSONVerbatim, " : "",
#else
                "",
#endif
                rd_kafka_version_str(), features
                );

        if (version_only)
                exit(exitcode);

        fprintf(out, "\n"
                "General options:\n"
                "  -C | -P | -L | -Q  Mode: Consume, Produce, Metadata List, Query mode\n"
#if ENABLE_KAFKACONSUMER
                "  -G <group-id>      Mode: High-level KafkaConsumer (Kafka >=0.9 balanced consumer groups)\n"
                "                     Expects a list of topics to subscribe to\n"
#endif
                "  -t <topic>         Topic to consume from, produce to, "
                "or list\n"
                "  -p <partition>     Partition\n"
                "  -b <brokers,..>    Bootstrap broker(s) (host[:port])\n"
                "  -D <delim>         Message delimiter string:\n"
                "                     a-z | \\r | \\n | \\t | \\xNN ..\n"
                "                     Default: \\n\n"
                "  -K <delim>         Key delimiter (same format as -D)\n"
                "  -c <cnt>           Limit message count\n"
                "  -m <seconds>       Metadata (et.al.) request timeout.\n"
                "                     This limits how long kcat will block\n"
                "                     while waiting for initial metadata to be\n"
                "                     retrieved from the Kafka cluster.\n"
                "                     It also sets the timeout for the producer's\n"
                "                     transaction commits, init, aborts, etc.\n"
                "                     Default: 5 seconds.\n"
                "  -F <config-file>   Read configuration properties from file,\n"
                "                     file format is \"property=value\".\n"
                "                     The KCAT_CONFIG=path environment variable "
                "can also be used, but -F takes precedence.\n"
                "                     The default configuration file is "
                "$HOME/.config/kcat.conf\n"
                "  -X list            List available librdkafka configuration "
                "properties\n"
                "  -X prop=val        Set librdkafka configuration property.\n"
                "                     Properties prefixed with \"topic.\" are\n"
                "                     applied as topic properties.\n"
#if ENABLE_AVRO
                "  -X schema.registry.prop=val Set libserdes configuration property "
                "for the Avro/Schema-Registry client.\n"
#endif
                "  -X dump            Dump configuration and exit.\n"
                "  -d <dbg1,...>      Enable librdkafka debugging:\n"
                "                     " RD_KAFKA_DEBUG_CONTEXTS "\n"
                "  -q                 Be quiet (verbosity set to 0)\n"
                "  -v                 Increase verbosity\n"
                "  -E                 Do not exit on non-fatal error\n"
                "  -V                 Print version\n"
                "  -h                 Print usage help\n"
                "\n"
                "Producer options:\n"
                "  -z snappy|gzip|lz4 Message compression. Default: none\n"
                "  -p -1              Use random partitioner\n"
                "  -D <delim>         Delimiter to split input into messages\n"
                "  -K <delim>         Delimiter to split input key and message\n"
                "  -k <str>           Use a fixed key for all messages.\n"
                "                     If combined with -K, per-message keys\n"
                "                     take precedence.\n"
                "  -H <header=value>  Add Message Headers "
                "(may be specified multiple times)\n"
                "  -l                 Send messages from a file separated by\n"
                "                     delimiter, as with stdin.\n"
                "                     (only one file allowed)\n"
                "  -T                 Output sent messages to stdout, acting like tee.\n"
                "  -c <cnt>           Exit after producing this number "
                "of messages\n"
                "  -Z                 Send empty messages as NULL messages\n"
                "  file1 file2..      Read messages from files.\n"
                "                     With -l, only one file permitted.\n"
                "                     Otherwise, the entire file contents will\n"
                "                     be sent as one single message.\n"
                "  -X transactional.id=.. Enable transactions and send all\n"
                "                     messages in a single transaction which\n"
                "                     is committed when stdin is closed or the\n"
                "                     input file(s) are fully read.\n"
                "                     If kcat is terminated through Ctrl-C\n"
                "                     (et.al) the transaction will be aborted.\n"
                "\n"
                "Consumer options:\n"
                "  -o <offset>        Offset to start consuming from:\n"
                "                     beginning | end | stored |\n"
                "                     <value>  (absolute offset) |\n"
                "                     -<value> (relative offset from end)\n"
#if RD_KAFKA_VERSION >= 0x00090300
                "                     s@<value> (timestamp in ms to start at)\n"
                "                     e@<value> (timestamp in ms to stop at "
                "(not included))\n"
#endif
                "  -e                 Exit successfully when last message "
                "received\n"
                "  -f <fmt..>         Output formatting string, see below.\n"
                "                     Takes precedence over -D and -K.\n"
#if ENABLE_JSON
                "  -J                 Output with JSON envelope\n"
#endif
                "  -s key=<serdes>    Deserialize non-NULL keys using <serdes>.\n"
                "  -s value=<serdes>  Deserialize non-NULL values using <serdes>.\n"
                "  -s <serdes>        Deserialize non-NULL keys and values using <serdes>.\n"
                "                     Available deserializers (<serdes>):\n"
                "                       <pack-str> - A combination of:\n"
                "                                    <: little-endian,\n"
                "                                    >: big-endian (recommended),\n"
                "                                    b: signed 8-bit integer\n"
                "                                    B: unsigned 8-bit integer\n"
                "                                    h: signed 16-bit integer\n"
                "                                    H: unsigned 16-bit integer\n"
                "                                    i: signed 32-bit integer\n"
                "                                    I: unsigned 32-bit integer\n"
                "                                    q: signed 64-bit integer\n"
                "                                    Q: unsigned 64-bit integer\n"
                "                                    c: ASCII character\n"
                "                                    s: remaining data is string\n"
                "                                    $: match end-of-input (no more bytes remaining or a parse error is raised).\n"
                "                                       Not including this token skips any\n"
                "                                       remaining data after the pack-str is\n"
                "                                       exhausted.\n"
#if ENABLE_AVRO
                "                       avro       - Avro-formatted with schema in Schema-Registry (requires -r)\n"
                "                     E.g.: -s key=i -s value=avro - key is 32-bit integer, value is Avro.\n"
                "                       or: -s avro - both key and value are Avro-serialized\n"
1401 #endif
1402 #if ENABLE_AVRO
1403                 "  -r <url>           Schema registry URL (when avro deserializer is used with -s)\n"
1404 #endif
1405                 "  -D <delim>         Delimiter to separate messages on output\n"
1406                 "  -K <delim>         Print message keys prefixing the message\n"
1407                 "                     with specified delimiter.\n"
1408                 "  -O                 Print message offset using -K delimiter\n"
1409                 "  -c <cnt>           Exit after consuming this number "
1410                 "of messages\n"
1411                 "  -Z                 Print NULL values and keys as \"%s\" "
1412                 "instead of empty.\n"
1413                 "                     For JSON (-J) the nullstr is always null.\n"
1414                 "  -u                 Unbuffered output\n"
1415                 "\n"
1416                 "Metadata options (-L):\n"
1417                 "  -t <topic>         Topic to query (optional)\n"
1418                 "\n"
1419                 "Query options (-Q):\n"
1420                 "  -t <t>:<p>:<ts>    Get offset for topic <t>,\n"
1421                 "                     partition <p>, timestamp <ts>.\n"
1422                 "                     Timestamp is the number of milliseconds\n"
1423                 "                     since epoch UTC.\n"
1424                 "                     Requires broker >= 0.10.0.0 and librdkafka >= 0.9.3.\n"
1425                 "                     Multiple -t .. are allowed but a partition\n"
1426                 "                     must only occur once.\n"
1427                 "\n"
1428                 "Format string tokens:\n"
1429                 "  %%s                 Message payload\n"
1430                 "  %%S                 Message payload length (or -1 for NULL)\n"
1431                 "  %%R                 Message payload length (or -1 for NULL) serialized\n"
1432                 "                     as a binary big endian 32-bit signed integer\n"
1433                 "  %%k                 Message key\n"
1434                 "  %%K                 Message key length (or -1 for NULL)\n"
1435 #if RD_KAFKA_VERSION >= 0x000902ff
1436                 "  %%T                 Message timestamp (milliseconds since epoch UTC)\n"
1437 #endif
1438 #if HAVE_HEADERS
1439                 "  %%h                 Message headers (n=v CSV)\n"
1440 #endif
1441                 "  %%t                 Topic\n"
1442                 "  %%p                 Partition\n"
1443                 "  %%o                 Message offset\n"
1444                 "  \\n \\r \\t           Newlines, tab\n"
1445                 "  \\xXX \\xNNN         Any ASCII character\n"
1446                 " Example:\n"
1447                 "  -f 'Topic %%t [%%p] at offset %%o: key %%k: %%s\\n'\n"
1448                 "\n"
1449 #if ENABLE_JSON
1450                 "JSON message envelope (on one line) when consuming with -J:\n"
1451                 " { \"topic\": str, \"partition\": int, \"offset\": int,\n"
1452                 "   \"tstype\": \"create|logappend|unknown\", \"ts\": int, "
1453                 "// timestamp in milliseconds since epoch\n"
1454                 "   \"broker\": int,\n"
1455                 "   \"headers\": { \"<name>\": str, .. }, // optional\n"
1456                 "   \"key\": str|json, \"payload\": str|json,\n"
1457                 "   \"key_error\": str, \"payload_error\": str, //optional\n"
1458                 "   \"key_schema_id\": int, "
1459                 "\"value_schema_id\": int //optional\n"
1460                 " }\n"
1461                 " notes:\n"
1462                 "   - key_error and payload_error are only included if "
1463                 "deserialization fails.\n"
1464                 "   - key_schema_id and value_schema_id are included for "
1465                 "successfully deserialized Avro messages.\n"
1466                 "\n"
1467 #endif
1468                 "Consumer mode (writes messages to stdout):\n"
1469                 "  kcat -b <broker> -t <topic> -p <partition>\n"
1470                 " or:\n"
1471                 "  kcat -C -b ...\n"
1472                 "\n"
1473 #if ENABLE_KAFKACONSUMER
1474                 "High-level KafkaConsumer mode:\n"
1475                 "  kcat -b <broker> -G <group-id> topic1 top2 ^aregex\\d+\n"
1476                 "\n"
1477 #endif
1478                 "Producer mode (reads messages from stdin):\n"
1479                 "  ... | kcat -b <broker> -t <topic> -p <partition>\n"
1480                 " or:\n"
1481                 "  kcat -P -b ...\n"
1482                 "\n"
1483                 "Metadata listing:\n"
1484                 "  kcat -L -b <broker> [-t <topic>]\n"
1485                 "\n"
1486                 "Query offset by timestamp:\n"
1487                 "  kcat -Q -b broker -t <topic>:<partition>:<timestamp>\n"
1488                 "\n",
1489                 conf.null_str
1490                 );
1491         exit(exitcode);
1492 }
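
/* Illustrative pack-str usage (broker/topic names are placeholders):
 *
 *   kcat -b broker1 -t mytopic -s key='>q' -s value='>is$'
 *
 * consumes with keys decoded as big-endian 64-bit integers, and values
 * decoded as a big-endian 32-bit integer followed by a string, where
 * '$' demands that no undecoded bytes remain. */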


/**
 * Terminate the program by clearing the run flag.
 */
static void term (int sig) {
        conf.run = 0;
        conf.term_sig = sig;
}


/**
 * librdkafka error callback
 */
static void error_cb (rd_kafka_t *rk, int err,
                      const char *reason, void *opaque) {
#if RD_KAFKA_VERSION >= 0x01000000
        if (err == RD_KAFKA_RESP_ERR__FATAL) {
                /* A fatal error has been raised: extract the
                 * underlying error and start graceful termination,
                 * so that producer delivery reports are
                 * handled before exiting. */
                char fatal_errstr[512];
                rd_kafka_resp_err_t fatal_err;

                fatal_err = rd_kafka_fatal_error(rk, fatal_errstr,
                                                 sizeof(fatal_errstr));
                KC_INFO(0, "FATAL CLIENT ERROR: %s: %s: terminating\n",
                        rd_kafka_err2str(fatal_err), fatal_errstr);
                conf.run = 0;

        } else
#endif
                if (err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN) {
                        KC_ERROR("%s: %s", rd_kafka_err2str(err),
                                 reason ? reason : "");
                } else {
                        KC_INFO(1, "ERROR: %s: %s\n", rd_kafka_err2str(err),
                                reason ? reason : "");
                }
}


/**
 * @brief Parse delimiter string from command line arguments and return
 *        an allocated copy.
 */
static char *parse_delim (const char *instr) {
        char *str;

        /* Make a copy so we can modify the string. */
        str = strdup(instr);

        while (1) {
                size_t skip = 0;
                char *t;

                if ((t = strstr(str, "\\n"))) {
                        *t = '\n';
                        skip = 1;
                } else if ((t = strstr(str, "\\t"))) {
                        *t = '\t';
                        skip = 1;
                } else if ((t = strstr(str, "\\x"))) {
                        char *end;
                        int x;
                        x = strtoul(t+strlen("\\x"), &end, 16) & 0xff;
                        if (end == t+strlen("\\x"))
                                KC_FATAL("Delimiter %s expects hex number", t);
                        *t = (char)x;
                        skip = (int)(end - t) - 1;
                } else
                        break;

                if (t && skip)
                        memmove(t+1, t+1+skip, strlen(t+1+skip)+1);
        }

        return str;
}
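
/* Example (illustrative only): how parse_delim() expands escapes.
 *
 *   char *d = parse_delim("\\t\\x1f");
 *   // d is now the 2-byte string { '\t', 0x1f }
 *   free(d);
 *
 * Escapes are rewritten in place; "\x" consumes the following hex
 * digits and masks the result to a single byte, as exercised by
 * unittest_parse_delim() below. */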

/**
 * @brief Add topic+partition+offset to list, from :-separated string.
 *
 * "<t>:<p>:<o>"
 *
 * @remark Will modify \p str
 */
static void add_topparoff (const char *what,
                           rd_kafka_topic_partition_list_t *rktparlist,
                           char *str) {
        char *s, *t, *e;
        char *topic;
        int partition;
        int64_t offset;

        if (!(s = strchr(str, ':')) ||
            !(t = strchr(s+1, ':')))
                KC_FATAL("%s: expected \"topic:partition:offset_or_timestamp\"", what);

        topic = str;
        *s = '\0';

        partition = strtoul(s+1, &e, 0);
        if (e == s+1)
                KC_FATAL("%s: expected \"topic:partition:offset_or_timestamp\"", what);

        offset = strtoll(t+1, &e, 0);
        if (e == t+1)
                KC_FATAL("%s: expected \"topic:partition:offset_or_timestamp\"", what);

        rd_kafka_topic_partition_list_add(rktparlist, topic, partition)->offset = offset;
}
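
/* Example (illustrative, "mytopic" is a placeholder): in -Q mode,
 *
 *   -t mytopic:0:1609459200000
 *
 * is parsed into topic "mytopic", partition 0 and offset/timestamp
 * 1609459200000 (a millisecond timestamp for -Q queries). */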

/**
 * Dump current rdkafka configuration to stdout.
 */
static void conf_dump (void) {
        const char **arr;
        size_t cnt;
        int pass;

        for (pass = 0 ; pass < 2 ; pass++) {
                int i;

                if (pass == 0) {
                        arr = rd_kafka_conf_dump(conf.rk_conf, &cnt);
                        printf("# Global config\n");
                } else {
                        printf("# Topic config\n");
                        arr = rd_kafka_topic_conf_dump(conf.rkt_conf, &cnt);
                }

                for (i = 0 ; i < (int)cnt ; i += 2)
                        printf("%s = %s\n",
                               arr[i], arr[i+1]);

                printf("\n");

                rd_kafka_conf_dump_free(arr, cnt);
        }
}
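
/* Illustrative `-X dump` output shape (the exact property list and
 * default values depend on the librdkafka version and configuration):
 *
 *   # Global config
 *   client.id = rdkafka
 *   ...
 *
 *   # Topic config
 *   request.required.acks = -1
 *   ...
 */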


/**
 * @brief Try setting a config property. Provides "topic." fallthru.
 *
 * @remark \p val may be truncated by this function.
 *
 * @returns -1 on failure or 0 on success.
 */
static int try_conf_set (const char *name, char *val,
                         char *errstr, size_t errstr_size) {
        rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;
        size_t srlen = strlen("schema.registry.");

        /* Pass schema.registry. config to the serdes */
        if (!strncmp(name, "schema.registry.", srlen)) {
#if ENABLE_AVRO
                serdes_err_t serr;

                if (!conf.srconf)
                        conf.srconf = serdes_conf_new(NULL, 0, NULL);

                if (!strcmp(name, "schema.registry.url")) {
                        char *t;

                        /* Trim trailing slashes from URL to avoid 404 */
                        for (t = val + strlen(val) - 1;
                             t >= val && *t == '/'; t--)
                                *t = '\0';

                        if (!*val) {
                                snprintf(errstr, errstr_size,
                                         "schema.registry.url is empty");
                                return -1;
                        }

                        conf.flags |= CONF_F_SR_URL_SEEN;
                        srlen = 0;
                }

                serr = serdes_conf_set(conf.srconf, name+srlen, val,
                                       errstr, errstr_size);
                return serr == SERDES_ERR_OK ? 0 : -1;
#else
                snprintf(errstr, errstr_size,
                         "This build of kcat lacks "
                         "Avro/Schema-Registry support");
                return -1;
#endif
        }


        /* Try "topic." prefixed properties on the topic
         * conf first, and then fall through to the global conf if
         * it didn't match a topic configuration property. */
        if (!strncmp(name, "topic.", strlen("topic.")))
                res = rd_kafka_topic_conf_set(conf.rkt_conf,
                                              name+
                                              strlen("topic."),
                                              val,
                                              errstr, errstr_size);
        else
                /* If no "topic." prefix, try the topic config first. */
                res = rd_kafka_topic_conf_set(conf.rkt_conf,
                                              name, val,
                                              errstr, errstr_size);

        if (res == RD_KAFKA_CONF_UNKNOWN)
                res = rd_kafka_conf_set(conf.rk_conf, name, val,
                                        errstr, errstr_size);

        if (res != RD_KAFKA_CONF_OK)
                return -1;

        if (!strcmp(name, "metadata.broker.list") ||
            !strcmp(name, "bootstrap.servers"))
                conf.flags |= CONF_F_BROKERS_SEEN;

        if (!strcmp(name, "api.version.request"))
                conf.flags |= CONF_F_APIVERREQ_USER;

#if RD_KAFKA_VERSION >= 0x00090000
        rd_kafka_conf_set_throttle_cb(conf.rk_conf, throttle_cb);
#endif


        return 0;
}
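
/* Illustrative fallthru examples ("acks" is a topic-level property
 * alias in librdkafka, "fetch.wait.max.ms" a global property):
 *
 *   try_conf_set("topic.acks", "all", errstr, sizeof(errstr));
 *     // explicitly targets the topic configuration
 *   try_conf_set("fetch.wait.max.ms", "50", errstr, sizeof(errstr));
 *     // unknown as a topic property, falls through to the global conf
 */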

/**
 * @brief Intercept configuration properties and try to identify
 *        incompatible properties that need to be converted to librdkafka
 *        configuration properties.
 *
 * @returns -1 on failure, 0 if the property was not handled,
 *          or 1 if it was handled.
 */
static int try_java_conf_set (const char *name, const char *val,
                              char *errstr, size_t errstr_size) {
        if (!strcmp(name, "ssl.endpoint.identification.algorithm"))
                return 1; /* SSL server verification:
                           * not supported by librdkafka: ignore for now */

        if (!strcmp(name, "sasl.jaas.config")) {
                /* Bounded field widths to avoid overflowing the buffers. */
                char sasl_user[128], sasl_pass[128];
                if (sscanf(val,
                           "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%127[^\"]\" password=\"%127[^\"]\"",
                           sasl_user, sasl_pass) == 2) {
                        if (try_conf_set("sasl.username", sasl_user,
                                         errstr, errstr_size) == -1 ||
                            try_conf_set("sasl.password", sasl_pass,
                                         errstr, errstr_size) == -1)
                                return -1;
                        return 1;
                }
        }

        return 0;
}
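
/* Example (illustrative, "alice"/"secret" are placeholders): a Java
 * client config line such as
 *
 *   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="secret";
 *
 * is converted to sasl.username=alice and sasl.password=secret, while
 * ssl.endpoint.identification.algorithm is accepted but ignored. */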


/**
 * @brief Read config file, fail terminally if fatal is true, else
 *        fail silently.
 *
 * @returns 0 on success or -1 on failure (unless fatal is true,
 *          in which case the app will have exited).
 */
static int read_conf_file (const char *path, int fatal) {
        FILE *fp;
        char buf[512];
        int line = 0;

        if (!(fp = fopen(path, "r"))) {
                if (fatal)
                        KC_FATAL("Failed to open %s: %s",
                                 path, strerror(errno));
                return -1;
        }

        KC_INFO(fatal ? 1 : 3, "Reading configuration from file %s\n", path);

        while (fgets(buf, sizeof(buf), fp)) {
                char *s = buf;
                char *t;
                char errstr[512];
                int r;

                line++;

                /* Left-trim */
                while (isspace(*s))
                        s++;

                /* Right-trim and remove newline */
                t = s + strlen(s) - 1;
                while (t > s && isspace(*t)) {
                        *t = 0;
                        t--;
                }

                /* Ignore empty lines */
                if (!*s)
                        continue;

                /* Ignore comments */
                if (*s == '#')
                        continue;

                /* Strip escapes for \: \= which can be encountered in
                 * Java configs (see comment below) */
                while ((t = strstr(s, "\\:"))) {
                        memmove(t, t+1, strlen(t+1)+1); /* overwrite \: */
                        *t = ':'; /* reinsert : */
                }
                while ((t = strstr(s, "\\="))) {
                        memmove(t, t+1, strlen(t+1)+1); /* overwrite \= */
                        *t = '='; /* reinsert = */
                }

                /* Parse prop=value */
                if (!(t = strchr(s, '=')) || t == s)
                        KC_FATAL("%s:%d: expected property=value\n",
                                 path, line);

                *t = 0;
                t++;

                /**
                 * Attempt to support Java client configuration files,
                 * such as the ccloud config.
                 * There are some quirks with unnecessary escaping with \
                 * that we remove, as well as parsing special configuration
                 * properties that don't match librdkafka's.
                 */
                r = try_java_conf_set(s, t, errstr, sizeof(errstr));
                if (r == -1)
                        KC_FATAL("%s:%d: %s (java config conversion)\n",
                                 path, line, errstr);
                else if (r == 1)
                        continue; /* Handled */

                if (try_conf_set(s, t, errstr, sizeof(errstr)) == -1)
                        KC_FATAL("%s:%d: %s\n", path, line, errstr);
        }

        fclose(fp);

        return 0;
}
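
/* Illustrative config file accepted by read_conf_file(), e.g.
 * $HOME/.config/kcat.conf (the broker address is a placeholder):
 *
 *   # Comments and blank lines are ignored
 *   bootstrap.servers=broker1:9092
 *   security.protocol=sasl_ssl
 *   sasl.mechanisms=PLAIN
 */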


/**
 * @returns the value for environment variable \p env, or NULL
 *          if it is not set or it is empty.
 */
static const char *kc_getenv (const char *env) {
        const char *val;
        if (!(val = getenv(env)) || !*val)
                return NULL;
        return val;
}

static void read_default_conf_files (void) {
        char kpath[512], kpath2[512];
        const char *home;

        if (!(home = kc_getenv("HOME")))
                return;

        snprintf(kpath, sizeof(kpath), "%s/.config/kcat.conf", home);

        if (read_conf_file(kpath, 0/*not fatal*/) == 0)
                return;

        snprintf(kpath2, sizeof(kpath2), "%s/.config/kafkacat.conf", home);

        if (read_conf_file(kpath2, 0/*not fatal*/) == 0) {
                KC_INFO(1,
                        "Configuration filename kafkacat.conf is "
                        "deprecated!\n");
                KC_INFO(1, "Rename %s to %s\n", kpath2, kpath);
        }
}



static int unittest_strnstr (void) {
        struct {
                const char *sep;
                const char *hay;
                int offset;
        } exp[] = {
                { ";Sep;", ";Sep;Post", 0 },
                { ";Sep;", ";Sep;", 0 },
                { ";Sep;", "Pre;Sep;Post", 3 },
                { ";Sep;", "Pre;Sep;", 3 },
                { ";Sep;", "Pre;SepPost", -1 },
                { ";KeyDel;", "Key1;KeyDel;Value1", 4 },
                { ";", "Is The;", 6 },
                { NULL },
        };
        int i;
        int fails = 0;

        for (i = 0 ; exp[i].sep ; i++) {
                const char *t = rd_strnstr(exp[i].hay, strlen(exp[i].hay),
                                           exp[i].sep, strlen(exp[i].sep));
                const char *e = exp[i].hay + exp[i].offset;
                const char *fail = NULL;

                if (exp[i].offset == -1) {
                        if (t)
                                fail = "expected no match";
                } else if (!t) {
                        fail = "expected match";
                } else if (t != e)
                        fail = "invalid match";

                if (!fail)
                        continue;

                fprintf(stderr,
                        "%s: FAILED: for case %d: "
                        "match is %p '%s' for %p '%s' in %p '%s' "
                        "(want offset %d, not %d): %s\n",
                        __FUNCTION__,
                        i,
                        t, t ? t : "(null)",
                        exp[i].sep, exp[i].sep,
                        exp[i].hay, exp[i].hay,
                        exp[i].offset,
                        t ? (int)(t - exp[i].hay) : -1,
                        fail);
                fails++;
        }

        return fails;
}

static int unittest_parse_delim (void) {
        struct {
                const char *in;
                const char *exp;
        } delims[] = {
                { "", "" },
                { "\\n", "\n" },
                { "\\t\\n\\n", "\t\n\n" },
                { "\\x54!\\x45\\x53T", "T!EST" },
                { "\\x30", "0" },
                { NULL }
        };
        int i;
        int fails = 0;

        for (i = 0 ; delims[i].in ; i++) {
                char *out = parse_delim(delims[i].in);
                if (strcmp(out, delims[i].exp)) {
                        fprintf(stderr, "%s: FAILED: "
                                "expected '%s' to return '%s', not '%s'\n",
                                __FUNCTION__, delims[i].in, delims[i].exp, out);
                        fails++;
                }
                free(out);
        }

        return fails;
}


/**
 * @brief Run unittests
 *
 * @returns the number of failed tests.
 */
static int unittest (void) {
        int r = 0;

        r += unittest_strnstr();
        r += unittest_parse_delim();

        return r;
}

/**
 * @brief Add a single header specified as a command line option.
 *
 * @param inp "name=value" or "name" formatted header
 */
static void add_header (const char *inp) {
        const char *t;
        rd_kafka_resp_err_t err;

        t = strchr(inp, '=');
        if (t == inp || !*inp)
                KC_FATAL("Expected -H \"name=value..\" or -H \"name\"");

        if (!conf.headers)
                conf.headers = rd_kafka_headers_new(8);


        err = rd_kafka_header_add(conf.headers,
                                  inp,
                                  t ? (ssize_t)(t-inp) : -1,
                                  t ? t+1 : NULL, -1);
        if (err)
                KC_FATAL("Failed to add header \"%s\": %s",
                         inp, rd_kafka_err2str(err));
}
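
/* Examples (illustrative, placeholder names):
 *   -H "trace-id=abc123"  adds header "trace-id" with value "abc123".
 *   -H "flag"             adds header "flag" with a NULL value, since
 *                         no '=' is present. */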


/**
 * Parse command line arguments
 */
static void argparse (int argc, char **argv,
                      rd_kafka_topic_partition_list_t **rktparlistp) {
        char errstr[512];
        int opt;
        const char *fmt = NULL;
        const char *delim = "\n";
        const char *key_delim = NULL;
        char tmp_fmt[64];
        int do_conf_dump = 0;
        int conf_files_read = 0;
        int i;

        while ((opt = getopt(argc, argv,
                             ":PCG:LQt:p:b:z:o:eED:K:k:H:Od:qvF:X:c:Tuf:ZlVh"
                             "s:r:Jm:U")) != -1) {
                switch (opt) {
                case 'P':
                case 'C':
                case 'L':
                case 'Q':
                        conf.mode = opt;
                        break;
#if ENABLE_KAFKACONSUMER
                case 'G':
                        conf.mode = opt;
                        conf.group = optarg;
                        if (rd_kafka_conf_set(conf.rk_conf, "group.id", optarg,
                                              errstr, sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK)
                                KC_FATAL("%s", errstr);
                        break;
#endif
                case 't':
                        if (conf.mode == 'Q') {
                                if (!*rktparlistp)
                                        *rktparlistp = rd_kafka_topic_partition_list_new(1);
                                add_topparoff("-t", *rktparlistp, optarg);
                                conf.flags |= CONF_F_APIVERREQ;
                        } else
                                conf.topic = optarg;

                        break;
                case 'p':
                        conf.partition = atoi(optarg);
                        break;
                case 'b':
                        conf.brokers = optarg;
                        conf.flags |= CONF_F_BROKERS_SEEN;
                        break;
                case 'z':
                        if (rd_kafka_conf_set(conf.rk_conf,
                                              "compression.codec", optarg,
                                              errstr, sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK)
                                KC_FATAL("%s", errstr);
                        break;
                case 'o':
                        if (!strcmp(optarg, "end"))
                                conf.offset = RD_KAFKA_OFFSET_END;
                        else if (!strcmp(optarg, "beginning"))
                                conf.offset = RD_KAFKA_OFFSET_BEGINNING;
                        else if (!strcmp(optarg, "stored"))
                                conf.offset = RD_KAFKA_OFFSET_STORED;
#if RD_KAFKA_VERSION >= 0x00090300
                        else if (!strncmp(optarg, "s@", 2)) {
                                conf.startts = strtoll(optarg+2, NULL, 10);
                                conf.flags |= CONF_F_APIVERREQ;
                        } else if (!strncmp(optarg, "e@", 2)) {
                                conf.stopts = strtoll(optarg+2, NULL, 10);
                                conf.flags |= CONF_F_APIVERREQ;
                        }
#endif
                        else {
                                conf.offset = strtoll(optarg, NULL, 10);
                                if (conf.offset < 0)
                                        conf.offset = RD_KAFKA_OFFSET_TAIL(-conf.offset);
                        }
                        break;
                case 'e':
                        conf.exit_eof = 1;
                        break;
                case 'E':
                        conf.exitonerror = 0;
                        break;
                case 'f':
                        fmt = optarg;
                        break;
                case 'J':
#if ENABLE_JSON
                        conf.flags |= CONF_F_FMT_JSON;
#else
                        KC_FATAL("This build of kcat lacks JSON support");
#endif
                        break;

                case 's':
                {
                        int field = -1;
                        const char *t = optarg;

                        if (!strncmp(t, "key=", strlen("key="))) {
                                t += strlen("key=");
                                field = KC_MSG_FIELD_KEY;
                        } else if (!strncmp(t, "value=", strlen("value="))) {
                                t += strlen("value=");
                                field = KC_MSG_FIELD_VALUE;
                        }

                        if (field == -1 || field == KC_MSG_FIELD_KEY) {
                                if (strcmp(t, "avro"))
                                        pack_check("key", t);
                                conf.pack[KC_MSG_FIELD_KEY] = t;
                        }

                        if (field == -1 || field == KC_MSG_FIELD_VALUE) {
                                if (strcmp(t, "avro"))
                                        pack_check("value", t);
                                conf.pack[KC_MSG_FIELD_VALUE] = t;
                        }
                }
                break;
                case 'r':
#if ENABLE_AVRO
                        if (!*optarg)
                                KC_FATAL("-r url must not be empty");

                        if (try_conf_set("schema.registry.url", optarg,
                                         errstr, sizeof(errstr)) == -1)
                                KC_FATAL("%s", errstr);
#else
                        KC_FATAL("This build of kcat lacks "
                                 "Avro/Schema-Registry support");
#endif
                        break;
                case 'D':
                        delim = optarg;
                        break;
                case 'K':
                        key_delim = optarg;
                        conf.flags |= CONF_F_KEY_DELIM;
                        break;
                case 'k':
                        conf.fixed_key = optarg;
                        conf.fixed_key_len = (size_t)(strlen(conf.fixed_key));
                        break;
                case 'H':
                        add_header(optarg);
                        break;
                case 'l':
                        conf.flags |= CONF_F_LINE;
                        break;
                case 'O':
                        conf.flags |= CONF_F_OFFSET;
                        break;
                case 'c':
                        conf.msg_cnt = strtoll(optarg, NULL, 10);
                        break;
                case 'm':
                        conf.metadata_timeout = strtoll(optarg, NULL, 10);
                        break;
                case 'Z':
                        conf.flags |= CONF_F_NULL;
                        conf.null_str_len = strlen(conf.null_str);
                        break;
                case 'd':
                        conf.debug = optarg;
                        if (rd_kafka_conf_set(conf.rk_conf, "debug", conf.debug,
                                              errstr, sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK)
                                KC_FATAL("%s", errstr);
                        break;
                case 'q':
                        conf.verbosity = 0;
                        break;
                case 'v':
                        conf.verbosity++;
                        break;
                case 'T':
                        conf.flags |= CONF_F_TEE;
                        break;
                case 'u':
                        setbuf(stdout, NULL);
                        break;
                case 'F':
                        conf.flags |= CONF_F_NO_CONF_SEARCH;
                        if (!strcmp(optarg, "-") || !strcmp(optarg, "none"))
                                break;

                        read_conf_file(optarg, 1);
                        conf_files_read++;
                        break;
                case 'X':
                {
                        char *name, *val;

                        if (!strcmp(optarg, "list") ||
                            !strcmp(optarg, "help")) {
                                rd_kafka_conf_properties_show(stdout);
                                exit(0);
                        }

                        if (!strcmp(optarg, "dump")) {
                                do_conf_dump = 1;
                                continue;
                        }

                        name = optarg;
                        if (!(val = strchr(name, '='))) {
                                fprintf(stderr, "%% Expected "
                                        "-X property=value, not %s, "
                                        "use -X list to display available "
                                        "properties\n", name);
                                exit(1);
                        }

                        *val = '\0';
                        val++;

                        if (try_conf_set(name, val,
                                         errstr, sizeof(errstr)) == -1)
                                KC_FATAL("%s", errstr);
                }
                break;

                case 'U':
                        if (unittest())
                                exit(1);
                        else
                                exit(0);
                        break;

                case 'V':
                        usage(argv[0], 0, NULL, 1);
                        break;

                case 'h':
                        usage(argv[0], 0, NULL, 0);
                        break;

                default:
                        usage(argv[0], 1, "unknown argument", 0);
                        break;
                }
        }


        if (conf_files_read == 0) {
                const char *cpath = kc_getenv("KCAT_CONFIG");
                if (cpath) {
                        conf.flags |= CONF_F_NO_CONF_SEARCH;
                        read_conf_file(cpath, 1/*fatal errors*/);

                } else if ((cpath = kc_getenv("KAFKACAT_CONFIG"))) {
                        KC_INFO(1, "KAFKACAT_CONFIG is deprecated!\n");
                        KC_INFO(1, "Rename KAFKACAT_CONFIG to KCAT_CONFIG\n");
                        conf.flags |= CONF_F_NO_CONF_SEARCH;
                        read_conf_file(cpath, 1/*fatal errors*/);
                }
        }

        if (!(conf.flags & CONF_F_NO_CONF_SEARCH))
                read_default_conf_files();

        /* Dump configuration and exit, if so desired. */
        if (do_conf_dump) {
                conf_dump();
                exit(0);
        }

        if (!(conf.flags & CONF_F_BROKERS_SEEN))
                usage(argv[0], 1, "-b <broker,..> missing", 0);

        /* Decide mode if not specified */
        if (!conf.mode) {
                if (_COMPAT(isatty)(STDIN_FILENO))
                        conf.mode = 'C';
                else
                        conf.mode = 'P';
                KC_INFO(1, "Auto-selecting %s mode (use -P or -C to override)\n",
                        conf.mode == 'C' ? "Consumer":"Producer");
        }
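
        /* Illustrative: `kcat -b <broker> -t <topic>` run from an
         * interactive terminal consumes (stdin is a tty), while
         * `echo hi | kcat -b <broker> -t <topic>` produces (stdin is
         * a pipe); -C/-P always override this heuristic. */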


        if (!strchr("GLQ", conf.mode) && !conf.topic)
                usage(argv[0], 1, "-t <topic> missing", 0);
        else if (conf.mode == 'Q' && !*rktparlistp)
                usage(argv[0], 1,
                      "-t <topic>:<partition>:<offset_or_timestamp> missing",
                      0);

        if (conf.brokers &&
            rd_kafka_conf_set(conf.rk_conf, "metadata.broker.list",
                              conf.brokers, errstr, sizeof(errstr)) !=
            RD_KAFKA_CONF_OK)
                usage(argv[0], 1, errstr, 0);

        rd_kafka_conf_set_error_cb(conf.rk_conf, error_cb);

        fmt_init();

        /*
         * Verify serdes
         */
        for (i = 0 ; i < KC_MSG_FIELD_CNT ; i++) {
                if (!conf.pack[i])
                        continue;

                if (!strchr("GC", conf.mode))
                        KC_FATAL("-s serdes only available in the consumer");

                if (conf.pack[i] && !strcmp(conf.pack[i], "avro")) {
#if !ENABLE_AVRO
                        KC_FATAL("This build of kcat lacks "
                                 "Avro/Schema-Registry support");
#endif
#if ENABLE_JSON
                        /* Avro is decoded to JSON which needs to be
                         * written verbatim to the JSON envelope when
                         * using -J: libyajl does not support this,
                         * but my own fork of yajl does. */
                        if (conf.flags & CONF_F_FMT_JSON &&
                            !json_can_emit_verbatim())
                                KC_FATAL("This build of kcat lacks "
                                         "support for emitting "
                                         "JSON-formatted "
                                         "message keys and values: "
                                         "try without -J or build "
                                         "kcat with yajl from "
                                         "https://github.com/edenhill/yajl");
#endif

                        if (i == KC_MSG_FIELD_VALUE)
                                conf.flags |= CONF_F_FMT_AVRO_VALUE;
                        else if (i == KC_MSG_FIELD_KEY)
                                conf.flags |= CONF_F_FMT_AVRO_KEY;
                        continue;
                }
        }


        /*
         * Verify and initialize Avro/SR
         */
#if ENABLE_AVRO
        if (conf.flags & (CONF_F_FMT_AVRO_VALUE|CONF_F_FMT_AVRO_KEY)) {

                if (!(conf.flags & CONF_F_SR_URL_SEEN))
                        KC_FATAL("-s avro requires -r <sr_url>");

                if (!strchr("GC", conf.mode))
                        KC_FATAL("Avro and Schema-registry support is "
                                 "currently only available in the consumer");

                /* Initialize Avro/Schema-Registry client */
                kc_avro_init(NULL, NULL, NULL, NULL);
        }
#endif


        /* If the avro key is to be deserialized, set up a delimiter so that
         * the key is actually emitted. */
        if ((conf.flags & CONF_F_FMT_AVRO_KEY) && !key_delim)
                key_delim = "";

        if (key_delim) {
                conf.key_delim = parse_delim(key_delim);
                conf.key_delim_size = strlen(conf.key_delim);
        }

        conf.delim = parse_delim(delim);
        conf.delim_size = strlen(conf.delim);

        if (strchr("GC", conf.mode)) {
                /* Must be explicitly enabled for librdkafka >= v1.0.0 */
                rd_kafka_conf_set(conf.rk_conf, "enable.partition.eof", "true",
                                  NULL, 0);

                if (!fmt) {
                        if ((conf.flags & CONF_F_FMT_JSON)) {
                                /* For JSON the format string is simply the
                                 * output object delimiter (e.g., newline). */
                                fmt = conf.delim;
                        } else {
                                if (conf.key_delim)
                                        snprintf(tmp_fmt, sizeof(tmp_fmt),
                                                 "%%k%s%%s%s",
                                                 conf.key_delim, conf.delim);
                                else
                                        snprintf(tmp_fmt, sizeof(tmp_fmt),
                                                 "%%s%s", conf.delim);
                                fmt = tmp_fmt;
                        }
                }

                fmt_parse(fmt);

        } else if (conf.mode == 'P') {
                if (conf.delim_size == 0)
                        KC_FATAL("Message delimiter -D must not be empty "
                                 "when producing");
        }

        /* Automatically enable API version requests if needed and
         * the user hasn't explicitly configured it (in any way). */
        if ((conf.flags & (CONF_F_APIVERREQ | CONF_F_APIVERREQ_USER)) ==
            CONF_F_APIVERREQ) {
                KC_INFO(2, "Automatically enabling api.version.request=true\n");
                rd_kafka_conf_set(conf.rk_conf, "api.version.request", "true",
                                  NULL, 0);
        }
}




int main (int argc, char **argv) {
#ifdef SIGIO
        char tmp[16];
#endif
        FILE *in = stdin;
        struct timeval tv;
        rd_kafka_topic_partition_list_t *rktparlist = NULL;

        /* Certain Docker images don't have kcat as the entry point,
         * requiring `kcat` to be the first argument. Since these images
         * are fixed while the examples get outdated, that first argument
         * will still be passed to the container and thus to kcat,
         * so remove it here. */
        if (argc > 1 && (!strcmp(argv[1], "kcat") ||
                         !strcmp(argv[1], "kafkacat"))) {
                if (argc > 2)
                        memmove(&argv[1], &argv[2], sizeof(*argv) * (argc - 2));
                argc--;
        }
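
        /* Illustrative: `docker run <image> kcat -b <broker> -L` against
         * an image whose entry point is already kcat would otherwise
         * arrive here as argv = { "kcat", "kcat", "-b", <broker>, "-L" };
         * the shift above drops the duplicated first argument. */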

        /* Seed rng for random partitioner, jitter, etc. */
        rd_gettimeofday(&tv, NULL);
        srand(tv.tv_usec);

        /* Create config containers */
        conf.rk_conf  = rd_kafka_conf_new();
        conf.rkt_conf = rd_kafka_topic_conf_new();

        /*
         * Default config
         */
#ifdef SIGIO
        /* Enable quick termination of librdkafka */
        snprintf(tmp, sizeof(tmp), "%i", SIGIO);
        rd_kafka_conf_set(conf.rk_conf, "internal.termination.signal",
                          tmp, NULL, 0);
#endif

        /* Log callback */
        rd_kafka_conf_set_log_cb(conf.rk_conf, rd_kafka_log_print);

        /* Parse command line arguments */
        argparse(argc, argv, &rktparlist);

        if (optind < argc) {
                if (!strchr("PG", conf.mode))
                        usage(argv[0], 1,
                              "file/topic list only allowed in "
                              "producer(-P)/kafkaconsumer(-G) mode", 0);
                else if ((conf.flags & CONF_F_LINE) && argc - optind > 1)
                        KC_FATAL("Only one file allowed for line mode (-l)");
                else if (conf.flags & CONF_F_LINE) {
                        in = fopen(argv[optind], "r");
                        if (in == NULL)
                                KC_FATAL("Cannot open %s: %s", argv[optind],
                                         strerror(errno));
                }
        }

        signal(SIGINT, term);
        signal(SIGTERM, term);
#ifdef SIGPIPE
        signal(SIGPIPE, term);
#endif

        /* Run according to mode */
        switch (conf.mode)
        {
        case 'C':
                consumer_run(stdout);
                break;

#if ENABLE_KAFKACONSUMER
        case 'G':
                if (conf.stopts || conf.startts)
                        KC_FATAL("-o ..@ timestamps can't be used "
                                 "with -G mode\n");
                kafkaconsumer_run(stdout, &argv[optind], argc-optind);
                break;
#endif

        case 'P':
                producer_run(in, &argv[optind], argc-optind);
                break;

        case 'L':
                metadata_list();
                break;

        case 'Q':
                if (!rktparlist)
                        usage(argv[0], 1,
                              "-Q requires one or more "
                              "-t <topic>:<partition>:<timestamp>", 0);

                query_offsets_by_time(rktparlist);

                rd_kafka_topic_partition_list_destroy(rktparlist);
                break;

        default:
                usage(argv[0], 0, NULL, 0);
                break;
        }

        if (conf.headers)
                rd_kafka_headers_destroy(conf.headers);

        if (conf.key_delim)
                free(conf.key_delim);
        if (conf.delim)
                free(conf.delim);

        if (in != stdin)
                fclose(in);

        rd_kafka_wait_destroyed(5000);

#if ENABLE_AVRO
        kc_avro_term();
#endif

        fmt_term();

        exit(conf.exitcode);
}