1 /* 2 * librdkafka - Apache Kafka C library 3 * 4 * Copyright (c) 2016, Magnus Edenhill 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright notice, 11 * this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright notice, 13 * this list of conditions and the following disclaimer in the documentation 14 * and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #include "test.h" 30 31 /* Typical include path would be <librdkafka/rdkafka.h>, but this program 32 * is built from within the librdkafka source tree and thus differs. */ 33 #include "rdkafka.h" /* for Kafka driver */ 34 35 36 /** 37 * Basic performance tests. 38 * These tests dont fail but provide a throughput rate indication. 39 * 40 * + Produce N messages to one partition, acks=1, size=100 41 */ 42 43 44 int main_0038_performance (int argc, char **argv) { 45 const char *topic = test_mk_topic_name(__FUNCTION__, 1); 46 const int partition = 0; 47 const int msgsize = 100; 48 uint64_t testid; 49 rd_kafka_conf_t *conf; 50 rd_kafka_t *rk; 51 rd_kafka_topic_t *rkt; 52 test_timing_t t_create, t_produce, t_consume; 53 int totsize = 1024 * 1024 * (test_quick ? 8 : 128); 54 int msgcnt; 55 56 if (!strcmp(test_mode, "valgrind") || !strcmp(test_mode, "helgrind") || 57 !strcmp(test_mode, "drd")) 58 totsize = 1024*1024*8; /* 8 meg, valgrind is slow. */ 59 60 msgcnt = totsize / msgsize; 61 62 TEST_SAY("Producing %d messages of size %d to %s [%d]\n", 63 msgcnt, (int)msgsize, topic, partition); 64 testid = test_id_generate(); 65 test_conf_init(&conf, NULL, 120); 66 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); 67 test_conf_set(conf, "queue.buffering.max.messages", "10000000"); 68 test_conf_set(conf, "linger.ms", "100"); 69 rk = test_create_handle(RD_KAFKA_PRODUCER, conf); 70 rkt = test_create_producer_topic(rk, topic, "acks", "1", NULL); 71 72 /* First produce one message to create the topic, etc, this might take 73 * a while and we dont want this to affect the throughput timing. */ 74 TIMING_START(&t_create, "CREATE TOPIC"); 75 test_produce_msgs(rk, rkt, testid, partition, 0, 1, NULL, msgsize); 76 TIMING_STOP(&t_create); 77 78 TIMING_START(&t_produce, "PRODUCE"); 79 test_produce_msgs(rk, rkt, testid, partition, 1, msgcnt-1, NULL, msgsize); 80 TIMING_STOP(&t_produce); 81 82 TEST_SAY("Destroying producer\n"); 83 rd_kafka_topic_destroy(rkt); 84 rd_kafka_destroy(rk); error(SQLRETURN a)85 86 TEST_SAY("Creating consumer\n"); 87 test_conf_init(&conf, NULL, 120); 88 rk = test_create_consumer(NULL, NULL, conf, NULL); 89 rkt = rd_kafka_topic_new(rk, topic, NULL); 90 91 test_consumer_start("CONSUME", rkt, partition, 92 RD_KAFKA_OFFSET_BEGINNING); 93 TIMING_START(&t_consume, "CONSUME"); luasql_registerobj(lua_State * L,int index,void * obj)94 test_consume_msgs("CONSUME", rkt, testid, partition, TEST_NO_SEEK, 95 0, msgcnt, 1); 96 TIMING_STOP(&t_consume); 97 test_consumer_stop("CONSUME", rkt, partition); 98 99 rd_kafka_topic_destroy(rkt); 100 rd_kafka_destroy(rk); 101 102 TEST_REPORT("{ \"producer\": " 103 " { \"mb_per_sec\": %.2f, \"records_per_sec\": %.2f }," 104 " \"consumer\": " 105 "{ \"mb_per_sec\": %.2f, \"records_per_sec\": %.2f } " luasql_unregisterobj(lua_State * L,void * obj)106 "}", 107 (double) 108 (totsize/((double)TIMING_DURATION(&t_produce)/1000000.0f)) / 109 1000000.0f, 110 (float) 111 (msgcnt/((double)TIMING_DURATION(&t_produce)/1000000.0f)), 112 (double) 113 (totsize/((double)TIMING_DURATION(&t_consume)/1000000.0f)) / 114 1000000.0f, 115 (float) 116 (msgcnt/((double)TIMING_DURATION(&t_consume)/1000000.0f))); 117 return 0; 118 } 119