1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2016, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
32 * is built from within the librdkafka source tree and thus differs. */
33 #include "rdkafka.h" /* for Kafka driver */
34
35
/**
 * Basic performance tests.
 * These tests don't fail but provide a throughput rate indication.
 *
 * + Produce N messages to one partition, acks=1, size=100
 */
42
43
main_0038_performance(int argc,char ** argv)44 int main_0038_performance (int argc, char **argv) {
45 const char *topic = test_mk_topic_name(__FUNCTION__, 1);
46 const int partition = 0;
47 const int msgsize = 100;
48 uint64_t testid;
49 rd_kafka_conf_t *conf;
50 rd_kafka_t *rk;
51 rd_kafka_topic_t *rkt;
52 test_timing_t t_create, t_produce, t_consume;
53 int totsize = 1024 * 1024 * (test_quick ? 8 : 128);
54 int msgcnt;
55
56 if (!strcmp(test_mode, "valgrind") || !strcmp(test_mode, "helgrind") ||
57 !strcmp(test_mode, "drd"))
58 totsize = 1024*1024*8; /* 8 meg, valgrind is slow. */
59
60 msgcnt = totsize / msgsize;
61
62 TEST_SAY("Producing %d messages of size %d to %s [%d]\n",
63 msgcnt, (int)msgsize, topic, partition);
64 testid = test_id_generate();
65 test_conf_init(&conf, NULL, 120);
66 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
67 test_conf_set(conf, "queue.buffering.max.messages", "10000000");
68 test_conf_set(conf, "linger.ms", "100");
69 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
70 rkt = test_create_producer_topic(rk, topic, "acks", "1", NULL);
71
72 /* First produce one message to create the topic, etc, this might take
73 * a while and we dont want this to affect the throughput timing. */
74 TIMING_START(&t_create, "CREATE TOPIC");
75 test_produce_msgs(rk, rkt, testid, partition, 0, 1, NULL, msgsize);
76 TIMING_STOP(&t_create);
77
78 TIMING_START(&t_produce, "PRODUCE");
79 test_produce_msgs(rk, rkt, testid, partition, 1, msgcnt-1, NULL, msgsize);
80 TIMING_STOP(&t_produce);
81
82 TEST_SAY("Destroying producer\n");
83 rd_kafka_topic_destroy(rkt);
84 rd_kafka_destroy(rk);
85
86 TEST_SAY("Creating consumer\n");
87 test_conf_init(&conf, NULL, 120);
88 rk = test_create_consumer(NULL, NULL, conf, NULL);
89 rkt = rd_kafka_topic_new(rk, topic, NULL);
90
91 test_consumer_start("CONSUME", rkt, partition,
92 RD_KAFKA_OFFSET_BEGINNING);
93 TIMING_START(&t_consume, "CONSUME");
94 test_consume_msgs("CONSUME", rkt, testid, partition, TEST_NO_SEEK,
95 0, msgcnt, 1);
96 TIMING_STOP(&t_consume);
97 test_consumer_stop("CONSUME", rkt, partition);
98
99 rd_kafka_topic_destroy(rkt);
100 rd_kafka_destroy(rk);
101
102 TEST_REPORT("{ \"producer\": "
103 " { \"mb_per_sec\": %.2f, \"records_per_sec\": %.2f },"
104 " \"consumer\": "
105 "{ \"mb_per_sec\": %.2f, \"records_per_sec\": %.2f } "
106 "}",
107 (double)
108 (totsize/((double)TIMING_DURATION(&t_produce)/1000000.0f)) /
109 1000000.0f,
110 (float)
111 (msgcnt/((double)TIMING_DURATION(&t_produce)/1000000.0f)),
112 (double)
113 (totsize/((double)TIMING_DURATION(&t_consume)/1000000.0f)) /
114 1000000.0f,
115 (float)
116 (msgcnt/((double)TIMING_DURATION(&t_consume)/1000000.0f)));
117 return 0;
118 }
119