1 /*
2 * kcat - Apache Kafka consumer and producer
3 *
4 * Copyright (c) 2014-2021, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #ifndef _MSC_VER
30 #include <unistd.h>
31 #include <syslog.h>
32 #include <sys/time.h>
33 #include <sys/mman.h>
34 #else
35 #pragma comment(lib, "ws2_32.lib")
36 #include "win32/wingetopt.h"
37 #include <io.h>
38 #endif
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <stdarg.h>
42 #include <signal.h>
43 #include <ctype.h>
44
45 #include <sys/types.h>
46 #include <sys/stat.h>
47 #include <fcntl.h>
48
49
50
51 #include "kcat.h"
52 #include "input.h"
53
/* Compile in the transactional producer code paths (guarded by
 * ENABLE_TXNS below) when the librdkafka headers report version
 * 0x01040000 or later. */
#if RD_KAFKA_VERSION >= 0x01040000
#define ENABLE_TXNS 1
#endif

/* Compile in the incremental rebalance callback (guarded by
 * ENABLE_INCREMENTAL_ASSIGN below) when the librdkafka headers report
 * version 0x01060000 or later. */
#if RD_KAFKA_VERSION >= 0x01060000
#define ENABLE_INCREMENTAL_ASSIGN 1
#endif
61
62
/* Global configuration with its built-in defaults. */
struct conf conf = {
        .run = 1,                           /* Run loops keep going while non-zero */
        .verbosity = 1,                     /* 0 also silences librdkafka logging */
        .exitonerror = 1,
        .partition = RD_KAFKA_PARTITION_UA, /* No specific partition (-p) requested */
        .msg_size = 1024*1024,              /* Producer input buffer max size */
        .null_str = "NULL",                 /* NOTE(review): presumably the text shown
                                             * for NULL keys/values - confirm in the
                                             * output formatter */
        .fixed_key = NULL,                  /* Key used when none is extracted
                                             * from the input */
        .metadata_timeout = 5,              /* Multiplied by 1000 where a
                                             * millisecond timeout is passed */
        .offset = RD_KAFKA_OFFSET_INVALID,  /* No start offset (-o) requested */
};
74
/* Run-time counters. */
static struct stats {
        uint64_t tx;            /* Messages accepted by rd_kafka_producev() */
        uint64_t tx_err_q;      /* Produce attempts rejected with queue full */
        uint64_t tx_err_dr;     /* Delivery reports carrying an error */
        uint64_t tx_delivered;  /* Delivery reports without error */

        uint64_t rx;            /* Messages output by the consume callback */
} stats;
83
84
/* Partition's stopped state array: one int per partition, indexed by
 * partition id, non-zero once the partition has been stopped.
 * Allocated by consumer_run() only when exit-at-EOF or a stop
 * timestamp is configured, otherwise left NULL. */
int *part_stop = NULL;
/* Number of partitions that are stopped */
int part_stop_cnt = 0;
/* Threshold level (partitions stopped) before exiting:
 * conf.run is cleared once part_stop_cnt reaches this value. */
int part_stop_thres = 0;
91
92
93
94 /**
95 * Fatal error: print error and exit
96 */
fatal0(const char * func,int line,const char * fmt,...)97 void RD_NORETURN fatal0 (const char *func, int line,
98 const char *fmt, ...) {
99 va_list ap;
100 char buf[1024];
101
102 va_start(ap, fmt);
103 vsnprintf(buf, sizeof(buf), fmt, ap);
104 va_end(ap);
105
106 KC_INFO(2, "Fatal error at %s:%i:\n", func, line);
107 fprintf(stderr, "%% ERROR: %s\n", buf);
108 exit(1);
109 }
110
/**
 * Report an error on stderr and, if 'exitonerror' is non-zero,
 * terminate the process with exit(1).
 *
 * The printf-style message built from 'fmt' is formatted into a
 * 1024 byte buffer (longer messages are truncated) and printed
 * prefixed with "% ERROR: ". When terminating, the reporting location
 * ('func' and 'line') is emitted first through KC_INFO at level 2 and
 * ": terminating" is appended to the message.
 */
void error0 (int exitonerror, const char *func, int line,
             const char *fmt, ...) {
        char msg[1024];
        va_list args;
        const char *suffix = exitonerror ? ": terminating" : "";

        va_start(args, fmt);
        vsnprintf(msg, sizeof msg, fmt, args);
        va_end(args);

        if (exitonerror) {
                KC_INFO(2, "Error at %s:%i:\n", func, line);
        }

        fprintf(stderr, "%% ERROR: %s%s\n", msg, suffix);

        if (!exitonerror)
                return;

        exit(1);
}
132
133
134
135 /**
136 * The delivery report callback is called once per message to
137 * report delivery success or failure.
138 */
dr_msg_cb(rd_kafka_t * rk,const rd_kafka_message_t * rkmessage,void * opaque)139 static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
140 void *opaque) {
141 static int say_once = 1;
142 int32_t broker_id = -1;
143 struct buf *b = rkmessage->_private;
144
145 if (b)
146 buf_destroy(b);
147
148 if (rkmessage->err) {
149 KC_INFO(1, "Delivery failed for message: %s\n",
150 rd_kafka_err2str(rkmessage->err));
151 stats.tx_err_dr++;
152 return;
153 }
154
155 #if RD_KAFKA_VERSION >= 0x010500ff
156 broker_id = rd_kafka_message_broker_id(rkmessage);
157 #endif
158
159 KC_INFO(3,
160 "Message delivered to partition %"PRId32" (offset %"PRId64") "
161 "on broker %"PRId32"\n",
162 rkmessage->partition, rkmessage->offset, broker_id);
163
164 if (rkmessage->offset == 0 && say_once) {
165 KC_INFO(3, "Enable message offset reporting "
166 "with '-X topic.produce.offset.report=true'\n");
167 say_once = 0;
168 }
169 stats.tx_delivered++;
170 }
171
172
173 /**
174 * Produces a single message, retries on queue congestion, and
175 * exits hard on error.
176 */
produce(void * buf,size_t len,const void * key,size_t key_len,int msgflags,void * msg_opaque)177 static void produce (void *buf, size_t len,
178 const void *key, size_t key_len, int msgflags,
179 void *msg_opaque) {
180 rd_kafka_headers_t *hdrs = NULL;
181
182 /* Headers are freed on successful producev(), pass a copy. */
183 if (conf.headers)
184 hdrs = rd_kafka_headers_copy(conf.headers);
185
186 /* Produce message: keep trying until it succeeds. */
187 do {
188 rd_kafka_resp_err_t err;
189
190 if (!conf.run)
191 KC_FATAL("Program terminated while "
192 "producing message of %zd bytes", len);
193
194 err = rd_kafka_producev(
195 conf.rk,
196 RD_KAFKA_V_RKT(conf.rkt),
197 RD_KAFKA_V_PARTITION(conf.partition),
198 RD_KAFKA_V_MSGFLAGS(msgflags),
199 RD_KAFKA_V_VALUE(buf, len),
200 RD_KAFKA_V_KEY(key, key_len),
201 RD_KAFKA_V_HEADERS(hdrs),
202 RD_KAFKA_V_OPAQUE(msg_opaque),
203 RD_KAFKA_V_END);
204
205 if (!err) {
206 stats.tx++;
207 break;
208 }
209
210 if (err != RD_KAFKA_RESP_ERR__QUEUE_FULL)
211 KC_FATAL("Failed to produce message (%zd bytes): %s",
212 len, rd_kafka_err2str(err));
213
214 stats.tx_err_q++;
215
216 /* Internal queue full, sleep to allow
217 * messages to be produced/time out
218 * before trying again. */
219 rd_kafka_poll(conf.rk, 5);
220 } while (1);
221
222 /* Poll for delivery reports, errors, etc. */
223 rd_kafka_poll(conf.rk, 0);
224 }
225
226
227 /**
228 * Produce contents of file as a single message.
229 * Returns the file length on success, else -1.
230 */
produce_file(const char * path)231 static ssize_t produce_file (const char *path) {
232 int fd;
233 void *ptr;
234 struct stat st;
235 ssize_t sz;
236 int msgflags = 0;
237
238 if ((fd = _COMPAT(open)(path, O_RDONLY)) == -1) {
239 KC_INFO(1, "Failed to open %s: %s\n", path, strerror(errno));
240 return -1;
241 }
242
243 if (fstat(fd, &st) == -1) {
244 KC_INFO(1, "Failed to stat %s: %s\n", path, strerror(errno));
245 _COMPAT(close)(fd);
246 return -1;
247 }
248
249 if (st.st_size == 0) {
250 KC_INFO(3, "Skipping empty file %s\n", path);
251 _COMPAT(close)(fd);
252 return 0;
253 }
254
255 #ifndef _MSC_VER
256 ptr = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
257 if (ptr == MAP_FAILED) {
258 KC_INFO(1, "Failed to mmap %s: %s\n", path, strerror(errno));
259 _COMPAT(close)(fd);
260 return -1;
261 }
262 sz = st.st_size;
263 msgflags = RD_KAFKA_MSG_F_COPY;
264 #else
265 ptr = malloc(st.st_size);
266 if (!ptr) {
267 KC_INFO(1, "Failed to allocate message for %s: %s\n",
268 path, strerror(errno));
269 _COMPAT(close)(fd);
270 return -1;
271 }
272
273 sz = _read(fd, ptr, st.st_size);
274 if (sz < st.st_size) {
275 KC_INFO(1, "Read failed for %s (%zd/%zd): %s\n",
276 path, sz, (size_t)st.st_size, sz == -1 ? strerror(errno) :
277 "incomplete read");
278 free(ptr);
279 close(fd);
280 return -1;
281 }
282 msgflags = RD_KAFKA_MSG_F_FREE;
283 #endif
284
285 KC_INFO(4, "Producing file %s (%"PRIdMAX" bytes)\n",
286 path, (intmax_t)st.st_size);
287 produce(ptr, sz, conf.fixed_key, conf.fixed_key_len, msgflags, NULL);
288
289 _COMPAT(close)(fd);
290
291 if (!(msgflags & RD_KAFKA_MSG_F_FREE)) {
292 #ifndef _MSC_VER
293 munmap(ptr, st.st_size);
294 #else
295 free(ptr);
296 #endif
297 }
298 return sz;
299 }
300
301
/**
 * Find the first occurrence of the 'needle_len' bytes at 'needle'
 * within the first 'size' bytes of 'haystack'. Neither buffer needs
 * to be nul-terminated and both may contain nul bytes.
 *
 * Returns a pointer to the start of the match inside 'haystack',
 * or NULL if there is no match, if 'needle_len' is 0, or if the
 * needle is longer than the haystack.
 */
