1 
2 /*
3  * librdkafka - Apache Kafka C library
4  *
5  * Copyright (c) 2012-2015, Magnus Edenhill
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright notice,
12  *    this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright notice,
14  *    this list of conditions and the following disclaimer in the documentation
15  *    and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "test.h"
31 
32 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
33  * is built from within the librdkafka source tree and thus differs. */
34 #include "rdkafka.h"  /* for Kafka driver */
35 
36 
37 /**
38  * Verify that rd_kafka_(query|get)_watermark_offsets() works.
39  */
40 
41 
42 int main_0031_get_offsets (int argc, char **argv) {
43 	const char *topic = test_mk_topic_name(__FUNCTION__, 1);
44         const int msgcnt = test_quick ? 10 : 100;
45 	rd_kafka_t *rk;
46 	rd_kafka_topic_t *rkt;
47 	int64_t qry_low = -1234, qry_high = -1235;
48 	int64_t get_low = -1234, get_high = -1235;
49 	rd_kafka_resp_err_t err;
50 	test_timing_t t_qry, t_get;
51 	uint64_t testid;
52 
53         /* Produce messages */
54         testid = test_produce_msgs_easy(topic, 0, 0, msgcnt);
55 
56 	/* Get offsets */
57 	rk = test_create_consumer(NULL, NULL, NULL, NULL
58 );
59 
60 	TIMING_START(&t_qry, "query_watermark_offsets");
61 	err = rd_kafka_query_watermark_offsets(rk, topic, 0,
62 					       &qry_low, &qry_high,
63                                                tmout_multip(10*1000));
64 	TIMING_STOP(&t_qry);
65 	if (err)
66 		TEST_FAIL("query_watermark_offsets failed: %s\n",
67 			  rd_kafka_err2str(err));
68 
69 	if (qry_low != 0 && qry_high != msgcnt)
70 		TEST_FAIL("Expected low,high %d,%d, but got "
71 			  "%"PRId64",%"PRId64,
72 			  0, msgcnt, qry_low, qry_high);
73 
74 	TEST_SAY("query_watermark_offsets: "
75 		 "offsets %"PRId64", %"PRId64"\n", qry_low, qry_high);
76 
77 	/* Now start consuming to update the offset cache, then query it
78 	 * with the get_ API. */
79 	rkt = test_create_topic_object(rk, topic, NULL);
80 
81 	test_consumer_start("get", rkt, 0, RD_KAFKA_OFFSET_BEGINNING);
82 	test_consume_msgs("get", rkt, testid, 0, TEST_NO_SEEK,
83 			  0, msgcnt, 0);
84 	/* After at least one message has been consumed the
85 	 * watermarks are cached. */
86 
87 	TIMING_START(&t_get, "get_watermark_offsets");
88 	err = rd_kafka_get_watermark_offsets(rk, topic, 0,
89 					     &get_low, &get_high);
90 	TIMING_STOP(&t_get);
91 	if (err)
92 		TEST_FAIL("get_watermark_offsets failed: %s\n",
93 			  rd_kafka_err2str(err));
94 
95 	TEST_SAY("get_watermark_offsets: "
96 		 "offsets %"PRId64", %"PRId64"\n", get_low, get_high);
97 
98 	if (get_high != qry_high)
99 		TEST_FAIL("query/get discrepancies: "
100 			  "low: %"PRId64"/%"PRId64", high: %"PRId64"/%"PRId64,
101 			  qry_low, get_low, qry_high, get_high);
102 	if (get_low >= get_high)
103 		TEST_FAIL("get_watermark_offsets: "
104 			  "low %"PRId64" >= high %"PRId64,
105 			  get_low, get_high);
106 
107 	/* FIXME: We currently dont bother checking the get_low offset
108 	 *        since it requires stats to be enabled. */
109 
110 	test_consumer_stop("get", rkt, 0);
111 
112 	rd_kafka_topic_destroy(rkt);
113 	rd_kafka_destroy(rk);
114 
115         return 0;
116 }
117