1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2016, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include <iostream>
30 #include "testcpp.h"
31 
32 
33 
34 /**
35  * Issue #1306
36  *
37  * Consume from an empty topic using Consumer and KafkaConsumer.
38  */
39 
40 
41 static void do_test_empty_topic_consumer () {
42   std::string errstr;
43   std::string topic = Test::mk_topic_name("0067_empty_topic", 1);
44   const int32_t partition = 0;
45 
46   RdKafka::Conf *conf;
47 
48   Test::conf_init(&conf, NULL, 0);
49 
50   Test::conf_set(conf, "enable.partition.eof", "true");
51   Test::conf_set(conf, "allow.auto.create.topics", "true");
52 
53   /* Create simple consumer */
54   RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
55   if (!consumer)
56           Test::Fail("Failed to create Consumer: " + errstr);
57 
58   RdKafka::Topic *rkt = RdKafka::Topic::create(consumer, topic, NULL, errstr);
59   if (!rkt)
60           Test::Fail("Simple Topic failed: " + errstr);
61 
62 
63   /* Create the topic through a metadata request. */
64   Test::Say("Creating empty topic " + topic + "\n");
65   RdKafka::Metadata *md;
66   RdKafka::ErrorCode err = consumer->metadata(false, rkt, &md,
67                                               tmout_multip(10*1000));
68   if (err)
69           Test::Fail("Failed to create topic " + topic + ": " + RdKafka::err2str(err));
70   delete md;
71 
72   /* Start consumer */
73   err = consumer->start(rkt, partition, RdKafka::Topic::OFFSET_BEGINNING);
74   if (err)
75           Test::Fail("Consume start() failed: " + RdKafka::err2str(err));
76 
77   /* Consume using legacy consumer, should give an EOF and nothing else. */
78   Test::Say("Simple Consumer: consuming\n");
79   RdKafka::Message *msg = consumer->consume(rkt, partition,
80                                             tmout_multip(10 * 1000));
81   if (msg->err() != RdKafka::ERR__PARTITION_EOF)
82           Test::Fail("Simple consume() expected EOF, got " + RdKafka::err2str(msg->err()));
83   delete msg;
84 
85   /* Nothing else should come now, just a consume() timeout */
86   msg = consumer->consume(rkt, partition, 1 * 1000);
87   if (msg->err() != RdKafka::ERR__TIMED_OUT)
88           Test::Fail("Simple consume() expected timeout, got " + RdKafka::err2str(msg->err()));
89   delete msg;
90 
91   consumer->stop(rkt, partition);
92 
93   delete rkt;
94   delete consumer;
95 
96 
97   /*
98    * Now do the same thing using the high-level KafkaConsumer.
99    */
100 
101   Test::conf_set(conf, "group.id", topic);
102 
103   Test::conf_set(conf, "enable.partition.eof", "true");
104   Test::conf_set(conf, "allow.auto.create.topics", "true");
105 
106   RdKafka::KafkaConsumer *kconsumer = RdKafka::KafkaConsumer::create(conf, errstr);
107   if (!kconsumer)
108           Test::Fail("Failed to create KafkaConsumer: " + errstr);
109 
110   std::vector<RdKafka::TopicPartition*> part;
111   part.push_back(RdKafka::TopicPartition::create(topic, partition));
112 
113   err = kconsumer->assign(part);
114   if (err)
115           Test::Fail("assign() failed: " + RdKafka::err2str(err));
116 
117   RdKafka::TopicPartition::destroy(part);
118 
119   Test::Say("KafkaConsumer: consuming\n");
120   msg = kconsumer->consume(tmout_multip(5 * 1000));
121   if (msg->err() != RdKafka::ERR__PARTITION_EOF)
122           Test::Fail("KafkaConsumer consume() expected EOF, got " + RdKafka::err2str(msg->err()));
123   delete msg;
124 
125   /* Nothing else should come now, just a consume() timeout */
126   msg = kconsumer->consume(1 * 1000);
127   if (msg->err() != RdKafka::ERR__TIMED_OUT)
128           Test::Fail("KafkaConsumer consume() expected timeout, got " + RdKafka::err2str(msg->err()));
129   delete msg;
130 
131   kconsumer->close();
132 
133   delete kconsumer;
134   delete conf;
135 }
136 
137 extern "C" {
138   int main_0067_empty_topic (int argc, char **argv) {
139     do_test_empty_topic_consumer();
140     return 0;
141   }
142 }
143