static char *rd_strnstr (const char *haystack, size_t size,
                         const char *needle, size_t needle_len) {
        const char *pos = haystack;
        size_t remaining = size;

        /* An empty needle never matches; a needle longer than the
         * haystack cannot match. */
        if (needle_len == 0 || needle_len > size)
                return NULL;

        while (remaining >= needle_len) {
                /* Only start positions that leave room for the whole
                 * needle are candidates. */
                const char *cand = memchr(pos, (unsigned char)needle[0],
                                          remaining - needle_len + 1);

                if (!cand)
                        return NULL;

                if (!memcmp(cand, needle, needle_len))
                        return (char *)cand;

                /* Resume the scan right after the rejected candidate. */
                remaining -= (size_t)(cand - pos) + 1;
                pos = cand + 1;
        }

        return NULL;
}
328
329
330 /**
331 * Run producer, reading messages from 'fp' and producing to kafka.
332 * Or if 'pathcnt' is > 0, read messages from files in 'paths' instead.
333 */
producer_run(FILE * fp,char ** paths,int pathcnt)334 static void producer_run (FILE *fp, char **paths, int pathcnt) {
335 char errstr[512];
336 char tmp[16];
337 size_t tsize = sizeof(tmp);
338
339 if (rd_kafka_conf_get(conf.rk_conf, "transactional.id",
340 tmp, &tsize) == RD_KAFKA_CONF_OK && tsize > 1) {
341 KC_INFO(1, "Using transactional producer\n");
342 conf.txn = 1;
343 }
344
345 tsize = sizeof(tmp);
346 if (rd_kafka_conf_get(conf.rk_conf, "message.max.bytes",
347 tmp, &tsize) == RD_KAFKA_CONF_OK && tsize > 1) {
348 int msg_max_bytes = atoi(tmp);
349 KC_INFO(3, "Setting producer input buffer max size to "
350 "message.max.bytes value %d\n", msg_max_bytes);
351 conf.msg_size = msg_max_bytes;
352 }
353
354 /* Assign per-message delivery report callback. */
355 rd_kafka_conf_set_dr_msg_cb(conf.rk_conf, dr_msg_cb);
356
357 /* Create producer */
358 if (!(conf.rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf.rk_conf,
359 errstr, sizeof(errstr))))
360 KC_FATAL("Failed to create producer: %s", errstr);
361
362 if (!conf.debug && conf.verbosity == 0)
363 rd_kafka_set_log_level(conf.rk, 0);
364
365 #if ENABLE_TXNS
366 if (conf.txn) {
367 rd_kafka_error_t *error;
368
369 error = rd_kafka_init_transactions(conf.rk,
370 conf.metadata_timeout*1000);
371 if (error)
372 KC_FATAL("init_transactions(): %s",
373 rd_kafka_error_string(error));
374
375 error = rd_kafka_begin_transaction(conf.rk);
376 if (error)
377 KC_FATAL("begin_transaction(): %s",
378 rd_kafka_error_string(error));
379 }
380 #endif
381
382 /* Create topic */
383 if (!(conf.rkt = rd_kafka_topic_new(conf.rk, conf.topic,
384 conf.rkt_conf)))
385 KC_FATAL("Failed to create topic %s: %s", conf.topic,
386 rd_kafka_err2str(rd_kafka_last_error()));
387
388 conf.rk_conf = NULL;
389 conf.rkt_conf = NULL;
390
391
392 if (pathcnt > 0 && !(conf.flags & CONF_F_LINE)) {
393 int i;
394 int good = 0;
395 /* Read messages from files, each file is its own message. */
396
397 for (i = 0 ; i < pathcnt ; i++)
398 if (produce_file(paths[i]) != -1)
399 good++;
400
401 if (!good)
402 conf.exitcode = 1;
403 else if (good < pathcnt)
404 KC_INFO(1, "Failed to produce from %i/%i files\n",
405 pathcnt - good, pathcnt);
406
407 } else {
408 struct inbuf inbuf;
409 struct buf *b;
410
411 inbuf_init(&inbuf, conf.msg_size, conf.delim, conf.delim_size);
412
413 /* Read messages from input, delimited by conf.delim */
414 while (conf.run &&
415 inbuf_read_to_delimeter(&inbuf, fp, &b)) {
416 int msgflags = 0;
417 char *buf = b->buf;
418 char *key = NULL;
419 size_t key_len = 0;
420 size_t len = b->size;
421
422 if (len == 0) {
423 buf_destroy(b);
424 continue;
425 }
426
427 /* Extract key, if desired and found. */
428 if (conf.flags & CONF_F_KEY_DELIM) {
429 char *t;
430 if ((t = rd_strnstr(buf, len,
431 conf.key_delim,
432 conf.key_delim_size))) {
433 key_len = (size_t)(t-buf);
434 key = buf;
435 buf = t + conf.key_delim_size;
436 len -= key_len + conf.key_delim_size;
437
438 if (conf.flags & CONF_F_NULL) {
439 if (len == 0)
440 buf = NULL;
441 if (key_len == 0)
442 key = NULL;
443 }
444 }
445 }
446
447 if (!key && conf.fixed_key) {
448 key = conf.fixed_key;
449 key_len = conf.fixed_key_len;
450 }
451
452 if (len < 1024) {
453 /* If message is smaller than this arbitrary
454 * threshold it will be more effective to
455 * copy the data in librdkafka. */
456 msgflags |= RD_KAFKA_MSG_F_COPY;
457 }
458
459 /* Produce message */
460 produce(buf, len, key, key_len, msgflags,
461 (msgflags & RD_KAFKA_MSG_F_COPY) ?
462 NULL : b);
463
464 if (conf.flags & CONF_F_TEE &&
465 fwrite(b->buf, b->size, 1, stdout) != 1)
466 KC_FATAL("Tee write error for message "
467 "of %zd bytes: %s",
468 b->size, strerror(errno));
469
470 if (msgflags & RD_KAFKA_MSG_F_COPY) {
471 /* librdkafka made a copy of the input. */
472 buf_destroy(b);
473 }
474
475 /* Enforce -c <cnt> */
476 if (stats.tx == (uint64_t)conf.msg_cnt)
477 conf.run = 0;
478 }
479
480 if (conf.run) {
481 if (!feof(fp))
482 KC_FATAL("Unable to read message: %s",
483 strerror(errno));
484 }
485 }
486
487 #if ENABLE_TXNS
488 if (conf.txn) {
489 rd_kafka_error_t *error;
490 const char *what;
491
492 if (conf.term_sig) {
493 KC_INFO(0,
494 "Aborting transaction due to "
495 "termination signal\n");
496 what = "abort_transaction()";
497 error = rd_kafka_abort_transaction(
498 conf.rk, conf.metadata_timeout * 1000);
499 } else {
500 KC_INFO(1, "Committing transaction\n");
501 what = "commit_transaction()";
502 error = rd_kafka_commit_transaction(
503 conf.rk, conf.metadata_timeout * 1000);
504 if (!error)
505 KC_INFO(1,
506 "Transaction successfully committed\n");
507 }
508
509 if (error)
510 KC_FATAL("%s: %s", what, rd_kafka_error_string(error));
511 }
512 #endif
513
514
515 /* Wait for all messages to be transmitted */
516 conf.run = 1;
517 while (conf.run && rd_kafka_outq_len(conf.rk))
518 rd_kafka_poll(conf.rk, 50);
519
520 rd_kafka_topic_destroy(conf.rkt);
521 rd_kafka_destroy(conf.rk);
522
523 if (stats.tx_err_dr)
524 conf.exitcode = 1;
525 }
526
/**
 * Stop consuming the partition 'rkmessage' came from, unless it is
 * already marked as stopped in part_stop[].
 *
 * Clears conf.run once the number of stopped partitions reaches
 * part_stop_thres.
 */
static void stop_partition (rd_kafka_message_t *rkmessage) {
        int32_t partition = rkmessage->partition;

        /* Already stopped: nothing to do. */
        if (part_stop[partition])
                return;

        rd_kafka_consume_stop(rkmessage->rkt, partition);
        part_stop[partition] = 1;

        if (++part_stop_cnt >= part_stop_thres)
                conf.run = 0;
}
538
539
540 /**
541 * @brief Mark partition as not at EOF
542 */
partition_not_eof(const rd_kafka_message_t * rkmessage)543 static void partition_not_eof (const rd_kafka_message_t *rkmessage) {
544 rd_kafka_topic_partition_t *rktpar;
545
546 if (conf.mode != 'G' || !conf.exit_eof ||
547 !conf.assignment || conf.eof_cnt == 0)
548 return;
549
550 /* Find partition in assignment */
551 rktpar = rd_kafka_topic_partition_list_find(
552 conf.assignment,
553 rd_kafka_topic_name(rkmessage->rkt),
554 rkmessage->partition);
555
556 if (!rktpar || rktpar->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
557 return;
558
559 rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
560 conf.eof_cnt--;
561 }
562
563
564 /**
565 * @brief Mark partition as at EOF
566 */
partition_at_eof(rd_kafka_message_t * rkmessage)567 static void partition_at_eof (rd_kafka_message_t *rkmessage) {
568
569 if (conf.mode == 'C') {
570 /* Store EOF offset.
571 * If partition is empty and at offset 0,
572 * store future first message (0). */
573 rd_kafka_offset_store(rkmessage->rkt,
574 rkmessage->partition,
575 rkmessage->offset == 0 ?
576 0 : rkmessage->offset-1);
577 if (conf.exit_eof) {
578 stop_partition(rkmessage);
579 }
580
581 } else if (conf.mode == 'G' && conf.exit_eof && conf.assignment) {
582 /* Find partition in assignment */
583 rd_kafka_topic_partition_t *rktpar;
584
585 rktpar = rd_kafka_topic_partition_list_find(
586 conf.assignment,
587 rd_kafka_topic_name(rkmessage->rkt),
588 rkmessage->partition);
589
590 if (rktpar && rktpar->err != RD_KAFKA_RESP_ERR__PARTITION_EOF) {
591 rktpar->err = RD_KAFKA_RESP_ERR__PARTITION_EOF;
592 conf.eof_cnt++;
593
594 if (conf.eof_cnt == conf.assignment->cnt)
595 conf.run = 0;
596 }
597 }
598
599 KC_INFO(1, "Reached end of topic %s [%"PRId32"] "
600 "at offset %"PRId64"%s\n",
601 rd_kafka_topic_name(rkmessage->rkt),
602 rkmessage->partition,
603 rkmessage->offset,
604 !conf.run ? ": exiting" : "");
605 }
606
607
608 /**
609 * Consume callback, called for each message consumed.
610 */
consume_cb(rd_kafka_message_t * rkmessage,void * opaque)611 static void consume_cb (rd_kafka_message_t *rkmessage, void *opaque) {
612 FILE *fp = opaque;
613
614 if (!conf.run)
615 return;
616
617 if (rkmessage->err) {
618 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
619 partition_at_eof(rkmessage);
620 return;
621 }
622
623 if (rkmessage->rkt)
624 KC_FATAL("Topic %s [%"PRId32"] error: %s",
625 rd_kafka_topic_name(rkmessage->rkt),
626 rkmessage->partition,
627 rd_kafka_message_errstr(rkmessage));
628 else
629 KC_FATAL("Consumer error: %s",
630 rd_kafka_message_errstr(rkmessage));
631
632 } else {
633 partition_not_eof(rkmessage);
634 }
635
636 if (conf.stopts) {
637 int64_t ts = rd_kafka_message_timestamp(rkmessage, NULL);
638 if (ts >= conf.stopts) {
639 stop_partition(rkmessage);
640 KC_INFO(1, "Reached stop timestamp for topic "
641 "%s [%"PRId32"] "
642 "at offset %"PRId64"%s\n",
643 rd_kafka_topic_name(rkmessage->rkt),
644 rkmessage->partition,
645 rkmessage->offset,
646 !conf.run ? ": exiting" : "");
647 return;
648 }
649 }
650
651 /* Print message */
652 fmt_msg_output(fp, rkmessage);
653
654 if (conf.mode == 'C') {
655 rd_kafka_offset_store(rkmessage->rkt,
656 rkmessage->partition,
657 rkmessage->offset);
658 }
659
660 if (++stats.rx == (uint64_t)conf.msg_cnt) {
661 conf.run = 0;
662 rd_kafka_yield(conf.rk);
663 }
664 }
665
666
667 #if RD_KAFKA_VERSION >= 0x00090000
/**
 * Throttle callback: logs, at level 1, which broker throttled
 * a request and for how many milliseconds.
 */
