1 2 /* 3 * librdkafka - Apache Kafka C library 4 * 5 * Copyright (c) 2012-2015, Magnus Edenhill 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27 * POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30 #include "test.h" 31 32 /* Typical include path would be <librdkafka/rdkafka.h>, but this program 33 * is built from within the librdkafka source tree and thus differs. */ 34 #include "rdkafka.h" /* for Kafka driver */ 35 36 37 /** 38 * Verify that rd_kafka_(query|get)_watermark_offsets() works. 39 */ 40 41 42 int main_0031_get_offsets (int argc, char **argv) { 43 const char *topic = test_mk_topic_name(__FUNCTION__, 1); 44 const int msgcnt = test_quick ? 10 : 100; 45 rd_kafka_t *rk; 46 rd_kafka_topic_t *rkt; 47 int64_t qry_low = -1234, qry_high = -1235; 48 int64_t get_low = -1234, get_high = -1235; 49 rd_kafka_resp_err_t err; 50 test_timing_t t_qry, t_get; 51 uint64_t testid; 52 53 /* Produce messages */ 54 testid = test_produce_msgs_easy(topic, 0, 0, msgcnt); 55 56 /* Get offsets */ 57 rk = test_create_consumer(NULL, NULL, NULL, NULL 58 ); 59 60 TIMING_START(&t_qry, "query_watermark_offsets"); 61 err = rd_kafka_query_watermark_offsets(rk, topic, 0, 62 &qry_low, &qry_high, 63 tmout_multip(10*1000)); 64 TIMING_STOP(&t_qry); 65 if (err) 66 TEST_FAIL("query_watermark_offsets failed: %s\n", 67 rd_kafka_err2str(err)); 68 69 if (qry_low != 0 && qry_high != msgcnt) 70 TEST_FAIL("Expected low,high %d,%d, but got " 71 "%"PRId64",%"PRId64, 72 0, msgcnt, qry_low, qry_high); 73 74 TEST_SAY("query_watermark_offsets: " 75 "offsets %"PRId64", %"PRId64"\n", qry_low, qry_high); 76 77 /* Now start consuming to update the offset cache, then query it 78 * with the get_ API. */ 79 rkt = test_create_topic_object(rk, topic, NULL); 80 81 test_consumer_start("get", rkt, 0, RD_KAFKA_OFFSET_BEGINNING); 82 test_consume_msgs("get", rkt, testid, 0, TEST_NO_SEEK, 83 0, msgcnt, 0); 84 /* After at least one message has been consumed the 85 * watermarks are cached. */ 86 87 TIMING_START(&t_get, "get_watermark_offsets"); 88 err = rd_kafka_get_watermark_offsets(rk, topic, 0, 89 &get_low, &get_high); 90 TIMING_STOP(&t_get); 91 if (err) 92 TEST_FAIL("get_watermark_offsets failed: %s\n", 93 rd_kafka_err2str(err)); 94 95 TEST_SAY("get_watermark_offsets: " 96 "offsets %"PRId64", %"PRId64"\n", get_low, get_high); 97 98 if (get_high != qry_high) 99 TEST_FAIL("query/get discrepancies: " 100 "low: %"PRId64"/%"PRId64", high: %"PRId64"/%"PRId64, 101 qry_low, get_low, qry_high, get_high); 102 if (get_low >= get_high) 103 TEST_FAIL("get_watermark_offsets: " 104 "low %"PRId64" >= high %"PRId64, 105 get_low, get_high); 106 107 /* FIXME: We currently dont bother checking the get_low offset 108 * since it requires stats to be enabled. */ 109 110 test_consumer_stop("get", rkt, 0); 111 112 rd_kafka_topic_destroy(rkt); 113 rd_kafka_destroy(rk); 114 115 return 0; 116 } 117