1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 /**
30  * Apache Kafka consumer & producer performance tester
31  * using the Kafka driver from librdkafka
32  * (https://github.com/edenhill/librdkafka)
33  */
34 
35 #ifdef _MSC_VER
36 #define  _CRT_SECURE_NO_WARNINGS /* Silence nonsense on MSVC */
37 #endif
38 
39 #include "../src/rd.h"
40 
41 #define _GNU_SOURCE /* for strndup() */
42 #include <ctype.h>
43 #include <signal.h>
44 #include <string.h>
45 #include <errno.h>
46 
47 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
48  * is built from within the librdkafka source tree and thus differs. */
49 #include "rdkafka.h"  /* for Kafka driver */
/* Do not include these headers in your own program; they are internal
 * and will not be provided by librdkafka. */
52 #include "rd.h"
53 #include "rdtime.h"
54 
55 #ifdef _WIN32
56 #include "../win32/wingetopt.h"
57 #include "../win32/wintime.h"
58 #endif
59 
60 
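/* Global run-time state and command-line controlled options. */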
61 static volatile sig_atomic_t run = 1;
62 static int forever = 1;
63 static rd_ts_t dispintvl = 1000;
64 static int do_seq = 0;
65 static int exit_after = 0;
66 static int exit_eof = 0;
67 static FILE *stats_fp;
68 static int dr_disp_div;
69 static int verbosity = 1;
70 static int latency_mode = 0;
71 static FILE *latency_fp = NULL;
72 static int msgcnt = -1;
73 static int incremental_mode = 0;
74 static int partition_cnt = 0;
75 static int eof_cnt = 0;
76 static int with_dr = 1;
77 static int read_hdrs = 0;
78 
79 
static void stop (int sig) {
81         if (!run)
82                 exit(0);
83 	run = 0;
84 }
85 
86 static long int msgs_wait_cnt = 0;
87 static long int msgs_wait_produce_cnt = 0;
88 static rd_ts_t t_end;
89 static rd_kafka_t *global_rk;
90 
91 struct avg {
92         int64_t  val;
93         int      cnt;
94         uint64_t ts_start;
95 };
96 
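/* Aggregated counters and timestamps, updated from the produce/consume
 * paths and reported by print_stats(). */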
97 static struct {
98 	rd_ts_t  t_start;
99 	rd_ts_t  t_end;
100 	rd_ts_t  t_end_send;
101 	uint64_t msgs;
102 	uint64_t msgs_last;
103         uint64_t msgs_dr_ok;
104         uint64_t msgs_dr_err;
105         uint64_t bytes_dr_ok;
106 	uint64_t bytes;
107 	uint64_t bytes_last;
108 	uint64_t tx;
109 	uint64_t tx_err;
110         uint64_t avg_rtt;
111         uint64_t offset;
112 	rd_ts_t  t_fetch_latency;
113 	rd_ts_t  t_last;
114         rd_ts_t  t_enobufs_last;
115 	rd_ts_t  t_total;
116         rd_ts_t  latency_last;
117         rd_ts_t  latency_lo;
118         rd_ts_t  latency_hi;
119         rd_ts_t  latency_sum;
120         int      latency_cnt;
121         int64_t  last_offset;
122 } cnt;
123 
124 
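/**
 * @brief Wall-clock time in microseconds since the Unix epoch.
 */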
uint64_t wall_clock (void) {
126         struct timeval tv;
127         gettimeofday(&tv, NULL);
128         return ((uint64_t)tv.tv_sec * 1000000LLU) +
129 		((uint64_t)tv.tv_usec);
130 }
131 
static void err_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
133         if (err == RD_KAFKA_RESP_ERR__FATAL) {
134                 char errstr[512];
135                 err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
136                 printf("%% FATAL ERROR CALLBACK: %s: %s: %s\n",
137                        rd_kafka_name(rk), rd_kafka_err2str(err), errstr);
138         } else {
139                 printf("%% ERROR CALLBACK: %s: %s: %s\n",
140                        rd_kafka_name(rk), rd_kafka_err2str(err), reason);
141         }
142 }
143 
static void throttle_cb (rd_kafka_t *rk, const char *broker_name,
			 int32_t broker_id, int throttle_time_ms,
			 void *opaque) {
147 	printf("%% THROTTLED %dms by %s (%"PRId32")\n", throttle_time_ms,
148 	       broker_name, broker_id);
149 }
150 
static void offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                              rd_kafka_topic_partition_list_t *offsets,
                              void *opaque) {
154         int i;
155 
156         if (err || verbosity >= 2)
157                 printf("%% Offset commit of %d partition(s): %s\n",
158                        offsets->cnt, rd_kafka_err2str(err));
159 
160         for (i = 0 ; i < offsets->cnt ; i++) {
161                 rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
162                 if (rktpar->err || verbosity >= 2)
163                         printf("%%  %s [%"PRId32"] @ %"PRId64": %s\n",
164                                rktpar->topic, rktpar->partition,
165                                rktpar->offset, rd_kafka_err2str(err));
166         }
167 }
168 
169 /**
170  * @brief Add latency measurement
171  */
static void latency_add (int64_t ts, const char *who) {
173         if (ts > cnt.latency_hi)
174                 cnt.latency_hi = ts;
175         if (!cnt.latency_lo || ts < cnt.latency_lo)
176                 cnt.latency_lo = ts;
177         cnt.latency_last = ts;
178         cnt.latency_cnt++;
179         cnt.latency_sum += ts;
180         if (latency_fp)
                fprintf(latency_fp, "%"PRId64"\n", ts);
182 }
183 
184 
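/**
 * @brief Message delivery report callback: updates delivery counters
 *        (and per-message latency in latency mode) and periodically
 *        prints progress.
 */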
static void msg_delivered (rd_kafka_t *rk,
                           const rd_kafka_message_t *rkmessage, void *opaque) {
187 	static rd_ts_t last;
188 	rd_ts_t now = rd_clock();
189 	static int msgs;
190 
191         msgs++;
192 
193 	msgs_wait_cnt--;
194 
195 	if (rkmessage->err)
196                 cnt.msgs_dr_err++;
197         else {
198                 cnt.msgs_dr_ok++;
199                 cnt.bytes_dr_ok += rkmessage->len;
200         }
201 
202         if (latency_mode) {
203                 /* Extract latency */
204                 int64_t source_ts;
205                 if (sscanf(rkmessage->payload, "LATENCY:%"SCNd64,
206                            &source_ts) == 1)
207                         latency_add(wall_clock() - source_ts, "producer");
208         }
209 
210 
211 	if ((rkmessage->err &&
212 	     (cnt.msgs_dr_err < 50 ||
213               !(cnt.msgs_dr_err % (dispintvl / 1000)))) ||
214 	    !last || msgs_wait_cnt < 5 ||
215 	    !(msgs_wait_cnt % dr_disp_div) ||
216 	    (now - last) >= dispintvl * 1000 ||
217             verbosity >= 3) {
218 		if (rkmessage->err && verbosity >= 2)
219 			printf("%% Message delivery failed: %s [%"PRId32"]: "
220 			       "%s (%li remain)\n",
221 			       rd_kafka_topic_name(rkmessage->rkt),
222 			       rkmessage->partition,
223 			       rd_kafka_err2str(rkmessage->err),
224 			       msgs_wait_cnt);
225 		else if (verbosity > 2)
226 			printf("%% Message delivered (offset %"PRId64"): "
227                                "%li remain\n",
228                                rkmessage->offset, msgs_wait_cnt);
229 		if (verbosity >= 3 && do_seq)
230 			printf(" --> \"%.*s\"\n",
231                                (int)rkmessage->len,
232                                (const char *)rkmessage->payload);
233 		last = now;
234 	}
235 
236         cnt.last_offset = rkmessage->offset;
237 
238 	if (msgs_wait_produce_cnt == 0 && msgs_wait_cnt == 0 && !forever) {
239                 if (verbosity >= 2 && cnt.msgs > 0) {
240                         double error_percent =
241                                 (double)(cnt.msgs - cnt.msgs_dr_ok) /
242                                 cnt.msgs * 100;
243                         printf("%% Messages delivered with failure "
244                                "percentage of %.5f%%\n", error_percent);
245                 }
246 		t_end = rd_clock();
247 		run = 0;
248 	}
249 
250 	if (exit_after && exit_after <= msgs) {
251 		printf("%% Hard exit after %i messages, as requested\n",
252 		       exit_after);
253 		exit(0);
254 	}
255 }
256 
257 
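/**
 * @brief Consume callback: counts received messages and bytes, handles
 *        partition EOF and consume errors, and extracts latency
 *        timestamps when running in latency mode (-l).
 */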
static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque) {
259 
260 	if (rkmessage->err) {
261 		if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
262                         cnt.offset = rkmessage->offset;
263 
264                         if (verbosity >= 1)
265                                 printf("%% Consumer reached end of "
266                                        "%s [%"PRId32"] "
267                                        "message queue at offset %"PRId64"\n",
268                                        rd_kafka_topic_name(rkmessage->rkt),
269                                        rkmessage->partition, rkmessage->offset);
270 
271 			if (exit_eof && ++eof_cnt == partition_cnt)
272 				run = 0;
273 
274 			return;
275 		}
276 
277 		printf("%% Consume error for topic \"%s\" [%"PRId32"] "
278 		       "offset %"PRId64": %s\n",
279 		       rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt):"",
280 		       rkmessage->partition,
281 		       rkmessage->offset,
282 		       rd_kafka_message_errstr(rkmessage));
283 
284                 if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
285                     rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
286                         run = 0;
287 
288                 cnt.msgs_dr_err++;
289 		return;
290 	}
291 
292 	/* Start measuring from first message received */
293 	if (!cnt.t_start)
294 		cnt.t_start = cnt.t_last = rd_clock();
295 
296         cnt.offset = rkmessage->offset;
297 	cnt.msgs++;
298 	cnt.bytes += rkmessage->len;
299 
300 	if (verbosity >= 3 ||
301             (verbosity >= 2 && !(cnt.msgs % 1000000)))
302 		printf("@%"PRId64": %.*s: %.*s\n",
303 		       rkmessage->offset,
304                        (int)rkmessage->key_len, (char *)rkmessage->key,
305 		       (int)rkmessage->len, (char *)rkmessage->payload);
306 
307 
308         if (latency_mode) {
309                 int64_t remote_ts, ts;
310 
311                 if (rkmessage->len > 8 &&
312                     !memcmp(rkmessage->payload, "LATENCY:", 8) &&
313                     sscanf(rkmessage->payload, "LATENCY:%"SCNd64,
314                            &remote_ts) == 1) {
315                         ts = wall_clock() - remote_ts;
316                         if (ts > 0 && ts < (1000000 * 60 * 5)) {
317                                 latency_add(ts, "consumer");
318                         } else {
319                                 if (verbosity >= 1)
320                                         printf("Received latency timestamp is too far off: %"PRId64"us (message offset %"PRId64"): ignored\n",
321                                                ts, rkmessage->offset);
322                         }
323                 } else if (verbosity > 1)
324                         printf("not a LATENCY payload: %.*s\n",
325                                (int)rkmessage->len,
326                                (char *)rkmessage->payload);
327 
328         }
329 
330         if (read_hdrs) {
331                 rd_kafka_headers_t *hdrs;
332                 /* Force parsing of headers but don't do anything with them. */
333                 rd_kafka_message_headers(rkmessage, &hdrs);
334         }
335 
336         if (msgcnt != -1 && (int)cnt.msgs >= msgcnt)
337                 run = 0;
338 }
339 
340 
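/**
 * @brief Group rebalance callback, handling both the EAGER and the
 *        COOPERATIVE (incremental) rebalance protocols.
 */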
static void rebalance_cb (rd_kafka_t *rk,
                          rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *partitions,
                          void *opaque) {
345         rd_kafka_error_t *error = NULL;
346         rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
347 
348         if (exit_eof &&
349             !strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE"))
350                 fprintf(stderr, "%% This example has not been modified to "
351                         "support -e (exit on EOF) when "
352                         "partition.assignment.strategy "
353                         "is set to an incremental/cooperative strategy: "
354                         "-e will not behave as expected\n");
355 
356         switch (err)
357         {
358         case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
359                 fprintf(stderr,
360                         "%% Group rebalanced (%s): "
361                         "%d new partition(s) assigned\n",
362                         rd_kafka_rebalance_protocol(rk), partitions->cnt);
363 
364                 if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
365                         error = rd_kafka_incremental_assign(rk, partitions);
366                 } else {
367                         ret_err = rd_kafka_assign(rk, partitions);
368                         eof_cnt = 0;
369                 }
370 
371                 partition_cnt += partitions->cnt;
372                 break;
373 
374         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
375                 fprintf(stderr,
376                         "%% Group rebalanced (%s): %d partition(s) revoked\n",
377                         rd_kafka_rebalance_protocol(rk), partitions->cnt);
378 
379                 if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
380                         error = rd_kafka_incremental_unassign(rk, partitions);
381                         partition_cnt -= partitions->cnt;
382                 } else {
383                         ret_err = rd_kafka_assign(rk, NULL);
384                         partition_cnt = 0;
385                 }
386 
387                 eof_cnt = 0; /* FIXME: Not correct for incremental case */
388                 break;
389 
390         default:
391                 break;
392         }
393 
394         if (error) {
395                 fprintf(stderr, "%% incremental assign failure: %s\n",
396                         rd_kafka_error_string(error));
397                 rd_kafka_error_destroy(error);
398         } else if (ret_err) {
399                 fprintf(stderr, "%% assign failure: %s\n",
400                         rd_kafka_err2str(ret_err));
401         }
402 }
403 
404 
/**
 * Find and extract a single value using a two-level search:
 * first find 'field1', then find 'field2' and extract its value.
 * Returns 0 on a miss, else the value.
 */
static uint64_t json_parse_fields (const char *json, const char **end,
                                   const char *field1, const char *field2) {
412         const char *t = json;
413         const char *t2;
414         int len1 = (int)strlen(field1);
415         int len2 = (int)strlen(field2);
416 
417         while ((t2 = strstr(t, field1))) {
418                 uint64_t v;
419 
420                 t = t2;
421                 t += len1;
422 
423                 /* Find field */
424                 if (!(t2 = strstr(t, field2)))
425                         continue;
426                 t2 += len2;
427 
428                 while (isspace((int)*t2))
429                         t2++;
430 
431                 v = strtoull(t2, (char **)&t, 10);
432                 if (t2 == t)
433                         continue;
434 
435                 *end = t;
436                 return v;
437         }
438 
439         *end = t + strlen(t);
440         return 0;
441 }
442 
443 /**
444  * Parse various values from rdkafka stats
445  */
static void json_parse_stats (const char *json) {
447         const char *t;
448 #define MAX_AVGS 100 /* max number of brokers to scan for rtt */
449         uint64_t avg_rtt[MAX_AVGS+1];
450         int avg_rtt_i     = 0;
451 
452         /* Store totals at end of array */
453         avg_rtt[MAX_AVGS]     = 0;
454 
455         /* Extract all broker RTTs */
456         t = json;
457         while (avg_rtt_i < MAX_AVGS && *t) {
458                 avg_rtt[avg_rtt_i] = json_parse_fields(t, &t,
459                                                        "\"rtt\":",
460                                                        "\"avg\":");
461 
                /* Skip low RTT values; they mean no messages are passing */
463                 if (avg_rtt[avg_rtt_i] < 100 /*0.1ms*/)
464                         continue;
465 
466 
467                 avg_rtt[MAX_AVGS] += avg_rtt[avg_rtt_i];
468                 avg_rtt_i++;
469         }
470 
471         if (avg_rtt_i > 0)
472                 avg_rtt[MAX_AVGS] /= avg_rtt_i;
473 
474         cnt.avg_rtt = avg_rtt[MAX_AVGS];
475 }
476 
477 
static int stats_cb (rd_kafka_t *rk, char *json, size_t json_len,
		     void *opaque) {
480 
481         /* Extract values for our own stats */
482         json_parse_stats(json);
483 
484         if (stats_fp)
485                 fprintf(stats_fp, "%s\n", json);
486 	return 0;
487 }
488 
489 #define _OTYPE_TAB      0x1  /* tabular format */
490 #define _OTYPE_SUMMARY  0x2  /* summary format */
491 #define _OTYPE_FORCE    0x4  /* force output regardless of interval timing */
static void print_stats (rd_kafka_t *rk,
                         int mode, int otype, const char *compression) {
494 	rd_ts_t now = rd_clock();
495 	rd_ts_t t_total;
496         static int rows_written = 0;
497         int print_header;
498         double latency_avg = 0.0f;
499         char extra[512];
500         int extra_of = 0;
501         *extra = '\0';
502 
503 	if (!(otype & _OTYPE_FORCE) &&
504             (((otype & _OTYPE_SUMMARY) && verbosity == 0) ||
505              cnt.t_last + dispintvl > now))
506 		return;
507 
508         print_header = !rows_written ||(verbosity > 0 && !(rows_written % 20));
509 
510 	if (cnt.t_end_send)
511 		t_total = cnt.t_end_send - cnt.t_start;
512 	else if (cnt.t_end)
513 		t_total = cnt.t_end - cnt.t_start;
514 	else if (cnt.t_start)
515 		t_total = now - cnt.t_start;
516 	else
517 		t_total = 1;
518 
519         if (latency_mode && cnt.latency_cnt)
520                 latency_avg = (double)cnt.latency_sum /
521                         (double)cnt.latency_cnt;
522 
523         if (mode == 'P') {
524 
525                 if (otype & _OTYPE_TAB) {
526 #define ROW_START()        do {} while (0)
527 #define COL_HDR(NAME)      printf("| %10.10s ", (NAME))
528 #define COL_PR64(NAME,VAL) printf("| %10"PRIu64" ", (VAL))
529 #define COL_PRF(NAME,VAL)  printf("| %10.2f ", (VAL))
530 #define ROW_END()          do {                 \
531                                 printf("\n");   \
532                                 rows_written++; \
533                         } while (0)
534 
535                         if (print_header) {
536                                 /* First time, print header */
537                                 ROW_START();
538                                 COL_HDR("elapsed");
539                                 COL_HDR("msgs");
540                                 COL_HDR("bytes");
541                                 COL_HDR("rtt");
542                                 COL_HDR("dr");
543                                 COL_HDR("dr_m/s");
544                                 COL_HDR("dr_MB/s");
545                                 COL_HDR("dr_err");
546                                 COL_HDR("tx_err");
547                                 COL_HDR("outq");
548                                 COL_HDR("offset");
549                                 if (latency_mode) {
550                                         COL_HDR("lat_curr");
551                                         COL_HDR("lat_avg");
552                                         COL_HDR("lat_lo");
553                                         COL_HDR("lat_hi");
554                                 }
555 
556                                 ROW_END();
557                         }
558 
559                         ROW_START();
560                         COL_PR64("elapsed", t_total / 1000);
561                         COL_PR64("msgs", cnt.msgs);
562                         COL_PR64("bytes", cnt.bytes);
563                         COL_PR64("rtt", cnt.avg_rtt / 1000);
564                         COL_PR64("dr", cnt.msgs_dr_ok);
565                         COL_PR64("dr_m/s",
566                                  ((cnt.msgs_dr_ok * 1000000) / t_total));
567                         COL_PRF("dr_MB/s",
568                                 (float)((cnt.bytes_dr_ok) / (float)t_total));
569                         COL_PR64("dr_err", cnt.msgs_dr_err);
570                         COL_PR64("tx_err", cnt.tx_err);
571                         COL_PR64("outq",
572                                  rk ? (uint64_t)rd_kafka_outq_len(rk) : 0);
573                         COL_PR64("offset", (uint64_t)cnt.last_offset);
574                         if (latency_mode) {
575                                 COL_PRF("lat_curr", cnt.latency_last / 1000.0f);
576                                 COL_PRF("lat_avg", latency_avg / 1000.0f);
577                                 COL_PRF("lat_lo", cnt.latency_lo / 1000.0f);
578                                 COL_PRF("lat_hi", cnt.latency_hi / 1000.0f);
579                         }
580                         ROW_END();
581                 }
582 
583                 if (otype & _OTYPE_SUMMARY) {
584                         printf("%% %"PRIu64" messages produced "
585                                "(%"PRIu64" bytes), "
586                                "%"PRIu64" delivered "
587                                "(offset %"PRId64", %"PRIu64" failed) "
588                                "in %"PRIu64"ms: %"PRIu64" msgs/s and "
589                                "%.02f MB/s, "
590                                "%"PRIu64" produce failures, %i in queue, "
591                                "%s compression\n",
592                                cnt.msgs, cnt.bytes,
593                                cnt.msgs_dr_ok, cnt.last_offset, cnt.msgs_dr_err,
594                                t_total / 1000,
595                                ((cnt.msgs_dr_ok * 1000000) / t_total),
596                                (float)((cnt.bytes_dr_ok) / (float)t_total),
597                                cnt.tx_err,
598                                rk ? rd_kafka_outq_len(rk) : 0,
599                                compression);
600                 }
601 
602         } else {
603 
604                 if (otype & _OTYPE_TAB) {
605                         if (print_header) {
606                                 /* First time, print header */
607                                 ROW_START();
608                                 COL_HDR("elapsed");
609                                 COL_HDR("msgs");
610                                 COL_HDR("bytes");
611                                 COL_HDR("rtt");
612                                 COL_HDR("m/s");
613                                 COL_HDR("MB/s");
614                                 COL_HDR("rx_err");
615                                 COL_HDR("offset");
616                                 if (latency_mode) {
617                                         COL_HDR("lat_curr");
618                                         COL_HDR("lat_avg");
619                                         COL_HDR("lat_lo");
620                                         COL_HDR("lat_hi");
621                                 }
622                                 ROW_END();
623                         }
624 
625                         ROW_START();
626                         COL_PR64("elapsed", t_total / 1000);
627                         COL_PR64("msgs", cnt.msgs);
628                         COL_PR64("bytes", cnt.bytes);
629                         COL_PR64("rtt", cnt.avg_rtt / 1000);
630                         COL_PR64("m/s",
631                                  ((cnt.msgs * 1000000) / t_total));
632                         COL_PRF("MB/s",
633                                 (float)((cnt.bytes) / (float)t_total));
634                         COL_PR64("rx_err", cnt.msgs_dr_err);
635                         COL_PR64("offset", cnt.offset);
636                         if (latency_mode) {
637                                 COL_PRF("lat_curr", cnt.latency_last / 1000.0f);
638                                 COL_PRF("lat_avg", latency_avg / 1000.0f);
639                                 COL_PRF("lat_lo", cnt.latency_lo / 1000.0f);
640                                 COL_PRF("lat_hi", cnt.latency_hi / 1000.0f);
641                         }
642                         ROW_END();
643 
644                 }
645 
646                 if (otype & _OTYPE_SUMMARY) {
647                         if (latency_avg >= 1.0f)
648                                 extra_of += rd_snprintf(extra+extra_of,
649                                                      sizeof(extra)-extra_of,
650                                                      ", latency "
651                                                      "curr/avg/lo/hi "
652                                                      "%.2f/%.2f/%.2f/%.2fms",
653                                                      cnt.latency_last / 1000.0f,
654                                                      latency_avg  / 1000.0f,
655                                                      cnt.latency_lo / 1000.0f,
                                                     cnt.latency_hi / 1000.0f);
658                         printf("%% %"PRIu64" messages (%"PRIu64" bytes) "
659                                "consumed in %"PRIu64"ms: %"PRIu64" msgs/s "
660                                "(%.02f MB/s)"
661                                "%s\n",
662                                cnt.msgs, cnt.bytes,
663                                t_total / 1000,
664                                ((cnt.msgs * 1000000) / t_total),
665                                (float)((cnt.bytes) / (float)t_total),
666                                extra);
667                 }
668 
669                 if (incremental_mode && now > cnt.t_last) {
670                         uint64_t i_msgs = cnt.msgs - cnt.msgs_last;
671                         uint64_t i_bytes = cnt.bytes - cnt.bytes_last;
672                         uint64_t i_time = cnt.t_last ? now - cnt.t_last : 0;
673 
674                         printf("%% INTERVAL: %"PRIu64" messages "
675                                "(%"PRIu64" bytes) "
676                                "consumed in %"PRIu64"ms: %"PRIu64" msgs/s "
677                                "(%.02f MB/s)"
678                                "%s\n",
679                                i_msgs, i_bytes,
680                                i_time / 1000,
681                                ((i_msgs * 1000000) / i_time),
682                                (float)((i_bytes) / (float)i_time),
683                                extra);
684 
685                 }
686         }
687 
688 	cnt.t_last = now;
689 	cnt.msgs_last = cnt.msgs;
690 	cnt.bytes_last = cnt.bytes;
691 }
692 
693 
static void sig_usr1 (int sig) {
695 	rd_kafka_dump(stdout, global_rk);
696 }
697 
698 
699 /**
700  * @brief Read config from file
701  * @returns -1 on error, else 0.
702  */
static int read_conf_file (rd_kafka_conf_t *conf, const char *path) {
704         FILE *fp;
705         char buf[512];
706         int line = 0;
707         char errstr[512];
708 
709         if (!(fp = fopen(path, "r"))) {
710                 fprintf(stderr, "%% Failed to open %s: %s\n",
711                         path, strerror(errno));
712                 return -1;
713         }
714 
715         while (fgets(buf, sizeof(buf), fp)) {
716                 char *s = buf;
717                 char *t;
718                 rd_kafka_conf_res_t r = RD_KAFKA_CONF_UNKNOWN;
719 
720                 line++;
721 
722                 while (isspace((int)*s))
723                         s++;
724 
725                 if (!*s || *s == '#')
726                         continue;
727 
728                 if ((t = strchr(buf, '\n')))
729                         *t = '\0';
730 
731                 t = strchr(buf, '=');
732                 if (!t || t == s || !*(t+1)) {
733                         fprintf(stderr, "%% %s:%d: expected key=value\n",
734                                 path, line);
735                         fclose(fp);
736                         return -1;
737                 }
738 
739                 *(t++) = '\0';
740 
741                 /* Try global config */
742                 r = rd_kafka_conf_set(conf, s, t, errstr, sizeof(errstr));
743 
744                 if (r == RD_KAFKA_CONF_OK)
745                         continue;
746 
747                 fprintf(stderr, "%% %s:%d: %s=%s: %s\n",
748                         path, line, s, t, errstr);
749                 fclose(fp);
750                 return -1;
751         }
752 
753         fclose(fp);
754 
755         return 0;
756 }
757 
758 
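/**
 * @brief Produce a single message, using rd_kafka_producev() when headers
 *        are attached: the headers are copied since producev() takes
 *        ownership of them on success.
 */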
static rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
                                       rd_kafka_topic_t *rkt, int32_t partition,
                                       int msgflags,
                                       void *payload, size_t size,
                                       const void *key, size_t key_size,
                                       const rd_kafka_headers_t *hdrs) {
765 
766         /* Send/Produce message. */
767         if (hdrs) {
768                 rd_kafka_headers_t *hdrs_copy;
769                 rd_kafka_resp_err_t err;
770 
771                 hdrs_copy = rd_kafka_headers_copy(hdrs);
772 
773                 err = rd_kafka_producev(
774                         rk,
775                         RD_KAFKA_V_RKT(rkt),
776                         RD_KAFKA_V_PARTITION(partition),
777                         RD_KAFKA_V_MSGFLAGS(msgflags),
778                         RD_KAFKA_V_VALUE(payload, size),
779                         RD_KAFKA_V_KEY(key, key_size),
780                         RD_KAFKA_V_HEADERS(hdrs_copy),
781                         RD_KAFKA_V_END);
782 
783                 if (err)
784                         rd_kafka_headers_destroy(hdrs_copy);
785 
786                 return err;
787 
788         } else {
789                 if (rd_kafka_produce(rkt, partition, msgflags, payload, size,
790                                      key, key_size, NULL) == -1)
791                         return rd_kafka_last_error();
792         }
793 
794         return RD_KAFKA_RESP_ERR_NO_ERROR;
795 }
796 
797 /**
798  * @brief Sleep for \p sleep_us microseconds.
799  */
static void do_sleep (int sleep_us) {
801         if (sleep_us > 100) {
802 #ifdef _WIN32
803                 Sleep(sleep_us / 1000);
804 #else
805                 usleep(sleep_us);
806 #endif
807         } else {
808                 rd_ts_t next = rd_clock() + (rd_ts_t)sleep_us;
809                 while (next > rd_clock())
810                         ;
811         }
812 }
813 
814 
int main (int argc, char **argv) {
816 	char *brokers = NULL;
817 	char mode = 'C';
818 	char *topic = NULL;
819 	const char *key = NULL;
820         int *partitions = NULL;
821 	int opt;
822 	int sendflags = 0;
823 	char *msgpattern = "librdkafka_performance testing!";
824 	int msgsize = -1;
825 	const char *debug = NULL;
826 	int do_conf_dump = 0;
827 	rd_ts_t now;
828 	char errstr[512];
829 	uint64_t seq = 0;
830 	int seed = (int)time(NULL);
831         rd_kafka_t *rk;
832 	rd_kafka_topic_t *rkt;
833 	rd_kafka_conf_t *conf;
834 	rd_kafka_queue_t *rkqu = NULL;
835 	const char *compression = "no";
836 	int64_t start_offset = 0;
837 	int batch_size = 0;
838 	int idle = 0;
839         const char *stats_cmd = NULL;
840         char *stats_intvlstr = NULL;
841         char tmp[128];
842         char *tmp2;
843         int otype = _OTYPE_SUMMARY;
844         double dtmp;
845         int rate_sleep = 0;
846 	rd_kafka_topic_partition_list_t *topics;
847         int exitcode = 0;
848         rd_kafka_headers_t *hdrs = NULL;
849         rd_kafka_resp_err_t err;
850 
851 	/* Kafka configuration */
852 	conf = rd_kafka_conf_new();
853 	rd_kafka_conf_set_error_cb(conf, err_cb);
854 	rd_kafka_conf_set_throttle_cb(conf, throttle_cb);
855         rd_kafka_conf_set_offset_commit_cb(conf, offset_commit_cb);
856 
857 #ifdef SIGIO
858         /* Quick termination */
859 	rd_snprintf(tmp, sizeof(tmp), "%i", SIGIO);
860 	rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
861 #endif
862 
863         /* Producer config */
864         rd_kafka_conf_set(conf, "linger.ms", "1000", NULL, 0);
865         rd_kafka_conf_set(conf, "message.send.max.retries", "3", NULL, 0);
866         rd_kafka_conf_set(conf, "retry.backoff.ms", "500", NULL, 0);
867 
868 	/* Consumer config */
869 	/* Tell rdkafka to (try to) maintain 1M messages
870 	 * in its internal receive buffers. This is to avoid
871 	 * application -> rdkafka -> broker  per-message ping-pong
872 	 * latency.
873 	 * The larger the local queue, the higher the performance.
874 	 * Try other values with: ... -X queued.min.messages=1000
875 	 */
876 	rd_kafka_conf_set(conf, "queued.min.messages", "1000000", NULL, 0);
877 	rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0);
878         rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);
879 
880 	topics = rd_kafka_topic_partition_list_new(1);
881 
882 	while ((opt =
883 		getopt(argc, argv,
884 		       "PCG:t:p:b:s:k:c:fi:MDd:m:S:x:"
885                        "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:")) != -1) {
886 		switch (opt) {
887 		case 'G':
888 			if (rd_kafka_conf_set(conf, "group.id", optarg,
889 					      errstr, sizeof(errstr)) !=
890 			    RD_KAFKA_CONF_OK) {
891 				fprintf(stderr, "%% %s\n", errstr);
892 				exit(1);
893 			}
894 			/* FALLTHRU */
895 		case 'P':
896 		case 'C':
897 			mode = opt;
898 			break;
899 		case 't':
900 			rd_kafka_topic_partition_list_add(topics, optarg,
901 							  RD_KAFKA_PARTITION_UA);
902 			break;
903 		case 'p':
904                         partition_cnt++;
905 			partitions = realloc(partitions, sizeof(*partitions) * partition_cnt);
906 			partitions[partition_cnt-1] = atoi(optarg);
907 			break;
908 
909 		case 'b':
910 			brokers = optarg;
911 			break;
912 		case 's':
913 			msgsize = atoi(optarg);
914 			break;
915 		case 'k':
916 			key = optarg;
917 			break;
918 		case 'c':
919 			msgcnt = atoi(optarg);
920 			break;
921 		case 'D':
922 			sendflags |= RD_KAFKA_MSG_F_FREE;
923 			break;
924 		case 'i':
925 			dispintvl = atoi(optarg);
926 			break;
927 		case 'm':
928 			msgpattern = optarg;
929 			break;
930 		case 'S':
931 			seq = strtoull(optarg, NULL, 10);
932 			do_seq = 1;
933 			break;
934 		case 'x':
935 			exit_after = atoi(optarg);
936 			break;
937 		case 'R':
938 			seed = atoi(optarg);
939 			break;
940 		case 'a':
941 			if (rd_kafka_conf_set(conf,
942                                               "acks",
943                                               optarg,
944                                               errstr, sizeof(errstr)) !=
945 			    RD_KAFKA_CONF_OK) {
946 				fprintf(stderr, "%% %s\n", errstr);
947 				exit(1);
948 			}
949 			break;
950 		case 'B':
951 			batch_size = atoi(optarg);
952 			break;
953 		case 'z':
954 			if (rd_kafka_conf_set(conf, "compression.codec",
955 					      optarg,
956 					      errstr, sizeof(errstr)) !=
957 			    RD_KAFKA_CONF_OK) {
958 				fprintf(stderr, "%% %s\n", errstr);
959 				exit(1);
960 			}
961 			compression = optarg;
962 			break;
963 		case 'o':
964 			if (!strcmp(optarg, "end"))
965 				start_offset = RD_KAFKA_OFFSET_END;
966 			else if (!strcmp(optarg, "beginning"))
967 				start_offset = RD_KAFKA_OFFSET_BEGINNING;
968 			else if (!strcmp(optarg, "stored"))
969 				start_offset = RD_KAFKA_OFFSET_STORED;
970 			else {
971 				start_offset = strtoll(optarg, NULL, 10);
972 
973 				if (start_offset < 0)
974 					start_offset = RD_KAFKA_OFFSET_TAIL(-start_offset);
975 			}
976 
977 			break;
978 		case 'e':
979 			exit_eof = 1;
980 			break;
981 		case 'd':
982 			debug = optarg;
983 			break;
984         case 'H':
985             if (!strcmp(optarg, "parse"))
986                 read_hdrs = 1;
987             else {
988                 char *name, *val;
989                 size_t name_sz = -1;
990 
991                 name = optarg;
992                 val = strchr(name, '=');
993                 if (val) {
994                         name_sz = (size_t)(val-name);
995                         val++; /* past the '=' */
996                 }
997 
998                 if (!hdrs)
999                         hdrs = rd_kafka_headers_new(8);
1000 
1001                 err = rd_kafka_header_add(hdrs, name, name_sz, val, -1);
1002                 if (err) {
1003                         fprintf(stderr,
1004                                 "%% Failed to add header %s: %s\n",
1005                                 name, rd_kafka_err2str(err));
1006                         exit(1);
1007                 }
1008             }
1009             break;
1010 		case 'X':
1011 		{
1012 			char *name, *val;
1013 			rd_kafka_conf_res_t res;
1014 
1015 			if (!strcmp(optarg, "list") ||
1016 			    !strcmp(optarg, "help")) {
1017 				rd_kafka_conf_properties_show(stdout);
1018 				exit(0);
1019 			}
1020 
1021 			if (!strcmp(optarg, "dump")) {
1022 				do_conf_dump = 1;
1023 				continue;
1024 			}
1025 
1026 			name = optarg;
1027 			if (!(val = strchr(name, '='))) {
1028 				fprintf(stderr, "%% Expected "
1029 					"-X property=value, not %s\n", name);
1030 				exit(1);
1031 			}
1032 
1033 			*val = '\0';
1034 			val++;
1035 
1036                         if (!strcmp(name, "file")) {
1037                                 if (read_conf_file(conf, val) == -1)
1038                                         exit(1);
1039                                 break;
1040                         }
1041 
1042                         res = rd_kafka_conf_set(conf, name, val,
1043                                                 errstr, sizeof(errstr));
1044 
1045 			if (res != RD_KAFKA_CONF_OK) {
1046 				fprintf(stderr, "%% %s\n", errstr);
1047 				exit(1);
1048 			}
1049 		}
1050 		break;
1051 
1052 		case 'T':
1053                         stats_intvlstr = optarg;
1054 			break;
1055                 case 'Y':
1056                         stats_cmd = optarg;
1057                         break;
1058 
1059 		case 'q':
1060                         verbosity--;
1061 			break;
1062 
1063 		case 'v':
1064                         verbosity++;
1065 			break;
1066 
1067 		case 'I':
1068 			idle = 1;
1069 			break;
1070 
1071                 case 'u':
1072                         otype = _OTYPE_TAB;
1073                         verbosity--; /* remove some fluff */
1074                         break;
1075 
1076                 case 'r':
1077                         dtmp = strtod(optarg, &tmp2);
1078                         if (tmp2 == optarg ||
1079                             (dtmp >= -0.001 && dtmp <= 0.001)) {
1080                                 fprintf(stderr, "%% Invalid rate: %s\n",
1081                                         optarg);
1082                                 exit(1);
1083                         }
1084 
1085                         rate_sleep = (int)(1000000.0 / dtmp);
1086                         break;
1087 
1088                 case 'l':
1089                         latency_mode = 1;
1090 			break;
1091 
1092 		case 'A':
1093 			if (!(latency_fp = fopen(optarg, "w"))) {
1094 				fprintf(stderr,
					"%% Can't open %s: %s\n",
1096 					optarg, strerror(errno));
1097 				exit(1);
1098 			}
1099                         break;
1100 
1101 		case 'M':
1102 			incremental_mode = 1;
1103 			break;
1104 
1105 		case 'N':
1106 			with_dr = 0;
1107 			break;
1108 
1109 		default:
1110                         fprintf(stderr, "Unknown option: %c\n", opt);
1111 			goto usage;
1112 		}
1113 	}
1114 
1115 	if (topics->cnt == 0 || optind != argc) {
1116                 if (optind < argc)
1117                         fprintf(stderr, "Unknown argument: %s\n", argv[optind]);
1118 	usage:
1119 		fprintf(stderr,
1120 			"Usage: %s [-C|-P] -t <topic> "
1121 			"[-p <partition>] [-b <broker,broker..>] [options..]\n"
1122 			"\n"
1123 			"librdkafka version %s (0x%08x)\n"
1124 			"\n"
1125 			" Options:\n"
1126 			"  -C | -P |    Consumer or Producer mode\n"
1127 			"  -G <groupid> High-level Kafka Consumer mode\n"
1128 			"  -t <topic>   Topic to consume / produce\n"
1129 			"  -p <num>     Partition (defaults to random). "
1130 			"Multiple partitions are allowed in -C consumer mode.\n"
1131 			"  -M           Print consumer interval stats\n"
1132 			"  -b <brokers> Broker address list (host[:port],..)\n"
1133 			"  -s <size>    Message size (producer)\n"
1134 			"  -k <key>     Message key (producer)\n"
1135             "  -H <name[=value]> Add header to message (producer)\n"
1136             "  -H parse     Read message headers (consumer)\n"
1137 			"  -c <cnt>     Messages to transmit/receive\n"
1138 			"  -x <cnt>     Hard exit after transmitting <cnt> messages (producer)\n"
1139 			"  -D           Copy/Duplicate data buffer (producer)\n"
1140 			"  -i <ms>      Display interval\n"
1141 			"  -m <msg>     Message payload pattern\n"
1142 			"  -S <start>   Send a sequence number starting at "
1143 			"<start> as payload\n"
1144 			"  -R <seed>    Random seed value (defaults to time)\n"
1145 			"  -a <acks>    Required acks (producer): "
1146 			"-1, 0, 1, >1\n"
1147 			"  -B <size>    Consume batch size (# of msgs)\n"
1148 			"  -z <codec>   Enable compression:\n"
1149 			"               none|gzip|snappy\n"
1150 			"  -o <offset>  Start offset (consumer)\n"
			"               beginning, end, stored, NNNNN or -NNNNN\n"
1152 			"  -d [facs..]  Enable debugging contexts:\n"
1153 			"               %s\n"
1154 			"  -X <prop=name> Set arbitrary librdkafka "
1155 			"configuration property\n"
1156                         "  -X file=<path> Read config from file.\n"
1157                         "  -X list      Show full list of supported properties.\n"
1158                         "  -X dump      Show configuration\n"
1159 			"  -T <intvl>   Enable statistics from librdkafka at "
1160 			"specified interval (ms)\n"
1161                         "  -Y <command> Pipe statistics to <command>\n"
			"  -I           Idle: don't produce any messages\n"
1163 			"  -q           Decrease verbosity\n"
1164                         "  -v           Increase verbosity (default 1)\n"
1165                         "  -u           Output stats in table format\n"
1166                         "  -r <rate>    Producer msg/s limit\n"
1167                         "  -l           Latency measurement.\n"
1168                         "               Needs two matching instances, one\n"
1169                         "               consumer and one producer, both\n"
1170                         "               running with the -l switch.\n"
1171                         "  -l           Producer: per-message latency stats\n"
1172 			"  -A <file>    Write per-message latency stats to "
1173 			"<file>. Requires -l\n"
1174                         "  -O           Report produced offset (producer)\n"
1175 			"  -N           No delivery reports (producer)\n"
1176 			"\n"
1177 			" In Consumer mode:\n"
			"  consumes messages and prints throughput\n"
1179 			"  If -B <..> is supplied the batch consumer\n"
1180 			"  mode is used, else the callback mode is used.\n"
1181 			"\n"
1182 			" In Producer mode:\n"
			"  writes messages of size -s <..> and prints throughput\n"
1184 			"\n",
1185 			argv[0],
1186 			rd_kafka_version_str(), rd_kafka_version(),
1187 			RD_KAFKA_DEBUG_CONTEXTS);
1188 		exit(1);
1189 	}
1190 
1191 
1192 	dispintvl *= 1000; /* us */
1193 
1194         if (verbosity > 1)
1195                 printf("%% Using random seed %i, verbosity level %i\n",
1196                        seed, verbosity);
1197 	srand(seed);
1198 	signal(SIGINT, stop);
1199 #ifdef SIGUSR1
1200 	signal(SIGUSR1, sig_usr1);
1201 #endif
1202 
1203 
1204 	if (debug &&
1205 	    rd_kafka_conf_set(conf, "debug", debug, errstr, sizeof(errstr)) !=
1206 	    RD_KAFKA_CONF_OK) {
1207 		printf("%% Debug configuration failed: %s: %s\n",
1208 		       errstr, debug);
1209 		exit(1);
1210 	}
1211 
        /* Always enable stats (for RTT extraction), and if the user supplied
         * the -T <intvl> option we emit the full stats to them as well. */
1214         rd_kafka_conf_set_stats_cb(conf, stats_cb);
1215 
1216         if (!stats_intvlstr) {
1217                 /* if no user-desired stats, adjust stats interval
1218                  * to the display interval. */
1219                 rd_snprintf(tmp, sizeof(tmp), "%"PRId64, dispintvl / 1000);
1220         }
1221 
1222         if (rd_kafka_conf_set(conf, "statistics.interval.ms",
1223                               stats_intvlstr ? stats_intvlstr : tmp,
1224                               errstr, sizeof(errstr)) !=
1225             RD_KAFKA_CONF_OK) {
1226                 fprintf(stderr, "%% %s\n", errstr);
1227                 exit(1);
1228         }
1229 
1230         if (do_conf_dump) {
1231                 const char **arr;
1232                 size_t cnt;
1233                 int pass;
1234 
1235                 for (pass = 0 ; pass < 2 ; pass++) {
1236                         int i;
1237 
1238                         if (pass == 0) {
1239                                 arr = rd_kafka_conf_dump(conf, &cnt);
1240                                 printf("# Global config\n");
1241                         } else {
1242                                 rd_kafka_topic_conf_t *topic_conf =
1243                                         rd_kafka_conf_get_default_topic_conf(
1244                                                 conf);
1245 
1246                                 if (topic_conf) {
1247                                         printf("# Topic config\n");
1248                                         arr = rd_kafka_topic_conf_dump(
1249                                                 topic_conf, &cnt);
1250                                 } else {
1251                                         arr = NULL;
1252                                 }
1253                         }
1254 
1255                         if (!arr)
1256                                 continue;
1257 
1258                         for (i = 0 ; i < (int)cnt ; i += 2)
1259                                 printf("%s = %s\n",
1260                                        arr[i], arr[i+1]);
1261 
1262                         printf("\n");
1263 
1264                         rd_kafka_conf_dump_free(arr, cnt);
1265                 }
1266 
1267                 exit(0);
1268         }
1269 
1270         if (latency_mode)
1271                 do_seq = 0;
1272 
1273         if (stats_intvlstr) {
1274                 /* User enabled stats (-T) */
1275 
1276 #ifndef _WIN32
1277                 if (stats_cmd) {
1278                         if (!(stats_fp = popen(stats_cmd,
1279 #ifdef __linux__
1280                                                "we"
1281 #else
1282                                                "w"
1283 #endif
1284                                                ))) {
1285                                 fprintf(stderr,
1286                                         "%% Failed to start stats command: "
1287                                         "%s: %s", stats_cmd, strerror(errno));
1288                                 exit(1);
1289                         }
1290                 } else
1291 #endif
1292                         stats_fp = stdout;
1293         }
1294 
1295 	if (msgcnt != -1)
1296 		forever = 0;
1297 
1298 	if (msgsize == -1)
1299 		msgsize = (int)strlen(msgpattern);
1300 
1301 	topic = topics->elems[0].topic;
1302 
1303         if (mode == 'C' || mode == 'G')
1304                 rd_kafka_conf_set(conf, "enable.partition.eof", "true",
1305                                   NULL, 0);
1306 
1307         if (read_hdrs && mode == 'P') {
                fprintf(stderr, "%% producer cannot read headers\n");
1309                 exit(1);
1310         }
1311 
1312         if (hdrs && mode != 'P') {
                fprintf(stderr, "%% consumer cannot add headers\n");
1314                 exit(1);
1315         }
1316 
1317         /* Set bootstrap servers */
1318         if (brokers &&
1319             rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
1320                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
1321                 fprintf(stderr, "%% %s\n", errstr);
1322                 exit(1);
1323         }
1324 
1325 	if (mode == 'P') {
1326 		/*
1327 		 * Producer
1328 		 */
1329 		char *sbuf;
1330 		char *pbuf;
1331 		int outq;
1332 		int keylen = key ? (int)strlen(key) : 0;
1333 		off_t rof = 0;
1334 		size_t plen = strlen(msgpattern);
1335 		int partition = partitions ? partitions[0] :
1336 			RD_KAFKA_PARTITION_UA;
1337 
1338                 if (latency_mode) {
1339                         int minlen = (int)(strlen("LATENCY:") +
1340                                            strlen("18446744073709551615 ")+1);
1341                         msgsize = RD_MAX(minlen, msgsize);
1342                         sendflags |= RD_KAFKA_MSG_F_COPY;
1343 		} else if (do_seq) {
1344                         int minlen = (int)strlen("18446744073709551615 ")+1;
1345                         if (msgsize < minlen)
1346                                 msgsize = minlen;
1347 
1348 			/* Force duplication of payload */
1349                         sendflags |= RD_KAFKA_MSG_F_FREE;
1350 		}
1351 
1352 		sbuf = malloc(msgsize);
1353 
1354 		/* Copy payload content to new buffer */
1355 		while (rof < msgsize) {
1356 			size_t xlen = RD_MIN((size_t)msgsize-rof, plen);
1357 			memcpy(sbuf+rof, msgpattern, xlen);
1358 			rof += (off_t)xlen;
1359 		}
1360 
1361 		if (msgcnt == -1)
1362 			printf("%% Sending messages of size %i bytes\n",
1363 			       msgsize);
1364 		else
1365 			printf("%% Sending %i messages of size %i bytes\n",
1366 			       msgcnt, msgsize);
1367 
1368 		if (with_dr)
1369 			rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered);
1370 
1371 		/* Create Kafka handle */
1372 		if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
1373  					errstr, sizeof(errstr)))) {
1374 			fprintf(stderr,
1375 				"%% Failed to create Kafka producer: %s\n",
1376 				errstr);
1377 			exit(1);
1378 		}
1379 
1380                 global_rk = rk;
1381 
1382 		/* Explicitly create topic to avoid per-msg lookups. */
1383 		rkt = rd_kafka_topic_new(rk, topic, NULL);
1384 
1385 
1386                 if (rate_sleep && verbosity >= 2)
1387                         fprintf(stderr,
1388                                 "%% Inter message rate limiter sleep %ius\n",
1389                                 rate_sleep);
1390 
1391                 dr_disp_div = msgcnt / 50;
1392                 if (dr_disp_div == 0)
1393                         dr_disp_div = 10;
1394 
1395 		cnt.t_start = cnt.t_last = rd_clock();
1396 
1397 		msgs_wait_produce_cnt = msgcnt;
1398 
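		/* Produce loop: build the payload, produce with retry on
		 * queue-full backpressure, and poll to serve delivery reports. */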
1399 		while (run && (msgcnt == -1 || (int)cnt.msgs < msgcnt)) {
1400 			/* Send/Produce message. */
1401 
1402 			if (idle) {
1403 				rd_kafka_poll(rk, 1000);
1404 				continue;
1405 			}
1406 
1407                         if (latency_mode) {
1408                                 rd_snprintf(sbuf, msgsize-1,
1409                                          "LATENCY:%"PRIu64,  wall_clock());
1410                         } else if (do_seq) {
1411                                 rd_snprintf(sbuf,
1412                                          msgsize-1, "%"PRIu64": ", seq);
1413                                 seq++;
1414 			}
1415 
1416 			if (sendflags & RD_KAFKA_MSG_F_FREE) {
1417 				/* Duplicate memory */
1418 				pbuf = malloc(msgsize);
1419 				memcpy(pbuf, sbuf, msgsize);
1420 			} else
1421 				pbuf = sbuf;
1422 
1423                         if (msgsize == 0)
1424                                 pbuf = NULL;
1425 
1426 			cnt.tx++;
1427 			while (run &&
1428                                (err = do_produce(rk, rkt, partition, sendflags,
1429                                                  pbuf, msgsize,
1430                                                  key, keylen, hdrs))) {
1431 				if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
1432 					printf("%% No such partition: "
1433 						   "%"PRId32"\n", partition);
1434 				else if (verbosity >= 3 ||
1435 					(err != RD_KAFKA_RESP_ERR__QUEUE_FULL && verbosity >= 1))
1436 					printf("%% produce error: %s%s\n",
1437 						   rd_kafka_err2str(err),
1438 						   err == RD_KAFKA_RESP_ERR__QUEUE_FULL ?
1439 						   " (backpressure)" : "");
1440 
1441 				cnt.tx_err++;
1442 				if (err != RD_KAFKA_RESP_ERR__QUEUE_FULL) {
1443 					run = 0;
1444 					break;
1445 				}
1446 				now = rd_clock();
1447 				if (verbosity >= 2 &&
1448                                     cnt.t_enobufs_last + dispintvl <= now) {
1449 					printf("%% Backpressure %i "
1450 					       "(tx %"PRIu64", "
1451 					       "txerr %"PRIu64")\n",
1452 					       rd_kafka_outq_len(rk),
1453 					       cnt.tx, cnt.tx_err);
1454 					cnt.t_enobufs_last = now;
1455 				}
1456 
1457 				/* Poll to handle delivery reports */
1458 				rd_kafka_poll(rk, 10);
1459 
1460                                 print_stats(rk, mode, otype, compression);
1461 			}
1462 
1463 			msgs_wait_cnt++;
1464 			if (msgs_wait_produce_cnt != -1)
1465 				msgs_wait_produce_cnt--;
1466 			cnt.msgs++;
1467 			cnt.bytes += msgsize;
1468 
1469 			/* Must poll to handle delivery reports */
1470 			if (rate_sleep) {
1471 				rd_ts_t next = rd_clock() + (rd_ts_t) rate_sleep;
1472 				do {
1473 					rd_kafka_poll(rk,
1474 						      (int)RD_MAX(0,
1475 						      (next - rd_clock()) / 1000));
1476 				} while (next > rd_clock());
1477 			} else {
1478 				rd_kafka_poll(rk, 0);
1479 			}
1480 
1481 			print_stats(rk, mode, otype, compression);
1482 		}
1483 
1484 		forever = 0;
1485                 if (verbosity >= 2)
1486                         printf("%% All messages produced, "
1487                                "now waiting for %li deliveries\n",
1488                                msgs_wait_cnt);
1489 
1490 		/* Wait for messages to be delivered */
1491                 while (run && rd_kafka_poll(rk, 1000) != -1)
1492 			print_stats(rk, mode, otype, compression);
1493 
1494 
1495 		outq = rd_kafka_outq_len(rk);
1496                 if (verbosity >= 2)
1497                         printf("%% %i messages in outq\n", outq);
1498 		cnt.msgs -= outq;
1499 		cnt.t_end = t_end;
1500 
1501 		if (cnt.tx_err > 0)
1502 			printf("%% %"PRIu64" backpressures for %"PRIu64
1503 			       " produce calls: %.3f%% backpressure rate\n",
1504 			       cnt.tx_err, cnt.tx,
1505 			       ((double)cnt.tx_err / (double)cnt.tx) * 100.0);
1506 
1507 		/* Destroy topic */
1508 		rd_kafka_topic_destroy(rkt);
1509 
1510 		/* Destroy the handle */
1511 		rd_kafka_destroy(rk);
1512                 global_rk = rk = NULL;
1513 
1514 		free(sbuf);
1515 
1516                 exitcode = cnt.msgs == cnt.msgs_dr_ok ? 0 : 1;
1517 
1518 	} else if (mode == 'C') {
1519 		/*
1520 		 * Consumer
1521 		 */
1522 
1523 		rd_kafka_message_t **rkmessages = NULL;
1524 		size_t i = 0;
1525 
1526 		/* Create Kafka handle */
1527 		if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
1528 					errstr, sizeof(errstr)))) {
1529 			fprintf(stderr,
1530 				"%% Failed to create Kafka consumer: %s\n",
1531 				errstr);
1532 			exit(1);
1533 		}
1534 
1535                 global_rk = rk;
1536 
1537 		/* Create topic to consume from */
1538 		rkt = rd_kafka_topic_new(rk, topic, NULL);
1539 
1540 		/* Batch consumer */
1541 		if (batch_size)
1542 			rkmessages = malloc(sizeof(*rkmessages) * batch_size);
1543 
1544 		/* Start consuming */
1545 		rkqu = rd_kafka_queue_new(rk);
1546 		for (i=0 ; i<(size_t)partition_cnt ; ++i) {
1547 			const int r = rd_kafka_consume_start_queue(rkt,
1548 				partitions[i], start_offset, rkqu);
1549 
1550 			if (r == -1) {
1551                                 fprintf(stderr, "%% Error creating queue: %s\n",
1552                                         rd_kafka_err2str(rd_kafka_last_error()));
1553 				exit(1);
1554 			}
1555 		}
1556 
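		/* Consume loop: either batch-fetch from the first specified
		 * partition or run the callback-based queue consumer for all
		 * started partitions. */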
1557 		while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) {
1558 			/* Consume messages.
1559 			 * A message may either be a real message, or
1560 			 * an error signaling (if rkmessage->err is set).
1561 			 */
1562 			uint64_t fetch_latency;
1563 			ssize_t r;
1564 
1565 			fetch_latency = rd_clock();
1566 
1567 			if (batch_size) {
1568 				int partition = partitions ? partitions[0] :
1569 				    RD_KAFKA_PARTITION_UA;
1570 
1571 				/* Batch fetch mode */
1572 				r = rd_kafka_consume_batch(rkt, partition,
1573 							   1000,
1574 							   rkmessages,
1575 							   batch_size);
1576 				if (r != -1) {
1577 					for (i = 0 ; (ssize_t)i < r ; i++) {
1578 						msg_consume(rkmessages[i],
1579 							NULL);
1580 						rd_kafka_message_destroy(
1581 							rkmessages[i]);
1582 					}
1583 				}
1584 			} else {
1585 				/* Queue mode */
1586 				r = rd_kafka_consume_callback_queue(rkqu, 1000,
1587 							msg_consume,
1588 							NULL);
1589 			}
1590 
1591 			cnt.t_fetch_latency += rd_clock() - fetch_latency;
1592                         if (r == -1)
1593                                 fprintf(stderr, "%% Error: %s\n",
1594                                         rd_kafka_err2str(rd_kafka_last_error()));
1595                         else if (r > 0 && rate_sleep) {
1596                                 /* Simulate processing time
1597                                  * if `-r <rate>` was set. */
1598                                 do_sleep(rate_sleep);
1599                         }
1600 
1601 
1602 			print_stats(rk, mode, otype, compression);
1603 
1604 			/* Poll to handle stats callbacks */
1605 			rd_kafka_poll(rk, 0);
1606 		}
1607 		cnt.t_end = rd_clock();
1608 
1609 		/* Stop consuming */
1610 		for (i=0 ; i<(size_t)partition_cnt ; ++i) {
1611 			int r = rd_kafka_consume_stop(rkt, (int32_t)i);
1612 			if (r == -1) {
1613                                 fprintf(stderr,
1614                                         "%% Error in consume_stop: %s\n",
1615                                         rd_kafka_err2str(rd_kafka_last_error()));
1616 			}
1617 		}
1618 		rd_kafka_queue_destroy(rkqu);
1619 
1620 		/* Destroy topic */
1621 		rd_kafka_topic_destroy(rkt);
1622 
1623 		if (batch_size)
1624 			free(rkmessages);
1625 
1626 		/* Destroy the handle */
1627 		rd_kafka_destroy(rk);
1628 
1629                 global_rk = rk = NULL;
1630 
1631 	} else if (mode == 'G') {
1632 		/*
1633 		 * High-level balanced Consumer
1634 		 */
1635 
1636 		rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
1637 
1638 		/* Create Kafka handle */
1639 		if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
1640 					errstr, sizeof(errstr)))) {
1641 			fprintf(stderr,
1642 				"%% Failed to create Kafka consumer: %s\n",
1643 				errstr);
1644 			exit(1);
1645 		}
1646 
1647 		/* Forward all events to consumer queue */
1648 		rd_kafka_poll_set_consumer(rk);
1649 
1650                 global_rk = rk;
1651 
1652 		err = rd_kafka_subscribe(rk, topics);
1653 		if (err) {
1654 			fprintf(stderr, "%% Subscribe failed: %s\n",
1655 				rd_kafka_err2str(err));
1656 			exit(1);
1657 		}
1658 		fprintf(stderr, "%% Waiting for group rebalance..\n");
1659 
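		/* Poll loop: rd_kafka_consumer_poll() serves messages, errors
		 * and rebalance events from the consumer queue. */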
1660 		while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) {
1661 			/* Consume messages.
1662 			 * A message may either be a real message, or
1663 			 * an event (if rkmessage->err is set).
1664 			 */
1665 			rd_kafka_message_t *rkmessage;
1666 			uint64_t fetch_latency;
1667 
1668 			fetch_latency = rd_clock();
1669 
1670 			rkmessage = rd_kafka_consumer_poll(rk, 1000);
1671 			if (rkmessage) {
1672 				msg_consume(rkmessage, NULL);
1673 				rd_kafka_message_destroy(rkmessage);
1674 
1675                                 /* Simulate processing time
1676                                  * if `-r <rate>` was set. */
1677                                 if (rate_sleep)
1678                                         do_sleep(rate_sleep);
1679 			}
1680 
1681 			cnt.t_fetch_latency += rd_clock() - fetch_latency;
1682 
1683 			print_stats(rk, mode, otype, compression);
1684 		}
1685 		cnt.t_end = rd_clock();
1686 
1687 		err = rd_kafka_consumer_close(rk);
1688 		if (err)
1689 			fprintf(stderr, "%% Failed to close consumer: %s\n",
1690 				rd_kafka_err2str(err));
1691 
1692 		rd_kafka_destroy(rk);
1693 	}
1694 
1695         if (hdrs)
1696                 rd_kafka_headers_destroy(hdrs);
1697 
1698 	print_stats(NULL, mode, otype|_OTYPE_FORCE, compression);
1699 
1700 	if (cnt.t_fetch_latency && cnt.msgs)
1701 		printf("%% Average application fetch latency: %"PRIu64"us\n",
1702 		       cnt.t_fetch_latency / cnt.msgs);
1703 
1704 	if (latency_fp)
1705 		fclose(latency_fp);
1706 
1707         if (stats_fp) {
1708 #ifndef _WIN32
1709                 pclose(stats_fp);
1710 #endif
1711                 stats_fp = NULL;
1712         }
1713 
1714         if (partitions)
1715                 free(partitions);
1716 
1717 	rd_kafka_topic_partition_list_destroy(topics);
1718 
1719 	/* Let background threads clean up and terminate cleanly. */
1720 	rd_kafka_wait_destroyed(2000);
1721 
1722 	return exitcode;
1723 }
1724