static void throttle_cb (rd_kafka_t *rk, const char *broker_name,
                         int32_t broker_id, int throttle_time_ms,
                         void *opaque) {
        /* Neither the client handle nor the opaque is needed here. */
        (void)rk;
        (void)opaque;

        KC_INFO(1, "Broker %s (%"PRId32") throttled request for %dms\n",
                broker_name, broker_id, throttle_time_ms);
}
673 #endif
674
675 #if ENABLE_KAFKACONSUMER
print_partition_list(int is_assigned,const rd_kafka_topic_partition_list_t * partitions)676 static void print_partition_list (int is_assigned,
677 const rd_kafka_topic_partition_list_t
678 *partitions) {
679 int i;
680 for (i = 0 ; i < partitions->cnt ; i++) {
681 fprintf(stderr, "%s%s [%"PRId32"]",
682 i > 0 ? ", ":"",
683 partitions->elems[i].topic,
684 partitions->elems[i].partition);
685 }
686 fprintf(stderr, "\n");
687 }
688
689
690 #if ENABLE_INCREMENTAL_ASSIGN
691 static void
incremental_rebalance_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * partitions,void * opaque)692 incremental_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
693 rd_kafka_topic_partition_list_t *partitions,
694 void *opaque) {
695 rd_kafka_error_t *error = NULL;
696 int i;
697
698 KC_INFO(1, "Group %s rebalanced: incremental %s of %d partition(s) "
699 "(memberid %s%s, %s rebalance protocol): ",
700 conf.group,
701 err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
702 "assignment" : "revoke",
703 partitions->cnt,
704 rd_kafka_memberid(rk),
705 rd_kafka_assignment_lost(rk) ? ", assignment lost" : "",
706 rd_kafka_rebalance_protocol(rk));
707
708 switch (err)
709 {
710 case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
711 if (conf.verbosity >= 1)
712 print_partition_list(1, partitions);
713
714 if (!conf.assignment)
715 conf.assignment =
716 rd_kafka_topic_partition_list_new(
717 partitions->cnt);
718
719 for (i = 0 ; i < partitions->cnt ; i++) {
720 rd_kafka_topic_partition_t *rktpar =
721 &partitions->elems[i];
722
723 /* Set start offset from -o .. */
724 if (conf.offset != RD_KAFKA_OFFSET_INVALID)
725 rktpar->offset = conf.offset;
726
727 rktpar->offset = conf.offset;
728
729 rd_kafka_topic_partition_list_add(conf.assignment,
730 rktpar->topic,
731 rktpar->partition);
732 }
733 error = rd_kafka_incremental_assign(rk, partitions);
734 break;
735
736 case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
737 if (conf.verbosity >= 1)
738 print_partition_list(1, partitions);
739
740 /* Remove partitions from conf.assignment in reverse order
741 * to minimize array shrink sizes. */
742 for (i = partitions->cnt - 1 ;
743 conf.assignment && i >= 0 ;
744 i--) {
745 rd_kafka_topic_partition_t *rktpar =
746 &partitions->elems[i];
747
748 rd_kafka_topic_partition_list_del(conf.assignment,
749 rktpar->topic,
750 rktpar->partition);
751 }
752
753 error = rd_kafka_incremental_unassign(rk, partitions);
754 break;
755
756 default:
757 KC_INFO(0, "failed: %s\n", rd_kafka_err2str(err));
758 break;
759 }
760
761 if (error) {
762 KC_ERROR("Incremental rebalance %s of %d partition(s) "
763 "failed: %s\n",
764 err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
765 "assign" : "unassign",
766 partitions->cnt, rd_kafka_error_string(error));
767 rd_kafka_error_destroy(error);
768 }
769 }
770 #endif
771
/**
 * Group rebalance callback, registered by kafkaconsumer_run().
 *
 * When incremental assignment support is compiled in and the
 * rebalance protocol is "COOPERATIVE" the event is handed over to
 * incremental_rebalance_cb(). Otherwise conf.assignment is replaced
 * by a copy of the new assignment (or dropped on revocation) and
 * the assignment is applied with rd_kafka_assign(). Any other 'err'
 * is only logged. A failing assign call is reported through KC_ERROR.
 */
