1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2016, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
32  * is built from within the librdkafka source tree and thus differs. */
33 #include "rdkafka.h"  /* for Kafka driver */
34 
35 
36 /**
37  * Basic performance tests.
38  * These tests dont fail but provide a throughput rate indication.
39  *
40  * + Produce N messages to one partition, acks=1, size=100
41  */
42 
43 
main_0038_performance(int argc,char ** argv)44 int main_0038_performance (int argc, char **argv) {
45 	const char *topic = test_mk_topic_name(__FUNCTION__, 1);
46 	const int partition = 0;
47 	const int msgsize = 100;
48 	uint64_t testid;
49 	rd_kafka_conf_t *conf;
50 	rd_kafka_t *rk;
51 	rd_kafka_topic_t *rkt;
52 	test_timing_t t_create, t_produce, t_consume;
53 	int totsize = 1024 * 1024 * (test_quick ? 8 : 128);
54 	int msgcnt;
55 
56 	if (!strcmp(test_mode, "valgrind") || !strcmp(test_mode, "helgrind") ||
57 	    !strcmp(test_mode, "drd"))
58 		totsize = 1024*1024*8; /* 8 meg, valgrind is slow. */
59 
60 	msgcnt = totsize / msgsize;
61 
62 	TEST_SAY("Producing %d messages of size %d to %s [%d]\n",
63 		 msgcnt, (int)msgsize, topic, partition);
64 	testid = test_id_generate();
65 	test_conf_init(&conf, NULL, 120);
66 	rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
67 	test_conf_set(conf, "queue.buffering.max.messages", "10000000");
68         test_conf_set(conf, "linger.ms", "100");
69 	rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
70 	rkt = test_create_producer_topic(rk, topic, "acks", "1", NULL);
71 
72 	/* First produce one message to create the topic, etc, this might take
73 	 * a while and we dont want this to affect the throughput timing. */
74 	TIMING_START(&t_create, "CREATE TOPIC");
75 	test_produce_msgs(rk, rkt, testid, partition, 0, 1, NULL, msgsize);
76 	TIMING_STOP(&t_create);
77 
78 	TIMING_START(&t_produce, "PRODUCE");
79 	test_produce_msgs(rk, rkt, testid, partition, 1, msgcnt-1, NULL, msgsize);
80 	TIMING_STOP(&t_produce);
81 
82 	TEST_SAY("Destroying producer\n");
83 	rd_kafka_topic_destroy(rkt);
84 	rd_kafka_destroy(rk);
85 
86 	TEST_SAY("Creating consumer\n");
87 	test_conf_init(&conf, NULL, 120);
88 	rk = test_create_consumer(NULL, NULL, conf, NULL);
89 	rkt = rd_kafka_topic_new(rk, topic, NULL);
90 
91 	test_consumer_start("CONSUME", rkt, partition,
92 			    RD_KAFKA_OFFSET_BEGINNING);
93 	TIMING_START(&t_consume, "CONSUME");
94 	test_consume_msgs("CONSUME", rkt, testid, partition, TEST_NO_SEEK,
95 			  0, msgcnt, 1);
96 	TIMING_STOP(&t_consume);
97 	test_consumer_stop("CONSUME", rkt, partition);
98 
99 	rd_kafka_topic_destroy(rkt);
100 	rd_kafka_destroy(rk);
101 
102 	TEST_REPORT("{ \"producer\": "
103 		    " { \"mb_per_sec\": %.2f, \"records_per_sec\": %.2f },"
104 		    " \"consumer\": "
105 		    "{ \"mb_per_sec\": %.2f, \"records_per_sec\": %.2f } "
106 		    "}",
107 		    (double)
108 		    (totsize/((double)TIMING_DURATION(&t_produce)/1000000.0f)) /
109 		    1000000.0f,
110 		    (float)
111 		    (msgcnt/((double)TIMING_DURATION(&t_produce)/1000000.0f)),
112 		    (double)
113 		    (totsize/((double)TIMING_DURATION(&t_consume)/1000000.0f)) /
114 		    1000000.0f,
115 		    (float)
116 		    (msgcnt/((double)TIMING_DURATION(&t_consume)/1000000.0f)));
117 	return 0;
118 }
119