1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 /**
30 * Apache Kafka consumer & producer performance tester
31 * using the Kafka driver from librdkafka
32 * (https://github.com/edenhill/librdkafka)
33 */
34
35 #ifdef _MSC_VER
36 #define _CRT_SECURE_NO_WARNINGS /* Silence nonsense on MSVC */
37 #endif
38
39 #include "../src/rd.h"
40
41 #define _GNU_SOURCE /* for strndup() */
42 #include <ctype.h>
43 #include <signal.h>
44 #include <string.h>
45 #include <errno.h>
46
47 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
48 * is built from within the librdkafka source tree and thus differs. */
49 #include "rdkafka.h" /* for Kafka driver */
50 /* Do not include these defines from your program, they will not be
51 * provided by librdkafka. */
52 #include "rd.h"
53 #include "rdtime.h"
54
55 #ifdef _WIN32
56 #include "../win32/wingetopt.h"
57 #include "../win32/wintime.h"
58 #endif
59
60
61 static volatile sig_atomic_t run = 1;
62 static int forever = 1;
63 static rd_ts_t dispintvl = 1000;
64 static int do_seq = 0;
65 static int exit_after = 0;
66 static int exit_eof = 0;
67 static FILE *stats_fp;
68 static int dr_disp_div;
69 static int verbosity = 1;
70 static int latency_mode = 0;
71 static FILE *latency_fp = NULL;
72 static int msgcnt = -1;
73 static int incremental_mode = 0;
74 static int partition_cnt = 0;
75 static int eof_cnt = 0;
76 static int with_dr = 1;
77 static int read_hdrs = 0;
78
79
stop(int sig)80 static void stop (int sig) {
81 if (!run)
82 exit(0);
83 run = 0;
84 }
85
86 static long int msgs_wait_cnt = 0;
87 static long int msgs_wait_produce_cnt = 0;
88 static rd_ts_t t_end;
89 static rd_kafka_t *global_rk;
90
91 struct avg {
92 int64_t val;
93 int cnt;
94 uint64_t ts_start;
95 };
96
97 static struct {
98 rd_ts_t t_start;
99 rd_ts_t t_end;
100 rd_ts_t t_end_send;
101 uint64_t msgs;
102 uint64_t msgs_last;
103 uint64_t msgs_dr_ok;
104 uint64_t msgs_dr_err;
105 uint64_t bytes_dr_ok;
106 uint64_t bytes;
107 uint64_t bytes_last;
108 uint64_t tx;
109 uint64_t tx_err;
110 uint64_t avg_rtt;
111 uint64_t offset;
112 rd_ts_t t_fetch_latency;
113 rd_ts_t t_last;
114 rd_ts_t t_enobufs_last;
115 rd_ts_t t_total;
116 rd_ts_t latency_last;
117 rd_ts_t latency_lo;
118 rd_ts_t latency_hi;
119 rd_ts_t latency_sum;
120 int latency_cnt;
121 int64_t last_offset;
122 } cnt;
123
124
wall_clock(void)125 uint64_t wall_clock (void) {
126 struct timeval tv;
127 gettimeofday(&tv, NULL);
128 return ((uint64_t)tv.tv_sec * 1000000LLU) +
129 ((uint64_t)tv.tv_usec);
130 }
131
err_cb(rd_kafka_t * rk,int err,const char * reason,void * opaque)132 static void err_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
133 if (err == RD_KAFKA_RESP_ERR__FATAL) {
134 char errstr[512];
135 err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
136 printf("%% FATAL ERROR CALLBACK: %s: %s: %s\n",
137 rd_kafka_name(rk), rd_kafka_err2str(err), errstr);
138 } else {
139 printf("%% ERROR CALLBACK: %s: %s: %s\n",
140 rd_kafka_name(rk), rd_kafka_err2str(err), reason);
141 }
142 }
143
throttle_cb(rd_kafka_t * rk,const char * broker_name,int32_t broker_id,int throttle_time_ms,void * opaque)144 static void throttle_cb (rd_kafka_t *rk, const char *broker_name,
145 int32_t broker_id, int throttle_time_ms,
146 void *opaque) {
147 printf("%% THROTTLED %dms by %s (%"PRId32")\n", throttle_time_ms,
148 broker_name, broker_id);
149 }
150
offset_commit_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * offsets,void * opaque)151 static void offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
152 rd_kafka_topic_partition_list_t *offsets,
153 void *opaque) {
154 int i;
155
156 if (err || verbosity >= 2)
157 printf("%% Offset commit of %d partition(s): %s\n",
158 offsets->cnt, rd_kafka_err2str(err));
159
160 for (i = 0 ; i < offsets->cnt ; i++) {
161 rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
162 if (rktpar->err || verbosity >= 2)
163 printf("%% %s [%"PRId32"] @ %"PRId64": %s\n",
164 rktpar->topic, rktpar->partition,
165 rktpar->offset, rd_kafka_err2str(err));
166 }
167 }
168
169 /**
170 * @brief Add latency measurement
171 */
latency_add(int64_t ts,const char * who)172 static void latency_add (int64_t ts, const char *who) {
173 if (ts > cnt.latency_hi)
174 cnt.latency_hi = ts;
175 if (!cnt.latency_lo || ts < cnt.latency_lo)
176 cnt.latency_lo = ts;
177 cnt.latency_last = ts;
178 cnt.latency_cnt++;
179 cnt.latency_sum += ts;
180 if (latency_fp)
181 fprintf(latency_fp, "%"PRIu64"\n", ts);
182 }
183
184
msg_delivered(rd_kafka_t * rk,const rd_kafka_message_t * rkmessage,void * opaque)185 static void msg_delivered (rd_kafka_t *rk,
186 const rd_kafka_message_t *rkmessage, void *opaque) {
187 static rd_ts_t last;
188 rd_ts_t now = rd_clock();
189 static int msgs;
190
191 msgs++;
192
193 msgs_wait_cnt--;
194
195 if (rkmessage->err)
196 cnt.msgs_dr_err++;
197 else {
198 cnt.msgs_dr_ok++;
199 cnt.bytes_dr_ok += rkmessage->len;
200 }
201
202 if (latency_mode) {
203 /* Extract latency */
204 int64_t source_ts;
205 if (sscanf(rkmessage->payload, "LATENCY:%"SCNd64,
206 &source_ts) == 1)
207 latency_add(wall_clock() - source_ts, "producer");
208 }
209
210
211 if ((rkmessage->err &&
212 (cnt.msgs_dr_err < 50 ||
213 !(cnt.msgs_dr_err % (dispintvl / 1000)))) ||
214 !last || msgs_wait_cnt < 5 ||
215 !(msgs_wait_cnt % dr_disp_div) ||
216 (now - last) >= dispintvl * 1000 ||
217 verbosity >= 3) {
218 if (rkmessage->err && verbosity >= 2)
219 printf("%% Message delivery failed: %s [%"PRId32"]: "
220 "%s (%li remain)\n",
221 rd_kafka_topic_name(rkmessage->rkt),
222 rkmessage->partition,
223 rd_kafka_err2str(rkmessage->err),
224 msgs_wait_cnt);
225 else if (verbosity > 2)
226 printf("%% Message delivered (offset %"PRId64"): "
227 "%li remain\n",
228 rkmessage->offset, msgs_wait_cnt);
229 if (verbosity >= 3 && do_seq)
230 printf(" --> \"%.*s\"\n",
231 (int)rkmessage->len,
232 (const char *)rkmessage->payload);
233 last = now;
234 }
235
236 cnt.last_offset = rkmessage->offset;
237
238 if (msgs_wait_produce_cnt == 0 && msgs_wait_cnt == 0 && !forever) {
239 if (verbosity >= 2 && cnt.msgs > 0) {
240 double error_percent =
241 (double)(cnt.msgs - cnt.msgs_dr_ok) /
242 cnt.msgs * 100;
243 printf("%% Messages delivered with failure "
244 "percentage of %.5f%%\n", error_percent);
245 }
246 t_end = rd_clock();
247 run = 0;
248 }
249
250 if (exit_after && exit_after <= msgs) {
251 printf("%% Hard exit after %i messages, as requested\n",
252 exit_after);
253 exit(0);
254 }
255 }
256
257
msg_consume(rd_kafka_message_t * rkmessage,void * opaque)258 static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque) {
259
260 if (rkmessage->err) {
261 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
262 cnt.offset = rkmessage->offset;
263
264 if (verbosity >= 1)
265 printf("%% Consumer reached end of "
266 "%s [%"PRId32"] "
267 "message queue at offset %"PRId64"\n",
268 rd_kafka_topic_name(rkmessage->rkt),
269 rkmessage->partition, rkmessage->offset);
270
271 if (exit_eof && ++eof_cnt == partition_cnt)
272 run = 0;
273
274 return;
275 }
276
277 printf("%% Consume error for topic \"%s\" [%"PRId32"] "
278 "offset %"PRId64": %s\n",
279 rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt):"",
280 rkmessage->partition,
281 rkmessage->offset,
282 rd_kafka_message_errstr(rkmessage));
283
284 if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
285 rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
286 run = 0;
287
288 cnt.msgs_dr_err++;
289 return;
290 }
291
292 /* Start measuring from first message received */
293 if (!cnt.t_start)
294 cnt.t_start = cnt.t_last = rd_clock();
295
296 cnt.offset = rkmessage->offset;
297 cnt.msgs++;
298 cnt.bytes += rkmessage->len;
299
300 if (verbosity >= 3 ||
301 (verbosity >= 2 && !(cnt.msgs % 1000000)))
302 printf("@%"PRId64": %.*s: %.*s\n",
303 rkmessage->offset,
304 (int)rkmessage->key_len, (char *)rkmessage->key,
305 (int)rkmessage->len, (char *)rkmessage->payload);
306
307
308 if (latency_mode) {
309 int64_t remote_ts, ts;
310
311 if (rkmessage->len > 8 &&
312 !memcmp(rkmessage->payload, "LATENCY:", 8) &&
313 sscanf(rkmessage->payload, "LATENCY:%"SCNd64,
314 &remote_ts) == 1) {
315 ts = wall_clock() - remote_ts;
316 if (ts > 0 && ts < (1000000 * 60 * 5)) {
317 latency_add(ts, "consumer");
318 } else {
319 if (verbosity >= 1)
320 printf("Received latency timestamp is too far off: %"PRId64"us (message offset %"PRId64"): ignored\n",
321 ts, rkmessage->offset);
322 }
323 } else if (verbosity > 1)
324 printf("not a LATENCY payload: %.*s\n",
325 (int)rkmessage->len,
326 (char *)rkmessage->payload);
327
328 }
329
330 if (read_hdrs) {
331 rd_kafka_headers_t *hdrs;
332 /* Force parsing of headers but don't do anything with them. */
333 rd_kafka_message_headers(rkmessage, &hdrs);
334 }
335
336 if (msgcnt != -1 && (int)cnt.msgs >= msgcnt)
337 run = 0;
338 }
339
340
rebalance_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * partitions,void * opaque)341 static void rebalance_cb (rd_kafka_t *rk,
342 rd_kafka_resp_err_t err,
343 rd_kafka_topic_partition_list_t *partitions,
344 void *opaque) {
345 rd_kafka_error_t *error = NULL;
346 rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
347
348 if (exit_eof &&
349 !strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE"))
350 fprintf(stderr, "%% This example has not been modified to "
351 "support -e (exit on EOF) when "
352 "partition.assignment.strategy "
353 "is set to an incremental/cooperative strategy: "
354 "-e will not behave as expected\n");
355
356 switch (err)
357 {
358 case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
359 fprintf(stderr,
360 "%% Group rebalanced (%s): "
361 "%d new partition(s) assigned\n",
362 rd_kafka_rebalance_protocol(rk), partitions->cnt);
363
364 if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
365 error = rd_kafka_incremental_assign(rk, partitions);
366 } else {
367 ret_err = rd_kafka_assign(rk, partitions);
368 eof_cnt = 0;
369 }
370
371 partition_cnt += partitions->cnt;
372 break;
373
374 case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
375 fprintf(stderr,
376 "%% Group rebalanced (%s): %d partition(s) revoked\n",
377 rd_kafka_rebalance_protocol(rk), partitions->cnt);
378
379 if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
380 error = rd_kafka_incremental_unassign(rk, partitions);
381 partition_cnt -= partitions->cnt;
382 } else {
383 ret_err = rd_kafka_assign(rk, NULL);
384 partition_cnt = 0;
385 }
386
387 eof_cnt = 0; /* FIXME: Not correct for incremental case */
388 break;
389
390 default:
391 break;
392 }
393
394 if (error) {
395 fprintf(stderr, "%% incremental assign failure: %s\n",
396 rd_kafka_error_string(error));
397 rd_kafka_error_destroy(error);
398 } else if (ret_err) {
399 fprintf(stderr, "%% assign failure: %s\n",
400 rd_kafka_err2str(ret_err));
401 }
402 }
403
404
405 /**
406 * Find and extract single value from a two-level search.
407 * First find 'field1', then find 'field2' and extract its value.
408 * Returns 0 on miss else the value.
409 */
json_parse_fields(const char * json,const char ** end,const char * field1,const char * field2)410 static uint64_t json_parse_fields (const char *json, const char **end,
411 const char *field1, const char *field2) {
412 const char *t = json;
413 const char *t2;
414 int len1 = (int)strlen(field1);
415 int len2 = (int)strlen(field2);
416
417 while ((t2 = strstr(t, field1))) {
418 uint64_t v;
419
420 t = t2;
421 t += len1;
422
423 /* Find field */
424 if (!(t2 = strstr(t, field2)))
425 continue;
426 t2 += len2;
427
428 while (isspace((int)*t2))
429 t2++;
430
431 v = strtoull(t2, (char **)&t, 10);
432 if (t2 == t)
433 continue;
434
435 *end = t;
436 return v;
437 }
438
439 *end = t + strlen(t);
440 return 0;
441 }
442
443 /**
444 * Parse various values from rdkafka stats
445 */
json_parse_stats(const char * json)446 static void json_parse_stats (const char *json) {
447 const char *t;
448 #define MAX_AVGS 100 /* max number of brokers to scan for rtt */
449 uint64_t avg_rtt[MAX_AVGS+1];
450 int avg_rtt_i = 0;
451
452 /* Store totals at end of array */
453 avg_rtt[MAX_AVGS] = 0;
454
455 /* Extract all broker RTTs */
456 t = json;
457 while (avg_rtt_i < MAX_AVGS && *t) {
458 avg_rtt[avg_rtt_i] = json_parse_fields(t, &t,
459 "\"rtt\":",
460 "\"avg\":");
461
462 /* Skip low RTT values, means no messages are passing */
463 if (avg_rtt[avg_rtt_i] < 100 /*0.1ms*/)
464 continue;
465
466
467 avg_rtt[MAX_AVGS] += avg_rtt[avg_rtt_i];
468 avg_rtt_i++;
469 }
470
471 if (avg_rtt_i > 0)
472 avg_rtt[MAX_AVGS] /= avg_rtt_i;
473
474 cnt.avg_rtt = avg_rtt[MAX_AVGS];
475 }
476
477
stats_cb(rd_kafka_t * rk,char * json,size_t json_len,void * opaque)478 static int stats_cb (rd_kafka_t *rk, char *json, size_t json_len,
479 void *opaque) {
480
481 /* Extract values for our own stats */
482 json_parse_stats(json);
483
484 if (stats_fp)
485 fprintf(stats_fp, "%s\n", json);
486 return 0;
487 }
488
489 #define _OTYPE_TAB 0x1 /* tabular format */
490 #define _OTYPE_SUMMARY 0x2 /* summary format */
491 #define _OTYPE_FORCE 0x4 /* force output regardless of interval timing */
print_stats(rd_kafka_t * rk,int mode,int otype,const char * compression)492 static void print_stats (rd_kafka_t *rk,
493 int mode, int otype, const char *compression) {
494 rd_ts_t now = rd_clock();
495 rd_ts_t t_total;
496 static int rows_written = 0;
497 int print_header;
498 double latency_avg = 0.0f;
499 char extra[512];
500 int extra_of = 0;
501 *extra = '\0';
502
503 if (!(otype & _OTYPE_FORCE) &&
504 (((otype & _OTYPE_SUMMARY) && verbosity == 0) ||
505 cnt.t_last + dispintvl > now))
506 return;
507
508 print_header = !rows_written ||(verbosity > 0 && !(rows_written % 20));
509
510 if (cnt.t_end_send)
511 t_total = cnt.t_end_send - cnt.t_start;
512 else if (cnt.t_end)
513 t_total = cnt.t_end - cnt.t_start;
514 else if (cnt.t_start)
515 t_total = now - cnt.t_start;
516 else
517 t_total = 1;
518
519 if (latency_mode && cnt.latency_cnt)
520 latency_avg = (double)cnt.latency_sum /
521 (double)cnt.latency_cnt;
522
523 if (mode == 'P') {
524
525 if (otype & _OTYPE_TAB) {
526 #define ROW_START() do {} while (0)
527 #define COL_HDR(NAME) printf("| %10.10s ", (NAME))
528 #define COL_PR64(NAME,VAL) printf("| %10"PRIu64" ", (VAL))
529 #define COL_PRF(NAME,VAL) printf("| %10.2f ", (VAL))
530 #define ROW_END() do { \
531 printf("\n"); \
532 rows_written++; \
533 } while (0)
534
535 if (print_header) {
536 /* First time, print header */
537 ROW_START();
538 COL_HDR("elapsed");
539 COL_HDR("msgs");
540 COL_HDR("bytes");
541 COL_HDR("rtt");
542 COL_HDR("dr");
543 COL_HDR("dr_m/s");
544 COL_HDR("dr_MB/s");
545 COL_HDR("dr_err");
546 COL_HDR("tx_err");
547 COL_HDR("outq");
548 COL_HDR("offset");
549 if (latency_mode) {
550 COL_HDR("lat_curr");
551 COL_HDR("lat_avg");
552 COL_HDR("lat_lo");
553 COL_HDR("lat_hi");
554 }
555
556 ROW_END();
557 }
558
559 ROW_START();
560 COL_PR64("elapsed", t_total / 1000);
561 COL_PR64("msgs", cnt.msgs);
562 COL_PR64("bytes", cnt.bytes);
563 COL_PR64("rtt", cnt.avg_rtt / 1000);
564 COL_PR64("dr", cnt.msgs_dr_ok);
565 COL_PR64("dr_m/s",
566 ((cnt.msgs_dr_ok * 1000000) / t_total));
567 COL_PRF("dr_MB/s",
568 (float)((cnt.bytes_dr_ok) / (float)t_total));
569 COL_PR64("dr_err", cnt.msgs_dr_err);
570 COL_PR64("tx_err", cnt.tx_err);
571 COL_PR64("outq",
572 rk ? (uint64_t)rd_kafka_outq_len(rk) : 0);
573 COL_PR64("offset", (uint64_t)cnt.last_offset);
574 if (latency_mode) {
575 COL_PRF("lat_curr", cnt.latency_last / 1000.0f);
576 COL_PRF("lat_avg", latency_avg / 1000.0f);
577 COL_PRF("lat_lo", cnt.latency_lo / 1000.0f);
578 COL_PRF("lat_hi", cnt.latency_hi / 1000.0f);
579 }
580 ROW_END();
581 }
582
583 if (otype & _OTYPE_SUMMARY) {
584 printf("%% %"PRIu64" messages produced "
585 "(%"PRIu64" bytes), "
586 "%"PRIu64" delivered "
587 "(offset %"PRId64", %"PRIu64" failed) "
588 "in %"PRIu64"ms: %"PRIu64" msgs/s and "
589 "%.02f MB/s, "
590 "%"PRIu64" produce failures, %i in queue, "
591 "%s compression\n",
592 cnt.msgs, cnt.bytes,
593 cnt.msgs_dr_ok, cnt.last_offset, cnt.msgs_dr_err,
594 t_total / 1000,
595 ((cnt.msgs_dr_ok * 1000000) / t_total),
596 (float)((cnt.bytes_dr_ok) / (float)t_total),
597 cnt.tx_err,
598 rk ? rd_kafka_outq_len(rk) : 0,
599 compression);
600 }
601
602 } else {
603
604 if (otype & _OTYPE_TAB) {
605 if (print_header) {
606 /* First time, print header */
607 ROW_START();
608 COL_HDR("elapsed");
609 COL_HDR("msgs");
610 COL_HDR("bytes");
611 COL_HDR("rtt");
612 COL_HDR("m/s");
613 COL_HDR("MB/s");
614 COL_HDR("rx_err");
615 COL_HDR("offset");
616 if (latency_mode) {
617 COL_HDR("lat_curr");
618 COL_HDR("lat_avg");
619 COL_HDR("lat_lo");
620 COL_HDR("lat_hi");
621 }
622 ROW_END();
623 }
624
625 ROW_START();
626 COL_PR64("elapsed", t_total / 1000);
627 COL_PR64("msgs", cnt.msgs);
628 COL_PR64("bytes", cnt.bytes);
629 COL_PR64("rtt", cnt.avg_rtt / 1000);
630 COL_PR64("m/s",
631 ((cnt.msgs * 1000000) / t_total));
632 COL_PRF("MB/s",
633 (float)((cnt.bytes) / (float)t_total));
634 COL_PR64("rx_err", cnt.msgs_dr_err);
635 COL_PR64("offset", cnt.offset);
636 if (latency_mode) {
637 COL_PRF("lat_curr", cnt.latency_last / 1000.0f);
638 COL_PRF("lat_avg", latency_avg / 1000.0f);
639 COL_PRF("lat_lo", cnt.latency_lo / 1000.0f);
640 COL_PRF("lat_hi", cnt.latency_hi / 1000.0f);
641 }
642 ROW_END();
643
644 }
645
646 if (otype & _OTYPE_SUMMARY) {
647 if (latency_avg >= 1.0f)
648 extra_of += rd_snprintf(extra+extra_of,
649 sizeof(extra)-extra_of,
650 ", latency "
651 "curr/avg/lo/hi "
652 "%.2f/%.2f/%.2f/%.2fms",
653 cnt.latency_last / 1000.0f,
654 latency_avg / 1000.0f,
655 cnt.latency_lo / 1000.0f,
656 cnt.latency_hi / 1000.0f)
657 ;
658 printf("%% %"PRIu64" messages (%"PRIu64" bytes) "
659 "consumed in %"PRIu64"ms: %"PRIu64" msgs/s "
660 "(%.02f MB/s)"
661 "%s\n",
662 cnt.msgs, cnt.bytes,
663 t_total / 1000,
664 ((cnt.msgs * 1000000) / t_total),
665 (float)((cnt.bytes) / (float)t_total),
666 extra);
667 }
668
669 if (incremental_mode && now > cnt.t_last) {
670 uint64_t i_msgs = cnt.msgs - cnt.msgs_last;
671 uint64_t i_bytes = cnt.bytes - cnt.bytes_last;
672 uint64_t i_time = cnt.t_last ? now - cnt.t_last : 0;
673
674 printf("%% INTERVAL: %"PRIu64" messages "
675 "(%"PRIu64" bytes) "
676 "consumed in %"PRIu64"ms: %"PRIu64" msgs/s "
677 "(%.02f MB/s)"
678 "%s\n",
679 i_msgs, i_bytes,
680 i_time / 1000,
681 ((i_msgs * 1000000) / i_time),
682 (float)((i_bytes) / (float)i_time),
683 extra);
684
685 }
686 }
687
688 cnt.t_last = now;
689 cnt.msgs_last = cnt.msgs;
690 cnt.bytes_last = cnt.bytes;
691 }
692
693
sig_usr1(int sig)694 static void sig_usr1 (int sig) {
695 rd_kafka_dump(stdout, global_rk);
696 }
697
698
699 /**
700 * @brief Read config from file
701 * @returns -1 on error, else 0.
702 */
read_conf_file(rd_kafka_conf_t * conf,const char * path)703 static int read_conf_file (rd_kafka_conf_t *conf, const char *path) {
704 FILE *fp;
705 char buf[512];
706 int line = 0;
707 char errstr[512];
708
709 if (!(fp = fopen(path, "r"))) {
710 fprintf(stderr, "%% Failed to open %s: %s\n",
711 path, strerror(errno));
712 return -1;
713 }
714
715 while (fgets(buf, sizeof(buf), fp)) {
716 char *s = buf;
717 char *t;
718 rd_kafka_conf_res_t r = RD_KAFKA_CONF_UNKNOWN;
719
720 line++;
721
722 while (isspace((int)*s))
723 s++;
724
725 if (!*s || *s == '#')
726 continue;
727
728 if ((t = strchr(buf, '\n')))
729 *t = '\0';
730
731 t = strchr(buf, '=');
732 if (!t || t == s || !*(t+1)) {
733 fprintf(stderr, "%% %s:%d: expected key=value\n",
734 path, line);
735 fclose(fp);
736 return -1;
737 }
738
739 *(t++) = '\0';
740
741 /* Try global config */
742 r = rd_kafka_conf_set(conf, s, t, errstr, sizeof(errstr));
743
744 if (r == RD_KAFKA_CONF_OK)
745 continue;
746
747 fprintf(stderr, "%% %s:%d: %s=%s: %s\n",
748 path, line, s, t, errstr);
749 fclose(fp);
750 return -1;
751 }
752
753 fclose(fp);
754
755 return 0;
756 }
757
758
do_produce(rd_kafka_t * rk,rd_kafka_topic_t * rkt,int32_t partition,int msgflags,void * payload,size_t size,const void * key,size_t key_size,const rd_kafka_headers_t * hdrs)759 static rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
760 rd_kafka_topic_t *rkt, int32_t partition,
761 int msgflags,
762 void *payload, size_t size,
763 const void *key, size_t key_size,
764 const rd_kafka_headers_t *hdrs) {
765
766 /* Send/Produce message. */
767 if (hdrs) {
768 rd_kafka_headers_t *hdrs_copy;
769 rd_kafka_resp_err_t err;
770
771 hdrs_copy = rd_kafka_headers_copy(hdrs);
772
773 err = rd_kafka_producev(
774 rk,
775 RD_KAFKA_V_RKT(rkt),
776 RD_KAFKA_V_PARTITION(partition),
777 RD_KAFKA_V_MSGFLAGS(msgflags),
778 RD_KAFKA_V_VALUE(payload, size),
779 RD_KAFKA_V_KEY(key, key_size),
780 RD_KAFKA_V_HEADERS(hdrs_copy),
781 RD_KAFKA_V_END);
782
783 if (err)
784 rd_kafka_headers_destroy(hdrs_copy);
785
786 return err;
787
788 } else {
789 if (rd_kafka_produce(rkt, partition, msgflags, payload, size,
790 key, key_size, NULL) == -1)
791 return rd_kafka_last_error();
792 }
793
794 return RD_KAFKA_RESP_ERR_NO_ERROR;
795 }
796
797 /**
798 * @brief Sleep for \p sleep_us microseconds.
799 */
do_sleep(int sleep_us)800 static void do_sleep (int sleep_us) {
801 if (sleep_us > 100) {
802 #ifdef _WIN32
803 Sleep(sleep_us / 1000);
804 #else
805 usleep(sleep_us);
806 #endif
807 } else {
808 rd_ts_t next = rd_clock() + (rd_ts_t)sleep_us;
809 while (next > rd_clock())
810 ;
811 }
812 }
813
814
main(int argc,char ** argv)815 int main (int argc, char **argv) {
816 char *brokers = NULL;
817 char mode = 'C';
818 char *topic = NULL;
819 const char *key = NULL;
820 int *partitions = NULL;
821 int opt;
822 int sendflags = 0;
823 char *msgpattern = "librdkafka_performance testing!";
824 int msgsize = -1;
825 const char *debug = NULL;
826 int do_conf_dump = 0;
827 rd_ts_t now;
828 char errstr[512];
829 uint64_t seq = 0;
830 int seed = (int)time(NULL);
831 rd_kafka_t *rk;
832 rd_kafka_topic_t *rkt;
833 rd_kafka_conf_t *conf;
834 rd_kafka_queue_t *rkqu = NULL;
835 const char *compression = "no";
836 int64_t start_offset = 0;
837 int batch_size = 0;
838 int idle = 0;
839 const char *stats_cmd = NULL;
840 char *stats_intvlstr = NULL;
841 char tmp[128];
842 char *tmp2;
843 int otype = _OTYPE_SUMMARY;
844 double dtmp;
845 int rate_sleep = 0;
846 rd_kafka_topic_partition_list_t *topics;
847 int exitcode = 0;
848 rd_kafka_headers_t *hdrs = NULL;
849 rd_kafka_resp_err_t err;
850
851 /* Kafka configuration */
852 conf = rd_kafka_conf_new();
853 rd_kafka_conf_set_error_cb(conf, err_cb);
854 rd_kafka_conf_set_throttle_cb(conf, throttle_cb);
855 rd_kafka_conf_set_offset_commit_cb(conf, offset_commit_cb);
856
857 #ifdef SIGIO
858 /* Quick termination */
859 rd_snprintf(tmp, sizeof(tmp), "%i", SIGIO);
860 rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
861 #endif
862
863 /* Producer config */
864 rd_kafka_conf_set(conf, "linger.ms", "1000", NULL, 0);
865 rd_kafka_conf_set(conf, "message.send.max.retries", "3", NULL, 0);
866 rd_kafka_conf_set(conf, "retry.backoff.ms", "500", NULL, 0);
867
868 /* Consumer config */
869 /* Tell rdkafka to (try to) maintain 1M messages
870 * in its internal receive buffers. This is to avoid
871 * application -> rdkafka -> broker per-message ping-pong
872 * latency.
873 * The larger the local queue, the higher the performance.
874 * Try other values with: ... -X queued.min.messages=1000
875 */
876 rd_kafka_conf_set(conf, "queued.min.messages", "1000000", NULL, 0);
877 rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0);
878 rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);
879
880 topics = rd_kafka_topic_partition_list_new(1);
881
882 while ((opt =
883 getopt(argc, argv,
884 "PCG:t:p:b:s:k:c:fi:MDd:m:S:x:"
885 "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:")) != -1) {
886 switch (opt) {
887 case 'G':
888 if (rd_kafka_conf_set(conf, "group.id", optarg,
889 errstr, sizeof(errstr)) !=
890 RD_KAFKA_CONF_OK) {
891 fprintf(stderr, "%% %s\n", errstr);
892 exit(1);
893 }
894 /* FALLTHRU */
895 case 'P':
896 case 'C':
897 mode = opt;
898 break;
899 case 't':
900 rd_kafka_topic_partition_list_add(topics, optarg,
901 RD_KAFKA_PARTITION_UA);
902 break;
903 case 'p':
904 partition_cnt++;
905 partitions = realloc(partitions, sizeof(*partitions) * partition_cnt);
906 partitions[partition_cnt-1] = atoi(optarg);
907 break;
908
909 case 'b':
910 brokers = optarg;
911 break;
912 case 's':
913 msgsize = atoi(optarg);
914 break;
915 case 'k':
916 key = optarg;
917 break;
918 case 'c':
919 msgcnt = atoi(optarg);
920 break;
921 case 'D':
922 sendflags |= RD_KAFKA_MSG_F_FREE;
923 break;
924 case 'i':
925 dispintvl = atoi(optarg);
926 break;
927 case 'm':
928 msgpattern = optarg;
929 break;
930 case 'S':
931 seq = strtoull(optarg, NULL, 10);
932 do_seq = 1;
933 break;
934 case 'x':
935 exit_after = atoi(optarg);
936 break;
937 case 'R':
938 seed = atoi(optarg);
939 break;
940 case 'a':
941 if (rd_kafka_conf_set(conf,
942 "acks",
943 optarg,
944 errstr, sizeof(errstr)) !=
945 RD_KAFKA_CONF_OK) {
946 fprintf(stderr, "%% %s\n", errstr);
947 exit(1);
948 }
949 break;
950 case 'B':
951 batch_size = atoi(optarg);
952 break;
953 case 'z':
954 if (rd_kafka_conf_set(conf, "compression.codec",
955 optarg,
956 errstr, sizeof(errstr)) !=
957 RD_KAFKA_CONF_OK) {
958 fprintf(stderr, "%% %s\n", errstr);
959 exit(1);
960 }
961 compression = optarg;
962 break;
963 case 'o':
964 if (!strcmp(optarg, "end"))
965 start_offset = RD_KAFKA_OFFSET_END;
966 else if (!strcmp(optarg, "beginning"))
967 start_offset = RD_KAFKA_OFFSET_BEGINNING;
968 else if (!strcmp(optarg, "stored"))
969 start_offset = RD_KAFKA_OFFSET_STORED;
970 else {
971 start_offset = strtoll(optarg, NULL, 10);
972
973 if (start_offset < 0)
974 start_offset = RD_KAFKA_OFFSET_TAIL(-start_offset);
975 }
976
977 break;
978 case 'e':
979 exit_eof = 1;
980 break;
981 case 'd':
982 debug = optarg;
983 break;
984 case 'H':
985 if (!strcmp(optarg, "parse"))
986 read_hdrs = 1;
987 else {
988 char *name, *val;
989 size_t name_sz = -1;
990
991 name = optarg;
992 val = strchr(name, '=');
993 if (val) {
994 name_sz = (size_t)(val-name);
995 val++; /* past the '=' */
996 }
997
998 if (!hdrs)
999 hdrs = rd_kafka_headers_new(8);
1000
1001 err = rd_kafka_header_add(hdrs, name, name_sz, val, -1);
1002 if (err) {
1003 fprintf(stderr,
1004 "%% Failed to add header %s: %s\n",
1005 name, rd_kafka_err2str(err));
1006 exit(1);
1007 }
1008 }
1009 break;
1010 case 'X':
1011 {
1012 char *name, *val;
1013 rd_kafka_conf_res_t res;
1014
1015 if (!strcmp(optarg, "list") ||
1016 !strcmp(optarg, "help")) {
1017 rd_kafka_conf_properties_show(stdout);
1018 exit(0);
1019 }
1020
1021 if (!strcmp(optarg, "dump")) {
1022 do_conf_dump = 1;
1023 continue;
1024 }
1025
1026 name = optarg;
1027 if (!(val = strchr(name, '='))) {
1028 fprintf(stderr, "%% Expected "
1029 "-X property=value, not %s\n", name);
1030 exit(1);
1031 }
1032
1033 *val = '\0';
1034 val++;
1035
1036 if (!strcmp(name, "file")) {
1037 if (read_conf_file(conf, val) == -1)
1038 exit(1);
1039 break;
1040 }
1041
1042 res = rd_kafka_conf_set(conf, name, val,
1043 errstr, sizeof(errstr));
1044
1045 if (res != RD_KAFKA_CONF_OK) {
1046 fprintf(stderr, "%% %s\n", errstr);
1047 exit(1);
1048 }
1049 }
1050 break;
1051
1052 case 'T':
1053 stats_intvlstr = optarg;
1054 break;
1055 case 'Y':
1056 stats_cmd = optarg;
1057 break;
1058
1059 case 'q':
1060 verbosity--;
1061 break;
1062
1063 case 'v':
1064 verbosity++;
1065 break;
1066
1067 case 'I':
1068 idle = 1;
1069 break;
1070
1071 case 'u':
1072 otype = _OTYPE_TAB;
1073 verbosity--; /* remove some fluff */
1074 break;
1075
1076 case 'r':
1077 dtmp = strtod(optarg, &tmp2);
1078 if (tmp2 == optarg ||
1079 (dtmp >= -0.001 && dtmp <= 0.001)) {
1080 fprintf(stderr, "%% Invalid rate: %s\n",
1081 optarg);
1082 exit(1);
1083 }
1084
1085 rate_sleep = (int)(1000000.0 / dtmp);
1086 break;
1087
1088 case 'l':
1089 latency_mode = 1;
1090 break;
1091
1092 case 'A':
1093 if (!(latency_fp = fopen(optarg, "w"))) {
1094 fprintf(stderr,
1095 "%% Cant open %s: %s\n",
1096 optarg, strerror(errno));
1097 exit(1);
1098 }
1099 break;
1100
1101 case 'M':
1102 incremental_mode = 1;
1103 break;
1104
1105 case 'N':
1106 with_dr = 0;
1107 break;
1108
1109 default:
1110 fprintf(stderr, "Unknown option: %c\n", opt);
1111 goto usage;
1112 }
1113 }
1114
1115 if (topics->cnt == 0 || optind != argc) {
1116 if (optind < argc)
1117 fprintf(stderr, "Unknown argument: %s\n", argv[optind]);
1118 usage:
1119 fprintf(stderr,
1120 "Usage: %s [-C|-P] -t <topic> "
1121 "[-p <partition>] [-b <broker,broker..>] [options..]\n"
1122 "\n"
1123 "librdkafka version %s (0x%08x)\n"
1124 "\n"
1125 " Options:\n"
1126 " -C | -P | Consumer or Producer mode\n"
1127 " -G <groupid> High-level Kafka Consumer mode\n"
1128 " -t <topic> Topic to consume / produce\n"
1129 " -p <num> Partition (defaults to random). "
1130 "Multiple partitions are allowed in -C consumer mode.\n"
1131 " -M Print consumer interval stats\n"
1132 " -b <brokers> Broker address list (host[:port],..)\n"
1133 " -s <size> Message size (producer)\n"
1134 " -k <key> Message key (producer)\n"
1135 " -H <name[=value]> Add header to message (producer)\n"
1136 " -H parse Read message headers (consumer)\n"
1137 " -c <cnt> Messages to transmit/receive\n"
1138 " -x <cnt> Hard exit after transmitting <cnt> messages (producer)\n"
1139 " -D Copy/Duplicate data buffer (producer)\n"
1140 " -i <ms> Display interval\n"
1141 " -m <msg> Message payload pattern\n"
1142 " -S <start> Send a sequence number starting at "
1143 "<start> as payload\n"
1144 " -R <seed> Random seed value (defaults to time)\n"
1145 " -a <acks> Required acks (producer): "
1146 "-1, 0, 1, >1\n"
1147 " -B <size> Consume batch size (# of msgs)\n"
1148 " -z <codec> Enable compression:\n"
1149 " none|gzip|snappy\n"
1150 " -o <offset> Start offset (consumer)\n"
1151 " beginning, end, NNNNN or -NNNNN\n"
1152 " -d [facs..] Enable debugging contexts:\n"
1153 " %s\n"
1154 " -X <prop=name> Set arbitrary librdkafka "
1155 "configuration property\n"
1156 " -X file=<path> Read config from file.\n"
1157 " -X list Show full list of supported properties.\n"
1158 " -X dump Show configuration\n"
1159 " -T <intvl> Enable statistics from librdkafka at "
1160 "specified interval (ms)\n"
1161 " -Y <command> Pipe statistics to <command>\n"
1162 " -I Idle: dont produce any messages\n"
1163 " -q Decrease verbosity\n"
1164 " -v Increase verbosity (default 1)\n"
1165 " -u Output stats in table format\n"
1166 " -r <rate> Producer msg/s limit\n"
1167 " -l Latency measurement.\n"
1168 " Needs two matching instances, one\n"
1169 " consumer and one producer, both\n"
1170 " running with the -l switch.\n"
1171 " -l Producer: per-message latency stats\n"
1172 " -A <file> Write per-message latency stats to "
1173 "<file>. Requires -l\n"
1174 " -O Report produced offset (producer)\n"
1175 " -N No delivery reports (producer)\n"
1176 "\n"
1177 " In Consumer mode:\n"
1178 " consumes messages and prints thruput\n"
1179 " If -B <..> is supplied the batch consumer\n"
1180 " mode is used, else the callback mode is used.\n"
1181 "\n"
1182 " In Producer mode:\n"
1183 " writes messages of size -s <..> and prints thruput\n"
1184 "\n",
1185 argv[0],
1186 rd_kafka_version_str(), rd_kafka_version(),
1187 RD_KAFKA_DEBUG_CONTEXTS);
1188 exit(1);
1189 }
1190
1191
1192 dispintvl *= 1000; /* us */
1193
1194 if (verbosity > 1)
1195 printf("%% Using random seed %i, verbosity level %i\n",
1196 seed, verbosity);
1197 srand(seed);
1198 signal(SIGINT, stop);
1199 #ifdef SIGUSR1
1200 signal(SIGUSR1, sig_usr1);
1201 #endif
1202
1203
1204 if (debug &&
1205 rd_kafka_conf_set(conf, "debug", debug, errstr, sizeof(errstr)) !=
1206 RD_KAFKA_CONF_OK) {
1207 printf("%% Debug configuration failed: %s: %s\n",
1208 errstr, debug);
1209 exit(1);
1210 }
1211
1212 /* Always enable stats (for RTT extraction), and if user supplied
1213 * the -T <intvl> option we let her take part of the stats aswell. */
1214 rd_kafka_conf_set_stats_cb(conf, stats_cb);
1215
1216 if (!stats_intvlstr) {
1217 /* if no user-desired stats, adjust stats interval
1218 * to the display interval. */
1219 rd_snprintf(tmp, sizeof(tmp), "%"PRId64, dispintvl / 1000);
1220 }
1221
1222 if (rd_kafka_conf_set(conf, "statistics.interval.ms",
1223 stats_intvlstr ? stats_intvlstr : tmp,
1224 errstr, sizeof(errstr)) !=
1225 RD_KAFKA_CONF_OK) {
1226 fprintf(stderr, "%% %s\n", errstr);
1227 exit(1);
1228 }
1229
1230 if (do_conf_dump) {
1231 const char **arr;
1232 size_t cnt;
1233 int pass;
1234
1235 for (pass = 0 ; pass < 2 ; pass++) {
1236 int i;
1237
1238 if (pass == 0) {
1239 arr = rd_kafka_conf_dump(conf, &cnt);
1240 printf("# Global config\n");
1241 } else {
1242 rd_kafka_topic_conf_t *topic_conf =
1243 rd_kafka_conf_get_default_topic_conf(
1244 conf);
1245
1246 if (topic_conf) {
1247 printf("# Topic config\n");
1248 arr = rd_kafka_topic_conf_dump(
1249 topic_conf, &cnt);
1250 } else {
1251 arr = NULL;
1252 }
1253 }
1254
1255 if (!arr)
1256 continue;
1257
1258 for (i = 0 ; i < (int)cnt ; i += 2)
1259 printf("%s = %s\n",
1260 arr[i], arr[i+1]);
1261
1262 printf("\n");
1263
1264 rd_kafka_conf_dump_free(arr, cnt);
1265 }
1266
1267 exit(0);
1268 }
1269
1270 if (latency_mode)
1271 do_seq = 0;
1272
1273 if (stats_intvlstr) {
1274 /* User enabled stats (-T) */
1275
1276 #ifndef _WIN32
1277 if (stats_cmd) {
1278 if (!(stats_fp = popen(stats_cmd,
1279 #ifdef __linux__
1280 "we"
1281 #else
1282 "w"
1283 #endif
1284 ))) {
1285 fprintf(stderr,
1286 "%% Failed to start stats command: "
1287 "%s: %s", stats_cmd, strerror(errno));
1288 exit(1);
1289 }
1290 } else
1291 #endif
1292 stats_fp = stdout;
1293 }
1294
1295 if (msgcnt != -1)
1296 forever = 0;
1297
1298 if (msgsize == -1)
1299 msgsize = (int)strlen(msgpattern);
1300
1301 topic = topics->elems[0].topic;
1302
1303 if (mode == 'C' || mode == 'G')
1304 rd_kafka_conf_set(conf, "enable.partition.eof", "true",
1305 NULL, 0);
1306
1307 if (read_hdrs && mode == 'P') {
1308 fprintf(stderr, "%% producer can not read headers\n");
1309 exit(1);
1310 }
1311
1312 if (hdrs && mode != 'P') {
1313 fprintf(stderr, "%% consumer can not add headers\n");
1314 exit(1);
1315 }
1316
1317 /* Set bootstrap servers */
1318 if (brokers &&
1319 rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
1320 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
1321 fprintf(stderr, "%% %s\n", errstr);
1322 exit(1);
1323 }
1324
1325 if (mode == 'P') {
1326 /*
1327 * Producer
1328 */
1329 char *sbuf;
1330 char *pbuf;
1331 int outq;
1332 int keylen = key ? (int)strlen(key) : 0;
1333 off_t rof = 0;
1334 size_t plen = strlen(msgpattern);
1335 int partition = partitions ? partitions[0] :
1336 RD_KAFKA_PARTITION_UA;
1337
1338 if (latency_mode) {
1339 int minlen = (int)(strlen("LATENCY:") +
1340 strlen("18446744073709551615 ")+1);
1341 msgsize = RD_MAX(minlen, msgsize);
1342 sendflags |= RD_KAFKA_MSG_F_COPY;
1343 } else if (do_seq) {
1344 int minlen = (int)strlen("18446744073709551615 ")+1;
1345 if (msgsize < minlen)
1346 msgsize = minlen;
1347
1348 /* Force duplication of payload */
1349 sendflags |= RD_KAFKA_MSG_F_FREE;
1350 }
1351
1352 sbuf = malloc(msgsize);
1353
1354 /* Copy payload content to new buffer */
1355 while (rof < msgsize) {
1356 size_t xlen = RD_MIN((size_t)msgsize-rof, plen);
1357 memcpy(sbuf+rof, msgpattern, xlen);
1358 rof += (off_t)xlen;
1359 }
1360
1361 if (msgcnt == -1)
1362 printf("%% Sending messages of size %i bytes\n",
1363 msgsize);
1364 else
1365 printf("%% Sending %i messages of size %i bytes\n",
1366 msgcnt, msgsize);
1367
1368 if (with_dr)
1369 rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered);
1370
1371 /* Create Kafka handle */
1372 if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
1373 errstr, sizeof(errstr)))) {
1374 fprintf(stderr,
1375 "%% Failed to create Kafka producer: %s\n",
1376 errstr);
1377 exit(1);
1378 }
1379
1380 global_rk = rk;
1381
1382 /* Explicitly create topic to avoid per-msg lookups. */
1383 rkt = rd_kafka_topic_new(rk, topic, NULL);
1384
1385
1386 if (rate_sleep && verbosity >= 2)
1387 fprintf(stderr,
1388 "%% Inter message rate limiter sleep %ius\n",
1389 rate_sleep);
1390
1391 dr_disp_div = msgcnt / 50;
1392 if (dr_disp_div == 0)
1393 dr_disp_div = 10;
1394
1395 cnt.t_start = cnt.t_last = rd_clock();
1396
1397 msgs_wait_produce_cnt = msgcnt;
1398
1399 while (run && (msgcnt == -1 || (int)cnt.msgs < msgcnt)) {
1400 /* Send/Produce message. */
1401
1402 if (idle) {
1403 rd_kafka_poll(rk, 1000);
1404 continue;
1405 }
1406
1407 if (latency_mode) {
1408 rd_snprintf(sbuf, msgsize-1,
1409 "LATENCY:%"PRIu64, wall_clock());
1410 } else if (do_seq) {
1411 rd_snprintf(sbuf,
1412 msgsize-1, "%"PRIu64": ", seq);
1413 seq++;
1414 }
1415
1416 if (sendflags & RD_KAFKA_MSG_F_FREE) {
1417 /* Duplicate memory */
1418 pbuf = malloc(msgsize);
1419 memcpy(pbuf, sbuf, msgsize);
1420 } else
1421 pbuf = sbuf;
1422
1423 if (msgsize == 0)
1424 pbuf = NULL;
1425
1426 cnt.tx++;
1427 while (run &&
1428 (err = do_produce(rk, rkt, partition, sendflags,
1429 pbuf, msgsize,
1430 key, keylen, hdrs))) {
1431 if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
1432 printf("%% No such partition: "
1433 "%"PRId32"\n", partition);
1434 else if (verbosity >= 3 ||
1435 (err != RD_KAFKA_RESP_ERR__QUEUE_FULL && verbosity >= 1))
1436 printf("%% produce error: %s%s\n",
1437 rd_kafka_err2str(err),
1438 err == RD_KAFKA_RESP_ERR__QUEUE_FULL ?
1439 " (backpressure)" : "");
1440
1441 cnt.tx_err++;
1442 if (err != RD_KAFKA_RESP_ERR__QUEUE_FULL) {
1443 run = 0;
1444 break;
1445 }
1446 now = rd_clock();
1447 if (verbosity >= 2 &&
1448 cnt.t_enobufs_last + dispintvl <= now) {
1449 printf("%% Backpressure %i "
1450 "(tx %"PRIu64", "
1451 "txerr %"PRIu64")\n",
1452 rd_kafka_outq_len(rk),
1453 cnt.tx, cnt.tx_err);
1454 cnt.t_enobufs_last = now;
1455 }
1456
1457 /* Poll to handle delivery reports */
1458 rd_kafka_poll(rk, 10);
1459
1460 print_stats(rk, mode, otype, compression);
1461 }
1462
1463 msgs_wait_cnt++;
1464 if (msgs_wait_produce_cnt != -1)
1465 msgs_wait_produce_cnt--;
1466 cnt.msgs++;
1467 cnt.bytes += msgsize;
1468
1469 /* Must poll to handle delivery reports */
1470 if (rate_sleep) {
1471 rd_ts_t next = rd_clock() + (rd_ts_t) rate_sleep;
1472 do {
1473 rd_kafka_poll(rk,
1474 (int)RD_MAX(0,
1475 (next - rd_clock()) / 1000));
1476 } while (next > rd_clock());
1477 } else {
1478 rd_kafka_poll(rk, 0);
1479 }
1480
1481 print_stats(rk, mode, otype, compression);
1482 }
1483
1484 forever = 0;
1485 if (verbosity >= 2)
1486 printf("%% All messages produced, "
1487 "now waiting for %li deliveries\n",
1488 msgs_wait_cnt);
1489
1490 /* Wait for messages to be delivered */
1491 while (run && rd_kafka_poll(rk, 1000) != -1)
1492 print_stats(rk, mode, otype, compression);
1493
1494
1495 outq = rd_kafka_outq_len(rk);
1496 if (verbosity >= 2)
1497 printf("%% %i messages in outq\n", outq);
1498 cnt.msgs -= outq;
1499 cnt.t_end = t_end;
1500
1501 if (cnt.tx_err > 0)
1502 printf("%% %"PRIu64" backpressures for %"PRIu64
1503 " produce calls: %.3f%% backpressure rate\n",
1504 cnt.tx_err, cnt.tx,
1505 ((double)cnt.tx_err / (double)cnt.tx) * 100.0);
1506
1507 /* Destroy topic */
1508 rd_kafka_topic_destroy(rkt);
1509
1510 /* Destroy the handle */
1511 rd_kafka_destroy(rk);
1512 global_rk = rk = NULL;
1513
1514 free(sbuf);
1515
1516 exitcode = cnt.msgs == cnt.msgs_dr_ok ? 0 : 1;
1517
1518 } else if (mode == 'C') {
1519 /*
1520 * Consumer
1521 */
1522
1523 rd_kafka_message_t **rkmessages = NULL;
1524 size_t i = 0;
1525
1526 /* Create Kafka handle */
1527 if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
1528 errstr, sizeof(errstr)))) {
1529 fprintf(stderr,
1530 "%% Failed to create Kafka consumer: %s\n",
1531 errstr);
1532 exit(1);
1533 }
1534
1535 global_rk = rk;
1536
1537 /* Create topic to consume from */
1538 rkt = rd_kafka_topic_new(rk, topic, NULL);
1539
1540 /* Batch consumer */
1541 if (batch_size)
1542 rkmessages = malloc(sizeof(*rkmessages) * batch_size);
1543
1544 /* Start consuming */
1545 rkqu = rd_kafka_queue_new(rk);
1546 for (i=0 ; i<(size_t)partition_cnt ; ++i) {
1547 const int r = rd_kafka_consume_start_queue(rkt,
1548 partitions[i], start_offset, rkqu);
1549
1550 if (r == -1) {
1551 fprintf(stderr, "%% Error creating queue: %s\n",
1552 rd_kafka_err2str(rd_kafka_last_error()));
1553 exit(1);
1554 }
1555 }
1556
1557 while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) {
1558 /* Consume messages.
1559 * A message may either be a real message, or
1560 * an error signaling (if rkmessage->err is set).
1561 */
1562 uint64_t fetch_latency;
1563 ssize_t r;
1564
1565 fetch_latency = rd_clock();
1566
1567 if (batch_size) {
1568 int partition = partitions ? partitions[0] :
1569 RD_KAFKA_PARTITION_UA;
1570
1571 /* Batch fetch mode */
1572 r = rd_kafka_consume_batch(rkt, partition,
1573 1000,
1574 rkmessages,
1575 batch_size);
1576 if (r != -1) {
1577 for (i = 0 ; (ssize_t)i < r ; i++) {
1578 msg_consume(rkmessages[i],
1579 NULL);
1580 rd_kafka_message_destroy(
1581 rkmessages[i]);
1582 }
1583 }
1584 } else {
1585 /* Queue mode */
1586 r = rd_kafka_consume_callback_queue(rkqu, 1000,
1587 msg_consume,
1588 NULL);
1589 }
1590
1591 cnt.t_fetch_latency += rd_clock() - fetch_latency;
1592 if (r == -1)
1593 fprintf(stderr, "%% Error: %s\n",
1594 rd_kafka_err2str(rd_kafka_last_error()));
1595 else if (r > 0 && rate_sleep) {
1596 /* Simulate processing time
1597 * if `-r <rate>` was set. */
1598 do_sleep(rate_sleep);
1599 }
1600
1601
1602 print_stats(rk, mode, otype, compression);
1603
1604 /* Poll to handle stats callbacks */
1605 rd_kafka_poll(rk, 0);
1606 }
1607 cnt.t_end = rd_clock();
1608
1609 /* Stop consuming */
1610 for (i=0 ; i<(size_t)partition_cnt ; ++i) {
1611 int r = rd_kafka_consume_stop(rkt, (int32_t)i);
1612 if (r == -1) {
1613 fprintf(stderr,
1614 "%% Error in consume_stop: %s\n",
1615 rd_kafka_err2str(rd_kafka_last_error()));
1616 }
1617 }
1618 rd_kafka_queue_destroy(rkqu);
1619
1620 /* Destroy topic */
1621 rd_kafka_topic_destroy(rkt);
1622
1623 if (batch_size)
1624 free(rkmessages);
1625
1626 /* Destroy the handle */
1627 rd_kafka_destroy(rk);
1628
1629 global_rk = rk = NULL;
1630
1631 } else if (mode == 'G') {
1632 /*
1633 * High-level balanced Consumer
1634 */
1635
1636 rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
1637
1638 /* Create Kafka handle */
1639 if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
1640 errstr, sizeof(errstr)))) {
1641 fprintf(stderr,
1642 "%% Failed to create Kafka consumer: %s\n",
1643 errstr);
1644 exit(1);
1645 }
1646
1647 /* Forward all events to consumer queue */
1648 rd_kafka_poll_set_consumer(rk);
1649
1650 global_rk = rk;
1651
1652 err = rd_kafka_subscribe(rk, topics);
1653 if (err) {
1654 fprintf(stderr, "%% Subscribe failed: %s\n",
1655 rd_kafka_err2str(err));
1656 exit(1);
1657 }
1658 fprintf(stderr, "%% Waiting for group rebalance..\n");
1659
1660 while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) {
1661 /* Consume messages.
1662 * A message may either be a real message, or
1663 * an event (if rkmessage->err is set).
1664 */
1665 rd_kafka_message_t *rkmessage;
1666 uint64_t fetch_latency;
1667
1668 fetch_latency = rd_clock();
1669
1670 rkmessage = rd_kafka_consumer_poll(rk, 1000);
1671 if (rkmessage) {
1672 msg_consume(rkmessage, NULL);
1673 rd_kafka_message_destroy(rkmessage);
1674
1675 /* Simulate processing time
1676 * if `-r <rate>` was set. */
1677 if (rate_sleep)
1678 do_sleep(rate_sleep);
1679 }
1680
1681 cnt.t_fetch_latency += rd_clock() - fetch_latency;
1682
1683 print_stats(rk, mode, otype, compression);
1684 }
1685 cnt.t_end = rd_clock();
1686
1687 err = rd_kafka_consumer_close(rk);
1688 if (err)
1689 fprintf(stderr, "%% Failed to close consumer: %s\n",
1690 rd_kafka_err2str(err));
1691
1692 rd_kafka_destroy(rk);
1693 }
1694
1695 if (hdrs)
1696 rd_kafka_headers_destroy(hdrs);
1697
1698 print_stats(NULL, mode, otype|_OTYPE_FORCE, compression);
1699
1700 if (cnt.t_fetch_latency && cnt.msgs)
1701 printf("%% Average application fetch latency: %"PRIu64"us\n",
1702 cnt.t_fetch_latency / cnt.msgs);
1703
1704 if (latency_fp)
1705 fclose(latency_fp);
1706
1707 if (stats_fp) {
1708 #ifndef _WIN32
1709 pclose(stats_fp);
1710 #endif
1711 stats_fp = NULL;
1712 }
1713
1714 if (partitions)
1715 free(partitions);
1716
1717 rd_kafka_topic_partition_list_destroy(topics);
1718
1719 /* Let background threads clean up and terminate cleanly. */
1720 rd_kafka_wait_destroyed(2000);
1721
1722 return exitcode;
1723 }
1724