static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *partitions,
                          void *opaque) {
        rd_kafka_resp_err_t assign_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        int idx;

#if ENABLE_INCREMENTAL_ASSIGN
        if (strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE") == 0) {
                incremental_rebalance_cb(rk, err, partitions, opaque);
                return;
        }
#endif

        KC_INFO(1, "Group %s rebalanced (memberid %s): ",
                conf.group, rd_kafka_memberid(rk));

        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                if (conf.verbosity >= 1) {
                        fprintf(stderr, "assigned: ");
                        print_partition_list(1, partitions);
                }

                /* Apply the configured start offset, if any,
                 * to every assigned partition. */
                if (conf.offset != RD_KAFKA_OFFSET_INVALID) {
                        for (idx = partitions->cnt - 1 ; idx >= 0 ; idx--)
                                partitions->elems[idx].offset = conf.offset;
                }

                /* Replace the tracked assignment with the new one. */
                if (conf.assignment)
                        rd_kafka_topic_partition_list_destroy(conf.assignment);
                conf.assignment =
                        rd_kafka_topic_partition_list_copy(partitions);

                assign_err = rd_kafka_assign(rk, partitions);

        } else if (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
                if (conf.verbosity >= 1) {
                        fprintf(stderr, "revoked: ");
                        print_partition_list(1, partitions);
                }

                if (conf.assignment) {
                        rd_kafka_topic_partition_list_destroy(conf.assignment);
                        conf.assignment = NULL;
                }

                assign_err = rd_kafka_assign(rk, NULL);

        } else {
                KC_INFO(0, "failed: %s\n", rd_kafka_err2str(err));
        }

        if (assign_err)
                KC_ERROR("Rebalance %s of %d partition(s) failed: %s\n",
                         err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
                         "assign" : "unassign",
                         partitions->cnt, rd_kafka_err2str(assign_err));
}
833
834 /**
835 * Run high-level KafkaConsumer, write messages to 'fp'
836 */
kafkaconsumer_run(FILE * fp,char * const * topics,int topic_cnt)837 static void kafkaconsumer_run (FILE *fp, char *const *topics, int topic_cnt) {
838 char errstr[512];
839 rd_kafka_resp_err_t err;
840 rd_kafka_topic_partition_list_t *topiclist;
841 int i;
842
843 rd_kafka_conf_set_rebalance_cb(conf.rk_conf, rebalance_cb);
844 rd_kafka_conf_set_default_topic_conf(conf.rk_conf, conf.rkt_conf);
845 conf.rkt_conf = NULL;
846
847 /* Create consumer */
848 if (!(conf.rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf.rk_conf,
849 errstr, sizeof(errstr))))
850 KC_FATAL("Failed to create consumer: %s", errstr);
851 conf.rk_conf = NULL;
852
853 /* Forward main event queue to consumer queue so we can
854 * serve both queues with a single consumer_poll() call. */
855 rd_kafka_poll_set_consumer(conf.rk);
856
857 if (conf.debug)
858 rd_kafka_set_log_level(conf.rk, LOG_DEBUG);
859 else if (conf.verbosity == 0)
860 rd_kafka_set_log_level(conf.rk, 0);
861
862 /* Build subscription set */
863 topiclist = rd_kafka_topic_partition_list_new(topic_cnt);
864 for (i = 0 ; i < topic_cnt ; i++)
865 rd_kafka_topic_partition_list_add(topiclist, topics[i], -1);
866
867 /* Subscribe */
868 if ((err = rd_kafka_subscribe(conf.rk, topiclist)))
869 KC_FATAL("Failed to subscribe to %d topics: %s\n",
870 topiclist->cnt, rd_kafka_err2str(err));
871
872 KC_INFO(1, "Waiting for group rebalance\n");
873
874 rd_kafka_topic_partition_list_destroy(topiclist);
875
876 /* Read messages from Kafka, write to 'fp'. */
877 while (conf.run) {
878 rd_kafka_message_t *rkmessage;
879
880 rkmessage = rd_kafka_consumer_poll(conf.rk, 100);
881 if (!rkmessage)
882 continue;
883
884 consume_cb(rkmessage, fp);
885
886 rd_kafka_message_destroy(rkmessage);
887 }
888
889 if ((err = rd_kafka_consumer_close(conf.rk)))
890 KC_FATAL("Failed to close consumer: %s\n",
891 rd_kafka_err2str(err));
892
893 /* Wait for outstanding requests to finish. */
894 conf.run = 1;
895 while (conf.run && rd_kafka_outq_len(conf.rk) > 0)
896 rd_kafka_poll(conf.rk, 50);
897
898 if (conf.assignment)
899 rd_kafka_topic_partition_list_destroy(conf.assignment);
900
901 rd_kafka_destroy(conf.rk);
902 }
903 #endif
904
905 /**
906 * Get offsets from conf.startts for consumer_run
907 */
get_offsets(rd_kafka_metadata_topic_t * topic)908 static int64_t *get_offsets (rd_kafka_metadata_topic_t *topic) {
909 int i;
910 int64_t *offsets;
911 rd_kafka_resp_err_t err;
912 rd_kafka_topic_partition_list_t *rktparlistp =
913 rd_kafka_topic_partition_list_new(1);
914
915 for (i = 0 ; i < topic->partition_cnt ; i++) {
916 int32_t partition = topic->partitions[i].id;
917
918 /* If -p <part> was specified: skip unwanted partitions */
919 if (conf.partition != RD_KAFKA_PARTITION_UA &&
920 conf.partition != partition)
921 continue;
922
923 rd_kafka_topic_partition_list_add(
924 rktparlistp,
925 rd_kafka_topic_name(conf.rkt),
926 partition)->offset = conf.startts;
927
928 if (conf.partition != RD_KAFKA_PARTITION_UA)
929 break;
930 }
931 err = rd_kafka_offsets_for_times(conf.rk, rktparlistp,
932 conf.metadata_timeout * 1000);
933 if (err)
934 KC_FATAL("offsets_for_times failed: %s", rd_kafka_err2str(err));
935
936 offsets = calloc(sizeof(int64_t), topic->partition_cnt);
937 for (i = 0 ; i < rktparlistp->cnt ; i++) {
938 const rd_kafka_topic_partition_t *p = &rktparlistp->elems[i];
939 offsets[p->partition] = p->offset;
940 }
941 rd_kafka_topic_partition_list_destroy(rktparlistp);
942
943 return offsets;
944 }
945
946 /**
947 * Run consumer, consuming messages from Kafka and writing to 'fp'.
948 */
consumer_run(FILE * fp)949 static void consumer_run (FILE *fp) {
950 char errstr[512];
951 rd_kafka_resp_err_t err;
952 const rd_kafka_metadata_t *metadata;
953 int i;
954 int64_t *offsets = NULL;
955 rd_kafka_queue_t *rkqu;
956
957 /* Create consumer */
958 if (!(conf.rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf.rk_conf,
959 errstr, sizeof(errstr))))
960 KC_FATAL("Failed to create consumer: %s", errstr);
961
962 if (!conf.debug && conf.verbosity == 0)
963 rd_kafka_set_log_level(conf.rk, 0);
964
965 /* The callback-based consumer API's offset store granularity is
966 * not good enough for us, disable automatic offset store
967 * and do it explicitly per-message in the consume callback instead. */
968 if (rd_kafka_topic_conf_set(conf.rkt_conf,
969 "auto.commit.enable", "false",
970 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
971 KC_FATAL("%s", errstr);
972
973 /* Create topic */
974 if (!(conf.rkt = rd_kafka_topic_new(conf.rk, conf.topic,
975 conf.rkt_conf)))
976 KC_FATAL("Failed to create topic %s: %s", conf.topic,
977 rd_kafka_err2str(rd_kafka_last_error()));
978
979 conf.rk_conf = NULL;
980 conf.rkt_conf = NULL;
981
982
983 /* Query broker for topic + partition information. */
984 if ((err = rd_kafka_metadata(conf.rk, 0, conf.rkt, &metadata,
985 conf.metadata_timeout * 1000)))
986 KC_FATAL("Failed to query metadata for topic %s: %s",
987 rd_kafka_topic_name(conf.rkt), rd_kafka_err2str(err));
988
989 /* Error handling */
990 if (metadata->topic_cnt == 0)
991 KC_FATAL("No such topic in cluster: %s",
992 rd_kafka_topic_name(conf.rkt));
993
994 if ((err = metadata->topics[0].err))
995 KC_FATAL("Topic %s error: %s",
996 rd_kafka_topic_name(conf.rkt), rd_kafka_err2str(err));
997
998 if (metadata->topics[0].partition_cnt == 0)
999 KC_FATAL("Topic %s has no partitions",
1000 rd_kafka_topic_name(conf.rkt));
1001
1002 /* If Exit-at-EOF is enabled, set up array to track EOF
1003 * state for each partition. */
1004 if (conf.exit_eof || conf.stopts) {
1005 part_stop = calloc(sizeof(*part_stop),
1006 metadata->topics[0].partition_cnt);
1007
1008 if (conf.partition != RD_KAFKA_PARTITION_UA)
1009 part_stop_thres = 1;
1010 else
1011 part_stop_thres = metadata->topics[0].partition_cnt;
1012 }
1013
1014 #if RD_KAFKA_VERSION >= 0x00090300
1015 if (conf.startts) {
1016 offsets = get_offsets(&metadata->topics[0]);
1017 }
1018 #endif
1019
1020 /* Create a shared queue that combines messages from
1021 * all wanted partitions. */
1022 rkqu = rd_kafka_queue_new(conf.rk);
1023
1024 /* Start consuming from all wanted partitions. */
1025 for (i = 0 ; i < metadata->topics[0].partition_cnt ; i++) {
1026 int32_t partition = metadata->topics[0].partitions[i].id;
1027
1028 /* If -p <part> was specified: skip unwanted partitions */
1029 if (conf.partition != RD_KAFKA_PARTITION_UA &&
1030 conf.partition != partition)
1031 continue;
1032
1033 /* Start consumer for this partition */
1034 if (rd_kafka_consume_start_queue(conf.rkt, partition,
1035 offsets ? offsets[i] :
1036 (conf.offset ==
1037 RD_KAFKA_OFFSET_INVALID ?
1038 RD_KAFKA_OFFSET_BEGINNING :
1039 conf.offset),
1040 rkqu) == -1)
1041 KC_FATAL("Failed to start consuming "
1042 "topic %s [%"PRId32"]: %s",
1043 conf.topic, partition,
1044 rd_kafka_err2str(rd_kafka_last_error()));
1045
1046 if (conf.partition != RD_KAFKA_PARTITION_UA)
1047 break;
1048 }
1049 free(offsets);
1050
1051 if (conf.partition != RD_KAFKA_PARTITION_UA &&
1052 i == metadata->topics[0].partition_cnt)
1053 KC_FATAL("Topic %s (with partitions 0..%i): "
1054 "partition %i does not exist",
1055 rd_kafka_topic_name(conf.rkt),
1056 metadata->topics[0].partition_cnt-1,
1057 conf.partition);
1058
1059
1060 /* Read messages from Kafka, write to 'fp'. */
1061 while (conf.run) {
1062 rd_kafka_consume_callback_queue(rkqu, 100,
1063 consume_cb, fp);
1064
1065 /* Poll for errors, etc */
1066 rd_kafka_poll(conf.rk, 0);
1067 }
1068
1069 /* Stop consuming */
1070 for (i = 0 ; i < metadata->topics[0].partition_cnt ; i++) {
1071 int32_t partition = metadata->topics[0].partitions[i].id;
1072
1073 /* If -p <part> was specified: skip unwanted partitions */
1074 if (conf.partition != RD_KAFKA_PARTITION_UA &&
1075 conf.partition != partition)
1076 continue;
1077
1078 /* Dont stop already stopped partitions */
1079 if (!part_stop || !part_stop[partition])
1080 rd_kafka_consume_stop(conf.rkt, partition);
1081
1082 rd_kafka_consume_stop(conf.rkt, partition);
1083 }
1084
1085 /* Destroy shared queue */
1086 rd_kafka_queue_destroy(rkqu);
1087
1088 /* Wait for outstanding requests to finish. */
1089 conf.run = 1;
1090 while (conf.run && rd_kafka_outq_len(conf.rk) > 0)
1091 rd_kafka_poll(conf.rk, 50);
1092
1093 if (conf.assignment)
1094 rd_kafka_topic_partition_list_destroy(conf.assignment);
1095
1096 rd_kafka_metadata_destroy(metadata);
1097 rd_kafka_topic_destroy(conf.rkt);
1098 rd_kafka_destroy(conf.rk);
1099 }
1100
1101
1102 /**
1103 * Print metadata information
1104 */
metadata_print(const rd_kafka_metadata_t * metadata,int32_t controllerid)1105 static void metadata_print (const rd_kafka_metadata_t *metadata,
1106 int32_t controllerid) {
1107 int i, j, k;
1108
1109 printf("Metadata for %s (from broker %"PRId32": %s):\n",
1110 conf.topic ? conf.topic : "all topics",
1111 metadata->orig_broker_id, metadata->orig_broker_name);
1112
1113 /* Iterate brokers */
1114 printf(" %i brokers:\n", metadata->broker_cnt);
1115 for (i = 0 ; i < metadata->broker_cnt ; i++)
1116 printf(" broker %"PRId32" at %s:%i%s\n",
1117 metadata->brokers[i].id,
1118 metadata->brokers[i].host,
1119 metadata->brokers[i].port,
1120 controllerid == metadata->brokers[i].id ?
1121 " (controller)" : "");
1122
1123 /* Iterate topics */
1124 printf(" %i topics:\n", metadata->topic_cnt);
1125 for (i = 0 ; i < metadata->topic_cnt ; i++) {
1126 const rd_kafka_metadata_topic_t *t = &metadata->topics[i];
1127 printf(" topic \"%s\" with %i partitions:",
1128 t->topic,
1129 t->partition_cnt);
1130 if (t->err) {
1131 printf(" %s", rd_kafka_err2str(t->err));
1132 if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
1133 printf(" (try again)");
1134 }
1135 printf("\n");
1136
1137 /* Iterate topic's partitions */
1138 for (j = 0 ; j < t->partition_cnt ; j++) {
1139 const rd_kafka_metadata_partition_t *p;
1140 p = &t->partitions[j];
1141 printf(" partition %"PRId32", "
1142 "leader %"PRId32", replicas: ",
1143 p->id, p->leader);
1144
1145 /* Iterate partition's replicas */
1146 for (k = 0 ; k < p->replica_cnt ; k++)
1147 printf("%s%"PRId32,
1148 k > 0 ? ",":"", p->replicas[k]);
1149
1150 /* Iterate partition's ISRs */
1151 printf(", isrs: ");
1152 for (k = 0 ; k < p->isr_cnt ; k++)
1153 printf("%s%"PRId32,
1154 k > 0 ? ",":"", p->isrs[k]);
1155 if (p->err)
1156 printf(", %s\n", rd_kafka_err2str(p->err));
1157 else
1158 printf("\n");
1159 }
1160 }
1161 }
1162
1163
1164 /**
1165 * Lists metadata
1166 */
metadata_list(void)1167 static void metadata_list (void) {
1168 char errstr[512];
1169 rd_kafka_resp_err_t err;
1170 const rd_kafka_metadata_t *metadata;
1171 int32_t controllerid = -1;
1172
1173 /* Create handle */
1174 if (!(conf.rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf.rk_conf,
1175 errstr, sizeof(errstr))))
1176 KC_FATAL("Failed to create producer: %s", errstr);
1177
1178 if (!conf.debug && conf.verbosity == 0)
1179 rd_kafka_set_log_level(conf.rk, 0);
1180
1181 /* Create topic, if specified */
1182 if (conf.topic &&
1183 !(conf.rkt = rd_kafka_topic_new(conf.rk, conf.topic,
1184 conf.rkt_conf)))
1185 KC_FATAL("Failed to create topic %s: %s", conf.topic,
1186 rd_kafka_err2str(rd_kafka_last_error()));
1187
1188 conf.rk_conf = NULL;
1189 conf.rkt_conf = NULL;
1190
1191
1192 /* Fetch metadata */
1193 err = rd_kafka_metadata(conf.rk, conf.rkt ? 0 : 1, conf.rkt,
1194 &metadata, conf.metadata_timeout * 1000);
1195 if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
1196 KC_FATAL("Failed to acquire metadata: %s%s",
1197 rd_kafka_err2str(err),
1198 err == RD_KAFKA_RESP_ERR__TRANSPORT ?
1199 " (Are the brokers reachable? "
1200 "Also try increasing the metadata timeout with "
1201 "-m <timeout>?)" : "");
1202
1203 #if HAVE_CONTROLLERID
1204 controllerid = rd_kafka_controllerid(conf.rk, 0);
1205 #endif
1206
1207 /* Print metadata */
1208 #if ENABLE_JSON
1209 if (conf.flags & CONF_F_FMT_JSON)
1210 metadata_print_json(metadata, controllerid);
1211 else
1212 #endif
1213 metadata_print(metadata, controllerid);
1214
1215 rd_kafka_metadata_destroy(metadata);
1216
1217 if (conf.rkt)
1218 rd_kafka_topic_destroy(conf.rkt);
1219 rd_kafka_destroy(conf.rk);
1220 }
1221
1222
1223 /**
1224 * Print usage and exit.
1225 */
usage(const char * argv0,int exitcode,const char * reason,int version_only)1226 static void RD_NORETURN usage (const char *argv0, int exitcode,
1227 const char *reason,
1228 int version_only) {
1229
1230 FILE *out = stdout;
1231 char features[256];
1232 size_t flen;
1233 rd_kafka_conf_t *tmpconf;
1234
1235 if (reason) {
1236 out = stderr;
1237 fprintf(out, "Error: %s\n\n", reason);
1238 }
1239
1240 if (!version_only)
1241 fprintf(out, "Usage: %s <options> [file1 file2 .. | topic1 topic2 ..]]\n",
1242 argv0);
1243
1244 /* Create a temporary config object to extract builtin.features */
1245 tmpconf = rd_kafka_conf_new();
1246 flen = sizeof(features);
1247 if (rd_kafka_conf_get(tmpconf, "builtin.features",
1248 features, &flen) != RD_KAFKA_CONF_OK)
1249 strncpy(features, "n/a", sizeof(features));
1250 rd_kafka_conf_destroy(tmpconf);
1251
1252 fprintf(out,
1253 "kcat - Apache Kafka producer and consumer tool\n"
1254 "https://github.com/edenhill/kcat\n"
1255 "Copyright (c) 2014-2021, Magnus Edenhill\n"
1256 "Version %s (%s%slibrdkafka %s builtin.features=%s)\n"
1257 "\n",
1258 KCAT_VERSION,
1259 ""
1260 #if ENABLE_JSON
1261 "JSON, "
1262 #endif
1263 #if ENABLE_AVRO
1264 "Avro, "
1265 #endif
1266 #if ENABLE_TXNS
1267 "Transactions, "
1268 #endif
1269 #if ENABLE_INCREMENTAL_ASSIGN
1270 "IncrementalAssign, "
1271 #endif
1272 ,
1273 #if ENABLE_JSON
1274 json_can_emit_verbatim() ? "JSONVerbatim, " : "",
1275 #else
1276 "",
1277 #endif
1278 rd_kafka_version_str(), features
1279 );
1280
1281 if (version_only)
1282 exit(exitcode);
1283
1284 fprintf(out, "\n"
1285 "General options:\n"
1286 " -C | -P | -L | -Q Mode: Consume, Produce, Metadata List, Query mode\n"
1287 #if ENABLE_KAFKACONSUMER
1288 " -G <group-id> Mode: High-level KafkaConsumer (Kafka >=0.9 balanced consumer groups)\n"
1289 " Expects a list of topics to subscribe to\n"
1290 #endif
1291 " -t <topic> Topic to consume from, produce to, "
1292 "or list\n"
1293 " -p <partition> Partition\n"
1294 " -b <brokers,..> Bootstrap broker(s) (host[:port])\n"
1295 " -D <delim> Message delimiter string:\n"
1296 " a-z | \\r | \\n | \\t | \\xNN ..\n"
1297 " Default: \\n\n"
1298 " -K <delim> Key delimiter (same format as -D)\n"
1299 " -c <cnt> Limit message count\n"
1300 " -m <seconds> Metadata (et.al.) request timeout.\n"
1301 " This limits how long kcat will block\n"
1302 " while waiting for initial metadata to be\n"
1303 " retrieved from the Kafka cluster.\n"
1304 " It also sets the timeout for the producer's\n"
1305 " transaction commits, init, aborts, etc.\n"
1306 " Default: 5 seconds.\n"
1307 " -F <config-file> Read configuration properties from file,\n"
1308 " file format is \"property=value\".\n"
1309 " The KCAT_CONFIG=path environment can "
1310 "also be used, but -F takes precedence.\n"
1311 " The default configuration file is "
1312 "$HOME/.config/kcat.conf\n"
1313 " -X list List available librdkafka configuration "
1314 "properties\n"
1315 " -X prop=val Set librdkafka configuration property.\n"
1316 " Properties prefixed with \"topic.\" are\n"
1317 " applied as topic properties.\n"
1318 #if ENABLE_AVRO
1319 " -X schema.registry.prop=val Set libserdes configuration property "
1320 "for the Avro/Schema-Registry client.\n"
1321 #endif
1322 " -X dump Dump configuration and exit.\n"
1323 " -d <dbg1,...> Enable librdkafka debugging:\n"
1324 " " RD_KAFKA_DEBUG_CONTEXTS "\n"
1325 " -q Be quiet (verbosity set to 0)\n"
1326 " -v Increase verbosity\n"
1327 " -E Do not exit on non-fatal error\n"
1328 " -V Print version\n"
1329 " -h Print usage help\n"
1330 "\n"
1331 "Producer options:\n"
1332 " -z snappy|gzip|lz4 Message compression. Default: none\n"
1333 " -p -1 Use random partitioner\n"
1334 " -D <delim> Delimiter to split input into messages\n"
1335 " -K <delim> Delimiter to split input key and message\n"
1336 " -k <str> Use a fixed key for all messages.\n"
1337 " If combined with -K, per-message keys\n"
1338 " takes precendence.\n"
1339 " -H <header=value> Add Message Headers "
1340 "(may be specified multiple times)\n"
1341 " -l Send messages from a file separated by\n"
1342 " delimiter, as with stdin.\n"
1343 " (only one file allowed)\n"
1344 " -T Output sent messages to stdout, acting like tee.\n"
1345 " -c <cnt> Exit after producing this number "
1346 "of messages\n"
1347 " -Z Send empty messages as NULL messages\n"
1348 " file1 file2.. Read messages from files.\n"
1349 " With -l, only one file permitted.\n"
1350 " Otherwise, the entire file contents will\n"
1351 " be sent as one single message.\n"
1352 " -X transactional.id=.. Enable transactions and send all\n"
1353 " messages in a single transaction which\n"
1354 " is committed when stdin is closed or the\n"
1355 " input file(s) are fully read.\n"
1356 " If kcat is terminated through Ctrl-C\n"
1357 " (et.al) the transaction will be aborted.\n"
1358 "\n"
1359 "Consumer options:\n"
1360 " -o <offset> Offset to start consuming from:\n"
1361 " beginning | end | stored |\n"
1362 " <value> (absolute offset) |\n"
1363 " -<value> (relative offset from end)\n"
1364 #if RD_KAFKA_VERSION >= 0x00090300
1365 " s@<value> (timestamp in ms to start at)\n"
1366 " e@<value> (timestamp in ms to stop at "
1367 "(not included))\n"
1368 #endif
1369 " -e Exit successfully when last message "
1370 "received\n"
1371 " -f <fmt..> Output formatting string, see below.\n"
1372 " Takes precedence over -D and -K.\n"
1373 #if ENABLE_JSON
1374 " -J Output with JSON envelope\n"
1375 #endif
1376 " -s key=<serdes> Deserialize non-NULL keys using <serdes>.\n"
1377 " -s value=<serdes> Deserialize non-NULL values using <serdes>.\n"
1378 " -s <serdes> Deserialize non-NULL keys and values using <serdes>.\n"
1379 " Available deserializers (<serdes>):\n"
1380 " <pack-str> - A combination of:\n"
1381 " <: little-endian,\n"
1382 " >: big-endian (recommended),\n"
1383 " b: signed 8-bit integer\n"
1384 " B: unsigned 8-bit integer\n"
1385 " h: signed 16-bit integer\n"
1386 " H: unsigned 16-bit integer\n"
1387 " i: signed 32-bit integer\n"
1388 " I: unsigned 32-bit integer\n"
1389 " q: signed 64-bit integer\n"
1390 " Q: unsigned 64-bit integer\n"
1391 " c: ASCII character\n"
1392 " s: remaining data is string\n"
1393 " $: match end-of-input (no more bytes remaining or a parse error is raised).\n"
1394 " Not including this token skips any\n"
1395 " remaining data after the pack-str is\n"
1396 " exhausted.\n"
1397 #if ENABLE_AVRO
1398 " avro - Avro-formatted with schema in Schema-Registry (requires -r)\n"
1399 " E.g.: -s key=i -s value=avro - key is 32-bit integer, value is Avro.\n"
1400 " or: -s avro - both key and value are Avro-serialized\n"
1401 #endif
1402 #if ENABLE_AVRO
1403 " -r <url> Schema registry URL (when avro deserializer is used with -s)\n"
1404 #endif
1405 " -D <delim> Delimiter to separate messages on output\n"
1406 " -K <delim> Print message keys prefixing the message\n"
1407 " with specified delimiter.\n"
1408 " -O Print message offset using -K delimiter\n"
1409 " -c <cnt> Exit after consuming this number "
1410 "of messages\n"
1411 " -Z Print NULL values and keys as \"%s\" "
1412 "instead of empty.\n"
1413 " For JSON (-J) the nullstr is always null.\n"
1414 " -u Unbuffered output\n"
1415 "\n"
1416 "Metadata options (-L):\n"
1417 " -t <topic> Topic to query (optional)\n"
1418 "\n"
1419 "Query options (-Q):\n"
1420 " -t <t>:<p>:<ts> Get offset for topic <t>,\n"
1421 " partition <p>, timestamp <ts>.\n"
1422 " Timestamp is the number of milliseconds\n"
1423 " since epoch UTC.\n"
1424 " Requires broker >= 0.10.0.0 and librdkafka >= 0.9.3.\n"
1425 " Multiple -t .. are allowed but a partition\n"
1426 " must only occur once.\n"
1427 "\n"
1428 "Format string tokens:\n"
1429 " %%s Message payload\n"
1430 " %%S Message payload length (or -1 for NULL)\n"
1431 " %%R Message payload length (or -1 for NULL) serialized\n"
1432 " as a binary big endian 32-bit signed integer\n"
1433 " %%k Message key\n"
1434 " %%K Message key length (or -1 for NULL)\n"
1435 #if RD_KAFKA_VERSION >= 0x000902ff
1436 " %%T Message timestamp (milliseconds since epoch UTC)\n"
1437 #endif
1438 #if HAVE_HEADERS
1439 " %%h Message headers (n=v CSV)\n"
1440 #endif
1441 " %%t Topic\n"
1442 " %%p Partition\n"
1443 " %%o Message offset\n"
1444 " \\n \\r \\t Newlines, tab\n"
1445 " \\xXX \\xNNN Any ASCII character\n"
1446 " Example:\n"
1447 " -f 'Topic %%t [%%p] at offset %%o: key %%k: %%s\\n'\n"
1448 "\n"
1449 #if ENABLE_JSON
1450 "JSON message envelope (on one line) when consuming with -J:\n"
1451 " { \"topic\": str, \"partition\": int, \"offset\": int,\n"
1452 " \"tstype\": \"create|logappend|unknown\", \"ts\": int, "
1453 "// timestamp in milliseconds since epoch\n"
1454 " \"broker\": int,\n"
1455 " \"headers\": { \"<name>\": str, .. }, // optional\n"
1456 " \"key\": str|json, \"payload\": str|json,\n"
1457 " \"key_error\": str, \"payload_error\": str, //optional\n"
1458 " \"key_schema_id\": int, "
1459 "\"value_schema_id\": int //optional\n"
1460 " }\n"
1461 " notes:\n"
1462 " - key_error and payload_error are only included if "
1463 "deserialization fails.\n"
1464 " - key_schema_id and value_schema_id are included for "
1465 "successfully deserialized Avro messages.\n"
1466 "\n"
1467 #endif
1468 "Consumer mode (writes messages to stdout):\n"
1469 " kcat -b <broker> -t <topic> -p <partition>\n"
1470 " or:\n"
1471 " kcat -C -b ...\n"
1472 "\n"
1473 #if ENABLE_KAFKACONSUMER
1474 "High-level KafkaConsumer mode:\n"
1475 " kcat -b <broker> -G <group-id> topic1 top2 ^aregex\\d+\n"
1476 "\n"
1477 #endif
1478 "Producer mode (reads messages from stdin):\n"
1479 " ... | kcat -b <broker> -t <topic> -p <partition>\n"
1480 " or:\n"
1481 " kcat -P -b ...\n"
1482 "\n"
1483 "Metadata listing:\n"
1484 " kcat -L -b <broker> [-t <topic>]\n"
1485 "\n"
1486 "Query offset by timestamp:\n"
1487 " kcat -Q -b broker -t <topic>:<partition>:<timestamp>\n"
1488 "\n",
1489 conf.null_str
1490 );
1491 exit(exitcode);
1492 }
1493
1494
1495 /**
1496 * Terminate by putting out the run flag.
1497 */
term(int sig)1498 static void term (int sig) {
1499 conf.run = 0;
1500 conf.term_sig = sig;
1501 }
1502
1503
1504 /**
1505 * librdkafka error callback
1506 */
error_cb(rd_kafka_t * rk,int err,const char * reason,void * opaque)1507 static void error_cb (rd_kafka_t *rk, int err,
1508 const char *reason, void *opaque) {
1509 #if RD_KAFKA_VERSION >= 0x01000000
1510 if (err == RD_KAFKA_RESP_ERR__FATAL) {
1511 /* A fatal error has been raised, extract the
1512 * underlying error, and start graceful termination -
1513 * this to make sure producer delivery reports are
1514 * handled before exiting. */
1515 char fatal_errstr[512];
1516 rd_kafka_resp_err_t fatal_err;
1517
1518 fatal_err = rd_kafka_fatal_error(rk, fatal_errstr,
1519 sizeof(fatal_errstr));
1520 KC_INFO(0, "FATAL CLIENT ERROR: %s: %s: terminating\n",
1521 rd_kafka_err2str(fatal_err), fatal_errstr);
1522 conf.run = 0;
1523
1524 } else
1525 #endif
1526 if (err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN) {
1527 KC_ERROR("%s: %s", rd_kafka_err2str(err),
1528 reason ? reason : "");
1529 } else {
1530 KC_INFO(1, "ERROR: %s: %s\n", rd_kafka_err2str(err),
1531 reason ? reason : "");
1532 }
1533 }
1534
1535
/**
 * @brief Parse delimiter string from command line arguments and return
 *        an allocated copy with its escape sequences expanded.
 *
 * Recognized escapes are "\n", "\r", "\t" and "\x<hex>", where <hex> is
 * whatever strtoul() accepts in base 16; only the low 8 bits of the parsed
 * value are used. Any other character, including a backslash that does
 * not start one of these escapes, is copied unchanged.
 *
 * The string is decoded in a single left-to-right pass, so a byte produced
 * by an escape is never interpreted as the start of another escape
 * (e.g. a hex escape yielding a backslash followed by 'n' stays a
 * backslash and an 'n').
 *
 * A "\x" that is not followed by a hex number is reported through
 * KC_FATAL(), as is failure to allocate the copy.
 *
 * @param instr  Delimiter specification, not modified.
 *
 * @returns a newly allocated string which the caller must free().
 */
