1 /* 2 * librdkafka - Apache Kafka C library 3 * 4 * Copyright (c) 2016, Magnus Edenhill 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright notice, 11 * this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright notice, 13 * this list of conditions and the following disclaimer in the documentation 14 * and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #include "test.h" 30 31 /* Typical include path would be <librdkafka/rdkafka.h>, but this program 32 * is built from within the librdkafka source tree and thus differs. */ 33 #include "rdkafka.h" /* for Kafka driver */ 34 35 36 /** 37 * Issue #641: correct handling of partial messages in FetchResponse 38 * 39 * General idea: 40 * - Produce messages of 1000 bytes each 41 * - Set fetch.message.max.bytes to 1500 so that only one full message 42 * can be fetched per request. 43 * - Make sure all messages are received correctly and in order. 44 */ 45 46 47 int main_0036_partial_fetch (int argc, char **argv) { 48 const char *topic = test_mk_topic_name(__FUNCTION__, 1); 49 const int partition = 0; 50 const int msgcnt = 100; 51 const int msgsize = 1000; 52 uint64_t testid; 53 rd_kafka_conf_t *conf; 54 rd_kafka_t *rk; 55 rd_kafka_topic_t *rkt; 56 57 TEST_SAY("Producing %d messages of size %d to %s [%d]\n", 58 msgcnt, (int)msgsize, topic, partition); 59 testid = test_id_generate(); 60 rk = test_create_producer(); 61 rkt = test_create_producer_topic(rk, topic, NULL); 62 63 test_produce_msgs(rk, rkt, testid, partition, 0, msgcnt, NULL, msgsize); 64 65 rd_kafka_topic_destroy(rkt); 66 rd_kafka_destroy(rk); 67 68 TEST_SAY("Creating consumer\n"); 69 test_conf_init(&conf, NULL, 0); 70 /* This should fetch 1.5 messages per fetch, thus resulting in 71 * partial fetches, hopefully. */ 72 test_conf_set(conf, "fetch.message.max.bytes", "1500"); 73 rk = test_create_consumer(NULL, NULL, conf, NULL); 74 rkt = rd_kafka_topic_new(rk, topic, NULL); 75 76 test_consumer_start("CONSUME", rkt, partition, 77 RD_KAFKA_OFFSET_BEGINNING); 78 test_consume_msgs("CONSUME", rkt, testid, partition, TEST_NO_SEEK, 79 0, msgcnt, 1); 80 test_consumer_stop("CONSUME", rkt, partition); 81 82 rd_kafka_topic_destroy(rkt); 83 rd_kafka_destroy(rk); 84 85 return 0; 86 } 87