static char *parse_delim (const char *instr) {
        char *str;
        char *src;      /* read position, always >= dst */
        char *dst;      /* write position */

        /* Make a copy so we can decode in place: the decoded form is
         * never longer than the encoded form. */
        str = strdup(instr);
        if (!str)
                KC_FATAL("Out of memory while parsing delimiter");

        src = dst = str;
        while (*src) {
                if (*src == '\\') {
                        if (src[1] == 'n') {
                                *dst++ = '\n';
                                src += 2;
                                continue;
                        } else if (src[1] == 'r') {
                                *dst++ = '\r';
                                src += 2;
                                continue;
                        } else if (src[1] == 't') {
                                *dst++ = '\t';
                                src += 2;
                                continue;
                        } else if (src[1] == 'x') {
                                char *end;
                                unsigned long x;

                                x = strtoul(src + strlen("\\x"), &end, 16);
                                if (end == src + strlen("\\x"))
                                        KC_FATAL("Delimiter %s expects "
                                                 "hex number", src);
                                *dst++ = (char)(x & 0xff);
                                src = end;
                                continue;
                        }
                }

                /* Ordinary character (or unrecognized escape) */
                *dst++ = *src++;
        }
        *dst = '\0';

        return str;
}
1573
1574 /**
1575 * @brief Add topic+partition+offset to list, from :-separated string.
1576 *
1577 * "<t>:<p>:<o>"
1578 *
1579 * @remark Will modify \p str
1580 */
add_topparoff(const char * what,rd_kafka_topic_partition_list_t * rktparlist,char * str)1581 static void add_topparoff (const char *what,
1582 rd_kafka_topic_partition_list_t *rktparlist,
1583 char *str) {
1584 char *s, *t, *e;
1585 char *topic;
1586 int partition;
1587 int64_t offset;
1588
1589 if (!(s = strchr(str, ':')) ||
1590 !(t = strchr(s+1, ':')))
1591 KC_FATAL("%s: expected \"topic:partition:offset_or_timestamp\"", what);
1592
1593 topic = str;
1594 *s = '\0';
1595
1596 partition = strtoul(s+1, &e, 0);
1597 if (e == s+1)
1598 KC_FATAL("%s: expected \"topic:partition:offset_or_timestamp\"", what);
1599
1600 offset = strtoll(t+1, &e, 0);
1601 if (e == t+1)
1602 KC_FATAL("%s: expected \"topic:partition:offset_or_timestamp\"", what);
1603
1604 rd_kafka_topic_partition_list_add(rktparlist, topic, partition)->offset = offset;
1605 }
1606
1607 /**
1608 * Dump current rdkafka configuration to stdout.
1609 */
conf_dump(void)1610 static void conf_dump (void) {
1611 const char **arr;
1612 size_t cnt;
1613 int pass;
1614
1615 for (pass = 0 ; pass < 2 ; pass++) {
1616 int i;
1617
1618 if (pass == 0) {
1619 arr = rd_kafka_conf_dump(conf.rk_conf, &cnt);
1620 printf("# Global config\n");
1621 } else {
1622 printf("# Topic config\n");
1623 arr = rd_kafka_topic_conf_dump(conf.rkt_conf, &cnt);
1624 }
1625
1626 for (i = 0 ; i < (int)cnt ; i += 2)
1627 printf("%s = %s\n",
1628 arr[i], arr[i+1]);
1629
1630 printf("\n");
1631
1632 rd_kafka_conf_dump_free(arr, cnt);
1633 }
1634 }
1635
1636
1637 /**
1638 * @brief Try setting a config property. Provides "topic." fallthru.
1639 *
1640 * @remark \p val may be truncated by this function.
1641 *
1642 * @returns -1 on failure or 0 on success.
1643 */
try_conf_set(const char * name,char * val,char * errstr,size_t errstr_size)1644 static int try_conf_set (const char *name, char *val,
1645 char *errstr, size_t errstr_size) {
1646 rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;
1647 size_t srlen = strlen("schema.registry.");
1648
1649 /* Pass schema.registry. config to the serdes */
1650 if (!strncmp(name, "schema.registry.", srlen)) {
1651 #if ENABLE_AVRO
1652 serdes_err_t serr;
1653
1654 if (!conf.srconf)
1655 conf.srconf = serdes_conf_new(NULL, 0, NULL);
1656
1657 if (!strcmp(name, "schema.registry.url")) {
1658 char *t;
1659
1660 /* Trim trailing slashes from URL to avoid 404 */
1661 for (t = val + strlen(val) - 1;
1662 t >= val && *t == '/'; t--)
1663 *t = '\0';
1664
1665 if (!*t) {
1666 snprintf(errstr, errstr_size,
1667 "schema.registry.url is empty");
1668 return -1;
1669 }
1670
1671 conf.flags |= CONF_F_SR_URL_SEEN;
1672 srlen = 0;
1673 }
1674
1675 serr = serdes_conf_set(conf.srconf, name+srlen, val,
1676 errstr, errstr_size);
1677 return serr == SERDES_ERR_OK ? 0 : -1;
1678 #else
1679 snprintf(errstr, errstr_size,
1680 "This build of kcat lacks "
1681 "Avro/Schema-Registry support");
1682 return -1;
1683 #endif
1684 }
1685
1686
1687 /* Try "topic." prefixed properties on topic
1688 * conf first, and then fall through to global if
1689 * it didnt match a topic configuration property. */
1690 if (!strncmp(name, "topic.", strlen("topic.")))
1691 res = rd_kafka_topic_conf_set(conf.rkt_conf,
1692 name+
1693 strlen("topic."),
1694 val,
1695 errstr, errstr_size);
1696 else
1697 /* If no "topic." prefix, try the topic config first. */
1698 res = rd_kafka_topic_conf_set(conf.rkt_conf,
1699 name, val,
1700 errstr, errstr_size);
1701
1702 if (res == RD_KAFKA_CONF_UNKNOWN)
1703 res = rd_kafka_conf_set(conf.rk_conf, name, val,
1704 errstr, errstr_size);
1705
1706 if (res != RD_KAFKA_CONF_OK)
1707 return -1;
1708
1709 if (!strcmp(name, "metadata.broker.list") ||
1710 !strcmp(name, "bootstrap.servers"))
1711 conf.flags |= CONF_F_BROKERS_SEEN;
1712
1713 if (!strcmp(name, "api.version.request"))
1714 conf.flags |= CONF_F_APIVERREQ_USER;
1715
1716 #if RD_KAFKA_VERSION >= 0x00090000
1717 rd_kafka_conf_set_throttle_cb(conf.rk_conf, throttle_cb);
1718 #endif
1719
1720
1721 return 0;
1722 }
1723
/**
 * @brief Intercept configuration properties and try to identify
 *        incompatible properties that need to be converted to librdkafka
 *        configuration properties.
 *
 * Handled properties:
 *  - "ssl.endpoint.identification.algorithm": accepted and ignored.
 *  - "sasl.jaas.config": when the value is a PlainLoginModule line with
 *    quoted username and password, these are applied as "sasl.username"
 *    and "sasl.password". Each is limited to 127 characters; a value that
 *    does not match (including one with a longer credential) is left
 *    unhandled.
 *
 * @param name         Property name.
 * @param val          Property value.
 * @param errstr       Buffer that receives an error string on failure.
 * @param errstr_size  Size of \p errstr.
 *
 * @returns -1 on failure, 0 if the property was not handled,
 *          or 1 if it was handled.
 */
static int try_java_conf_set (const char *name, const char *val,
                              char *errstr, size_t errstr_size) {
        if (!strcmp(name, "ssl.endpoint.identification.algorithm"))
                return 1; /* SSL server verification:
                           * not supported by librdkafka: ignore for now */

        if (!strcmp(name, "sasl.jaas.config")) {
                char sasl_user[128], sasl_pass[128];

                /* The field widths (127) bound each conversion to the
                 * size of its destination buffer minus the terminating
                 * nul, so an over-long credential cannot overflow the
                 * stack buffers. */
                if (sscanf(val,
                           "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%127[^\"]\" password=\"%127[^\"]\"",
                           sasl_user, sasl_pass) == 2) {
                        if (try_conf_set("sasl.username", sasl_user,
                                         errstr, errstr_size) == -1 ||
                            try_conf_set("sasl.password", sasl_pass,
                                         errstr, errstr_size) == -1)
                                return -1;
                        return 1;
                }
        }

        return 0;
}
1754
1755
/**
 * @brief Read config file, fail terminally if fatal is true, else
 *        fail silently.
 *
 * The file is read line by line. Leading and trailing whitespace is
 * removed, empty lines and lines starting with '#' are skipped, and the
 * escapes "\:" and "\=" are replaced by ':' and '='. Each remaining line
 * must be of the form property=value and is applied through
 * try_java_conf_set() and, if not handled there, try_conf_set().
 *
 * A malformed line or a property that cannot be set is reported through
 * KC_FATAL() regardless of \p fatal; \p fatal only governs failure to
 * open the file (and the verbosity level of the "Reading configuration"
 * message).
 *
 * @param path   Path of the configuration file.
 * @param fatal  If non-zero, failure to open \p path is fatal.
 *
 * @returns 0 on success or -1 on failure (unless fatal is true
 *          in which case the app will have exited).
 */
static int read_conf_file (const char *path, int fatal) {
        FILE *fp;
        char buf[512];
        int line = 0;

        if (!(fp = fopen(path, "r"))) {
                if (fatal)
                        KC_FATAL("Failed to open %s: %s",
                                 path, strerror(errno));
                return -1;
        }

        KC_INFO(fatal ? 1 : 3, "Reading configuration from file %s\n", path);

        while (fgets(buf, sizeof(buf), fp)) {
                char *s = buf;
                char *t;
                char errstr[512];
                size_t len;
                int r;

                line++;

                /* Left-trim.
                 * The cast keeps the argument within the range the ctype
                 * functions are defined for, also when char is signed. */
                while (isspace((unsigned char)*s))
                        s++;

                /* Right-trim and remove newline.
                 * The first character is never trimmed here: after the
                 * left-trim it is either non-space or the terminator. */
                len = strlen(s);
                while (len > 1 && isspace((unsigned char)s[len-1]))
                        s[--len] = '\0';

                /* Ignore Empty line */
                if (!*s)
                        continue;

                /* Ignore comments */
                if (*s == '#')
                        continue;

                /* Strip escapes for \: \= which can be encountered in
                 * Java configs (see comment below) */
                while ((t = strstr(s, "\\:"))) {
                        memmove(t, t+1, strlen(t+1)+1); /* overwrite \: */
                        *t = ':'; /* reinsert : */
                }
                while ((t = strstr(s, "\\="))) {
                        memmove(t, t+1, strlen(t+1)+1); /* overwrite \= */
                        *t = '='; /* reinsert = */
                }

                /* Parse prop=value */
                if (!(t = strchr(s, '=')) || t == s)
                        KC_FATAL("%s:%d: expected property=value\n",
                                 path, line);

                /* Split in place: s is the name, t the value */
                *t = 0;
                t++;

                /**
                 * Attempt to support Java client configuration files,
                 * such as the ccloud config.
                 * There are some quirks with unnecessary escaping with \
                 * that we remove, as well as parsing special configuration
                 * properties that don't match librdkafka's.
                 */
                r = try_java_conf_set(s, t, errstr, sizeof(errstr));
                if (r == -1)
                        KC_FATAL("%s:%d: %s (java config conversion)\n",
                                 path, line, errstr);
                else if (r == 1)
                        continue; /* Handled */

                if (try_conf_set(s, t, errstr, sizeof(errstr)) == -1)
                        KC_FATAL("%s:%d: %s\n", path, line, errstr);
        }

        fclose(fp);

        return 0;
}
1845
1846
/**
 * @brief Look up environment variable \p env, treating an empty value
 *        the same as an unset variable.
 *
 * @returns the variable's value, or NULL if it is not set or is empty.
 */
static const char *kc_getenv (const char *env) {
        const char *val = getenv(env);

        return (val && *val) ? val : NULL;
}
1857
/**
 * @brief Read the default configuration file from the user's home
 *        directory, if there is one.
 *
 * $HOME/.config/kcat.conf is tried first. If that cannot be read,
 * $HOME/.config/kafkacat.conf is tried and, when it is read
 * successfully, a deprecation notice is logged.
 * Nothing is done if HOME is unset or empty.
 */
static void read_default_conf_files (void) {
        char cur_path[512], legacy_path[512];
        const char *home = kc_getenv("HOME");

        if (!home)
                return;

        /* Current file name */
        snprintf(cur_path, sizeof(cur_path), "%s/.config/kcat.conf", home);
        if (read_conf_file(cur_path, 0/*not fatal*/) == 0)
                return;

        /* Deprecated file name */
        snprintf(legacy_path, sizeof(legacy_path),
                 "%s/.config/kafkacat.conf", home);
        if (read_conf_file(legacy_path, 0/*not fatal*/) != 0)
                return;

        KC_INFO(1, "Configuration filename kafkacat.conf is deprecated!\n");
        KC_INFO(1, "Rename %s to %s\n", legacy_path, cur_path);
}
1879
1880
1881
/**
 * @brief Unit test for rd_strnstr().
 *
 * Each case searches for a separator in a haystack and states the
 * expected match offset, or -1 when no match is expected. Every failing
 * case is reported on stderr.
 *
 * @returns the number of failed cases.
 */
static int unittest_strnstr (void) {
        struct {
                const char *sep;
                const char *hay;
                int offset;
        } cases[] = {
                { ";Sep;", ";Sep;Post", 0 },
                { ";Sep;", ";Sep;", 0 },
                { ";Sep;", "Pre;Sep;Post", 3 },
                { ";Sep;", "Pre;Sep;", 3 },
                { ";Sep;", "Pre;SepPost", -1 },
                { ";KeyDel;", "Key1;KeyDel;Value1", 4 },
                { ";", "Is The;", 6 },
                { NULL },
        };
        int i;
        int fails = 0;

        for (i = 0 ; cases[i].sep ; i++) {
                const char *t = rd_strnstr(cases[i].hay, strlen(cases[i].hay),
                                           cases[i].sep, strlen(cases[i].sep));
                const char *fail = NULL;

                /* The expected match pointer is only formed when a match
                 * is expected, so no pointer before the start of the
                 * haystack is ever computed. */
                if (cases[i].offset == -1) {
                        if (t)
                                fail = "expected no match";
                } else if (!t) {
                        fail = "expected match";
                } else if (t != cases[i].hay + cases[i].offset) {
                        fail = "invalid match";
                }

                if (!fail)
                        continue;

                /* t is NULL when there was no match: never hand NULL to
                 * %s, and pass %p arguments as void pointers. */
                fprintf(stderr,
                        "%s: FAILED: for hay %d: "
                        "match is %p '%s' for %p '%s' in %p '%s' "
                        "(want offset %d, not %d): %s\n",
                        __FUNCTION__,
                        i,
                        (const void *)t, t ? t : "(null)",
                        (const void *)cases[i].sep, cases[i].sep,
                        (const void *)cases[i].hay, cases[i].hay,
                        cases[i].offset,
                        t ? (int)(t - cases[i].hay) : -1,
                        fail);
                fails++;
        }

        return fails;
}
1934
/**
 * @brief Unit test for parse_delim().
 *
 * Each case gives a delimiter specification and the string it is
 * expected to be parsed into. Every mismatch is reported on stderr and
 * counted.
 *
 * @returns the number of failed cases.
 */
static int unittest_parse_delim (void) {
        struct {
                const char *in;
                const char *exp;
        } delims[] = {
                { "", "" },
                { "\\n", "\n" },
                { "\\t\\n\\n", "\t\n\n" },
                { "\\x54!\\x45\\x53T", "T!EST" },
                { "\\x30", "0" },
                { NULL }
        };
        int i;
        int fails = 0;

        for (i = 0 ; delims[i].in ; i++) {
                char *out = parse_delim(delims[i].in);
                if (strcmp(out, delims[i].exp)) {
                        fprintf(stderr, "%s: FAILED: "
                                "expected '%s' to return '%s', not '%s'\n",
                                __FUNCTION__, delims[i].in, delims[i].exp, out);
                        /* Count the failure so the caller sees it */
                        fails++;
                }
                free(out);
        }

        return fails;
}
1961
1962
/**
 * @brief Run all unit tests, in order: strnstr, then parse_delim.
 *
 * @returns the total number of failed tests.
 */
static int unittest (void) {
        int strnstr_fails;
        int delim_fails;

        strnstr_fails = unittest_strnstr();
        delim_fails   = unittest_parse_delim();

        return strnstr_fails + delim_fails;
}
1976
1977 /**
1978 * @brief Add a single header specified as a command line option.
1979 *
1980 * @param inp "name=value" or "name" formatted header
1981 */
add_header(const char * inp)1982 static void add_header (const char *inp) {
1983 const char *t;
1984 rd_kafka_resp_err_t err;
1985
1986 t = strchr(inp, '=');
1987 if (t == inp || !*inp)
1988 KC_FATAL("Expected -H \"name=value..\" or -H \"name\"");
1989
1990 if (!conf.headers)
1991 conf.headers = rd_kafka_headers_new(8);
1992
1993
1994 err = rd_kafka_header_add(conf.headers,
1995 inp,
1996 t ? (ssize_t)(t-inp) : -1,
1997 t ? t+1 : NULL, -1);
1998 if (err)
1999 KC_FATAL("Failed to add header \"%s\": %s",
2000 inp, rd_kafka_err2str(err));
2001 }
2002
2003
2004 /**
2005 * Parse command line arguments
2006 */
argparse(int argc,char ** argv,rd_kafka_topic_partition_list_t ** rktparlistp)2007 static void argparse (int argc, char **argv,
2008 rd_kafka_topic_partition_list_t **rktparlistp) {
2009 char errstr[512];
2010 int opt;
2011 const char *fmt = NULL;
2012 const char *delim = "\n";
2013 const char *key_delim = NULL;
2014 char tmp_fmt[64];
2015 int do_conf_dump = 0;
2016 int conf_files_read = 0;
2017 int i;
2018
2019 while ((opt = getopt(argc, argv,
2020 ":PCG:LQt:p:b:z:o:eED:K:k:H:Od:qvF:X:c:Tuf:ZlVh"
2021 "s:r:Jm:U")) != -1) {
2022 switch (opt) {
2023 case 'P':
2024 case 'C':
2025 case 'L':
2026 case 'Q':
2027 conf.mode = opt;
2028 break;
2029 #if ENABLE_KAFKACONSUMER
2030 case 'G':
2031 conf.mode = opt;
2032 conf.group = optarg;
2033 if (rd_kafka_conf_set(conf.rk_conf, "group.id", optarg,
2034 errstr, sizeof(errstr)) !=
2035 RD_KAFKA_CONF_OK)
2036 KC_FATAL("%s", errstr);
2037 break;
2038 #endif
2039 case 't':
2040 if (conf.mode == 'Q') {
2041 if (!*rktparlistp)
2042 *rktparlistp = rd_kafka_topic_partition_list_new(1);
2043 add_topparoff("-t", *rktparlistp, optarg);
2044 conf.flags |= CONF_F_APIVERREQ;
2045 } else
2046 conf.topic = optarg;
2047
2048 break;
2049 case 'p':
2050 conf.partition = atoi(optarg);
2051 break;
2052 case 'b':
2053 conf.brokers = optarg;
2054 conf.flags |= CONF_F_BROKERS_SEEN;
2055 break;
2056 case 'z':
2057 if (rd_kafka_conf_set(conf.rk_conf,
2058 "compression.codec", optarg,
2059 errstr, sizeof(errstr)) !=
2060 RD_KAFKA_CONF_OK)
2061 KC_FATAL("%s", errstr);
2062 break;
2063 case 'o':
2064 if (!strcmp(optarg, "end"))
2065 conf.offset = RD_KAFKA_OFFSET_END;
2066 else if (!strcmp(optarg, "beginning"))
2067 conf.offset = RD_KAFKA_OFFSET_BEGINNING;
2068 else if (!strcmp(optarg, "stored"))
2069 conf.offset = RD_KAFKA_OFFSET_STORED;
2070 #if RD_KAFKA_VERSION >= 0x00090300
2071 else if (!strncmp(optarg, "s@", 2)) {
2072 conf.startts = strtoll(optarg+2, NULL, 10);
2073 conf.flags |= CONF_F_APIVERREQ;
2074 } else if (!strncmp(optarg, "e@", 2)) {
2075 conf.stopts = strtoll(optarg+2, NULL, 10);
2076 conf.flags |= CONF_F_APIVERREQ;
2077 }
2078 #endif
2079 else {
2080 conf.offset = strtoll(optarg, NULL, 10);
2081 if (conf.offset < 0)
2082 conf.offset = RD_KAFKA_OFFSET_TAIL(-conf.offset);
2083 }
2084 break;
2085 case 'e':
2086 conf.exit_eof = 1;
2087 break;
2088 case 'E':
2089 conf.exitonerror = 0;
2090 break;
2091 case 'f':
2092 fmt = optarg;
2093 break;
2094 case 'J':
2095 #if ENABLE_JSON
2096 conf.flags |= CONF_F_FMT_JSON;
2097 #else
2098 KC_FATAL("This build of kcat lacks JSON support");
2099 #endif
2100 break;
2101
2102 case 's':
2103 {
2104 int field = -1;
2105 const char *t = optarg;
2106
2107 if (!strncmp(t, "key=", strlen("key="))) {
2108 t += strlen("key=");
2109 field = KC_MSG_FIELD_KEY;
2110 } else if (!strncmp(t, "value=", strlen("value="))) {
2111 t += strlen("value=");
2112 field = KC_MSG_FIELD_VALUE;
2113 }
2114
2115 if (field == -1 || field == KC_MSG_FIELD_KEY) {
2116 if (strcmp(t, "avro"))
2117 pack_check("key", t);
2118 conf.pack[KC_MSG_FIELD_KEY] = t;
2119 }
2120
2121 if (field == -1 || field == KC_MSG_FIELD_VALUE) {
2122 if (strcmp(t, "avro"))
2123 pack_check("value", t);
2124 conf.pack[KC_MSG_FIELD_VALUE] = t;
2125 }
2126 }
2127 break;
2128 case 'r':
2129 #if ENABLE_AVRO
2130 if (!*optarg)
2131 KC_FATAL("-r url must not be empty");
2132
2133 if (try_conf_set("schema.registry.url", optarg,
2134 errstr, sizeof(errstr)) == -1)
2135 KC_FATAL("%s", errstr);
2136 #else
2137 KC_FATAL("This build of kcat lacks "
2138 "Avro/Schema-Registry support");
2139 #endif
2140 break;
2141 case 'D':
2142 delim = optarg;
2143 break;
2144 case 'K':
2145 key_delim = optarg;
2146 conf.flags |= CONF_F_KEY_DELIM;
2147 break;
2148 case 'k':
2149 conf.fixed_key = optarg;
2150 conf.fixed_key_len = (size_t)(strlen(conf.fixed_key));
2151 break;
2152 case 'H':
2153 add_header(optarg);
2154 break;
2155 case 'l':
2156 conf.flags |= CONF_F_LINE;
2157 break;
2158 case 'O':
2159 conf.flags |= CONF_F_OFFSET;
2160 break;
2161 case 'c':
2162 conf.msg_cnt = strtoll(optarg, NULL, 10);
2163 break;
2164 case 'm':
2165 conf.metadata_timeout = strtoll(optarg, NULL, 10);
2166 break;
2167 case 'Z':
2168 conf.flags |= CONF_F_NULL;
2169 conf.null_str_len = strlen(conf.null_str);
2170 break;
2171 case 'd':
2172 conf.debug = optarg;
2173 if (rd_kafka_conf_set(conf.rk_conf, "debug", conf.debug,
2174 errstr, sizeof(errstr)) !=
2175 RD_KAFKA_CONF_OK)
2176 KC_FATAL("%s", errstr);
2177 break;
2178 case 'q':
2179 conf.verbosity = 0;
2180 break;
2181 case 'v':
2182 conf.verbosity++;
2183 break;
2184 case 'T':
2185 conf.flags |= CONF_F_TEE;
2186 break;
2187 case 'u':
2188 setbuf(stdout, NULL);
2189 break;
2190 case 'F':
2191 conf.flags |= CONF_F_NO_CONF_SEARCH;
2192 if (!strcmp(optarg, "-") || !strcmp(optarg, "none"))
2193 break;
2194
2195 read_conf_file(optarg, 1);
2196 conf_files_read++;
2197 break;
2198 case 'X':
2199 {
2200 char *name, *val;
2201
2202 if (!strcmp(optarg, "list") ||
2203 !strcmp(optarg, "help")) {
2204 rd_kafka_conf_properties_show(stdout);
2205 exit(0);
2206 }
2207
2208 if (!strcmp(optarg, "dump")) {
2209 do_conf_dump = 1;
2210 continue;
2211 }
2212
2213 name = optarg;
2214 if (!(val = strchr(name, '='))) {
2215 fprintf(stderr, "%% Expected "
2216 "-X property=value, not %s, "
2217 "use -X list to display available "
2218 "properties\n", name);
2219 exit(1);
2220 }
2221
2222 *val = '\0';
2223 val++;
2224
2225 if (try_conf_set(name, val,
2226 errstr, sizeof(errstr)) == -1)
2227 KC_FATAL("%s", errstr);
2228 }
2229 break;
2230
2231 case 'U':
2232 if (unittest())
2233 exit(1);
2234 else
2235 exit(0);
2236 break;
2237
2238 case 'V':
2239 usage(argv[0], 0, NULL, 1);
2240 break;
2241
2242 case 'h':
2243 usage(argv[0], 0, NULL, 0);
2244 break;
2245
2246 default:
2247 usage(argv[0], 1, "unknown argument", 0);
2248 break;
2249 }
2250 }
2251
2252
2253 if (conf_files_read == 0) {
2254 const char *cpath = kc_getenv("KCAT_CONFIG");
2255 if (cpath) {
2256 conf.flags |= CONF_F_NO_CONF_SEARCH;
2257 read_conf_file(cpath, 1/*fatal errors*/);
2258
2259 } else if ((cpath = kc_getenv("KAFKACAT_CONFIG"))) {
2260 KC_INFO(1, "KAFKA_CONFIG is deprecated!\n");
2261 KC_INFO(1, "Rename KAFKA_CONFIG to KCAT_CONFIG\n");
2262 conf.flags |= CONF_F_NO_CONF_SEARCH;
2263 read_conf_file(cpath, 1/*fatal errors*/);
2264 }
2265 }
2266
2267 if (!(conf.flags & CONF_F_NO_CONF_SEARCH))
2268 read_default_conf_files();
2269
2270 /* Dump configuration and exit, if so desired. */
2271 if (do_conf_dump) {
2272 conf_dump();
2273 exit(0);
2274 }
2275
2276 if (!(conf.flags & CONF_F_BROKERS_SEEN))
2277 usage(argv[0], 1, "-b <broker,..> missing", 0);
2278
2279 /* Decide mode if not specified */
2280 if (!conf.mode) {
2281 if (_COMPAT(isatty)(STDIN_FILENO))
2282 conf.mode = 'C';
2283 else
2284 conf.mode = 'P';
2285 KC_INFO(1, "Auto-selecting %s mode (use -P or -C to override)\n",
2286 conf.mode == 'C' ? "Consumer":"Producer");
2287 }
2288
2289
2290 if (!strchr("GLQ", conf.mode) && !conf.topic)
2291 usage(argv[0], 1, "-t <topic> missing", 0);
2292 else if (conf.mode == 'Q' && !*rktparlistp)
2293 usage(argv[0], 1,
2294 "-t <topic>:<partition>:<offset_or_timestamp> missing",
2295 0);
2296
2297 if (conf.brokers &&
2298 rd_kafka_conf_set(conf.rk_conf, "metadata.broker.list",
2299 conf.brokers, errstr, sizeof(errstr)) !=
2300 RD_KAFKA_CONF_OK)
2301 usage(argv[0], 1, errstr, 0);
2302
2303 rd_kafka_conf_set_error_cb(conf.rk_conf, error_cb);
2304
2305 fmt_init();
2306
2307 /*
2308 * Verify serdes
2309 */
2310 for (i = 0 ; i < KC_MSG_FIELD_CNT ; i++) {
2311 if (!conf.pack[i])
2312 continue;
2313
2314 if (!strchr("GC", conf.mode))
2315 KC_FATAL("-s serdes only available in the consumer");
2316
2317 if (conf.pack[i] && !strcmp(conf.pack[i], "avro")) {
2318 #if !ENABLE_AVRO
2319 KC_FATAL("This build of kcat lacks "
2320 "Avro/Schema-Registry support");
2321 #endif
2322 #if ENABLE_JSON
2323 /* Avro is decoded to JSON which needs to be
2324 * written verbatim to the JSON envelope when
2325 * using -J: libyajl does not support this,
2326 * but my own fork of yajl does. */
2327 if (conf.flags & CONF_F_FMT_JSON &&
2328 !json_can_emit_verbatim())
2329 KC_FATAL("This build of kcat lacks "
2330 "support for emitting "
2331 "JSON-formatted "
2332 "message keys and values: "
2333 "try without -J or build "
2334 "kcat with yajl from "
2335 "https://github.com/edenhill/yajl");
2336 #endif
2337
2338 if (i == KC_MSG_FIELD_VALUE)
2339 conf.flags |= CONF_F_FMT_AVRO_VALUE;
2340 else if (i == KC_MSG_FIELD_KEY)
2341 conf.flags |= CONF_F_FMT_AVRO_KEY;
2342 continue;
2343 }
2344 }
2345
2346
2347 /*
2348 * Verify and initialize Avro/SR
2349 */
2350 #if ENABLE_AVRO
2351 if (conf.flags & (CONF_F_FMT_AVRO_VALUE|CONF_F_FMT_AVRO_KEY)) {
2352
2353 if (!(conf.flags & CONF_F_SR_URL_SEEN))
2354 KC_FATAL("-s avro requires -r <sr_url>");
2355
2356 if (!strchr("GC", conf.mode))
2357 KC_FATAL("Avro and Schema-registry support is "
2358 "currently only available in the consumer");
2359
2360 /* Initialize Avro/Schema-Registry client */
2361 kc_avro_init(NULL, NULL, NULL, NULL);
2362 }
2363 #endif
2364
2365
2366 /* If avro key is to be deserialized, set up an delimiter so that
2367 * the key is actually emitted. */
2368 if ((conf.flags & CONF_F_FMT_AVRO_KEY) && !key_delim)
2369 key_delim = "";
2370
2371 if (key_delim) {
2372 conf.key_delim = parse_delim(key_delim);
2373 conf.key_delim_size = strlen(conf.key_delim);
2374 }
2375
2376 conf.delim = parse_delim(delim);
2377 conf.delim_size = strlen(conf.delim);
2378
2379 if (strchr("GC", conf.mode)) {
2380 /* Must be explicitly enabled for librdkafka >= v1.0.0 */
2381 rd_kafka_conf_set(conf.rk_conf, "enable.partition.eof", "true",
2382 NULL, 0);
2383
2384 if (!fmt) {
2385 if ((conf.flags & CONF_F_FMT_JSON)) {
2386 /* For JSON the format string is simply the
2387 * output object delimiter (e.g., newline). */
2388 fmt = conf.delim;
2389 } else {
2390 if (conf.key_delim)
2391 snprintf(tmp_fmt, sizeof(tmp_fmt),
2392 "%%k%s%%s%s",
2393 conf.key_delim, conf.delim);
2394 else
2395 snprintf(tmp_fmt, sizeof(tmp_fmt),
2396 "%%s%s", conf.delim);
2397 fmt = tmp_fmt;
2398 }
2399 }
2400
2401 fmt_parse(fmt);
2402
2403 } else if (conf.mode == 'P') {
2404 if (conf.delim_size == 0)
2405 KC_FATAL("Message delimiter -D must not be empty "
2406 "when producing");
2407 }
2408
2409 /* Automatically enable API version requests if needed and
2410 * user hasn't explicitly configured it (in any way). */
2411 if ((conf.flags & (CONF_F_APIVERREQ | CONF_F_APIVERREQ_USER)) ==
2412 CONF_F_APIVERREQ) {
2413 KC_INFO(2, "Automatically enabling api.version.request=true\n");
2414 rd_kafka_conf_set(conf.rk_conf, "api.version.request", "true",
2415 NULL, 0);
2416 }
2417 }
2418
2419
2420
2421
main(int argc,char ** argv)2422 int main (int argc, char **argv) {
2423 #ifdef SIGIO
2424 char tmp[16];
2425 #endif
2426 FILE *in = stdin;
2427 struct timeval tv;
2428 rd_kafka_topic_partition_list_t *rktparlist = NULL;
2429
2430 /* Certain Docker images don't have kcat as the entry point,
2431 * requiring `kcat` to be the first argument. As these images
2432 * are fixed the examples get outdated and that first argument
2433 * will still be passed to the container and thus kcat,
2434 * so remove it here. */
2435 if (argc > 1 && (!strcmp(argv[1], "kcat") ||
2436 !strcmp(argv[1], "kafkacat"))) {
2437 if (argc > 2)
2438 memmove(&argv[1], &argv[2], sizeof(*argv) * (argc - 2));
2439 argc--;
2440 }
2441
2442 /* Seed rng for random partitioner, jitter, etc. */
2443 rd_gettimeofday(&tv, NULL);
2444 srand(tv.tv_usec);
2445
2446 /* Create config containers */
2447 conf.rk_conf = rd_kafka_conf_new();
2448 conf.rkt_conf = rd_kafka_topic_conf_new();
2449
2450 /*
2451 * Default config
2452 */
2453 #ifdef SIGIO
2454 /* Enable quick termination of librdkafka */
2455 snprintf(tmp, sizeof(tmp), "%i", SIGIO);
2456 rd_kafka_conf_set(conf.rk_conf, "internal.termination.signal",
2457 tmp, NULL, 0);
2458 #endif
2459
2460 /* Log callback */
2461 rd_kafka_conf_set_log_cb(conf.rk_conf, rd_kafka_log_print);
2462
2463 /* Parse command line arguments */
2464 argparse(argc, argv, &rktparlist);
2465
2466 if (optind < argc) {
2467 if (!strchr("PG", conf.mode))
2468 usage(argv[0], 1,
2469 "file/topic list only allowed in "
2470 "producer(-P)/kafkaconsumer(-G) mode", 0);
2471 else if ((conf.flags & CONF_F_LINE) && argc - optind > 1)
2472 KC_FATAL("Only one file allowed for line mode (-l)");
2473 else if (conf.flags & CONF_F_LINE) {
2474 in = fopen(argv[optind], "r");
2475 if (in == NULL)
2476 KC_FATAL("Cannot open %s: %s", argv[optind],
2477 strerror(errno));
2478 }
2479 }
2480
2481 signal(SIGINT, term);
2482 signal(SIGTERM, term);
2483 #ifdef SIGPIPE
2484 signal(SIGPIPE, term);
2485 #endif
2486
2487 /* Run according to mode */
2488 switch (conf.mode)
2489 {
2490 case 'C':
2491 consumer_run(stdout);
2492 break;
2493
2494 #if ENABLE_KAFKACONSUMER
2495 case 'G':
2496 if (conf.stopts || conf.startts)
2497 KC_FATAL("-o ..@ timestamps can't be used "
2498 "with -G mode\n");
2499 kafkaconsumer_run(stdout, &argv[optind], argc-optind);
2500 break;
2501 #endif
2502
2503 case 'P':
2504 producer_run(in, &argv[optind], argc-optind);
2505 break;
2506
2507 case 'L':
2508 metadata_list();
2509 break;
2510
2511 case 'Q':
2512 if (!rktparlist)
2513 usage(argv[0], 1,
2514 "-Q requires one or more "
2515 "-t <topic>:<partition>:<timestamp>", 0);
2516
2517 query_offsets_by_time(rktparlist);
2518
2519 rd_kafka_topic_partition_list_destroy(rktparlist);
2520 break;
2521
2522 default:
2523 usage(argv[0], 0, NULL, 0);
2524 break;
2525 }
2526
2527 if (conf.headers)
2528 rd_kafka_headers_destroy(conf.headers);
2529
2530 if (conf.key_delim)
2531 free(conf.key_delim);
2532 if (conf.delim)
2533 free(conf.delim);
2534
2535 if (in != stdin)
2536 fclose(in);
2537
2538 rd_kafka_wait_destroyed(5000);
2539
2540 #if ENABLE_AVRO
2541 kc_avro_term();
2542 #endif
2543
2544 fmt_term();
2545
2546 exit(conf.exitcode);
2547 }
2548