1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2014, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
/**
 * - Generate unique topic name (there is a C function for that in test.h which you should use)
 * - Query metadata for that topic
 * - Wait one second
 * - Query again, it should now have isrs and everything
 * Note: The test requires auto.create.topics.enable = true in kafka server properties.
 */
36 
37 
38 #define _GNU_SOURCE
destroy_flags_rebalance_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * parts,void * opaque)39 #include <sys/time.h>
40 #include <time.h>
41 #include <string>
42 #include <sstream>
43 #include <iostream>
44 
45 
46 extern "C" {
47 #include "test.h"
48 }
49 
50 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
51  * is built from within the librdkafka source tree and thus differs. */
52 #include "rdkafkacpp.h"  /* for Kafka driver */
53 
54 /**
55  * Generate unique topic name (there is a C function for that in test.h wihch you should use)
56  * Query metadata for that topic
57  * Wait one second
58  * Query again, it should now have isrs and everything
59  */
60 static void test_metadata_cpp (void) {
61 	RdKafka::Conf *conf  = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); /* @TODO: Do we need to merge with C test_conf_init()? */
62 	RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); /* @TODO: Same of prev */
63 
64 	RdKafka::Metadata *metadata;
65 	RdKafka::ErrorCode err;
66 	int msgcnt = test_on_ci ? 1000 : 10000;
67 	int partition_cnt = 2;
68 	int i;
do_test_destroy_flags(const char * topic,int destroy_flags,int local_mode,const struct df_args * args)69 	uint64_t testid;
70 	int msg_base = 0;
71 	std::string errstr;
72 	const char *topic_str = test_mk_topic_name("0013", 1);
73 /*	if(!topic){
74 		TEST_FAIL()
75 	}*/
76 
77 	//const RdKafka::Conf::ConfResult confResult = conf->set("debug","all",errstr);
78 	//if(confResult != RdKafka::Conf::CONF_OK){
79 	//	std::stringstream errstring;
80 	//	errstring << "Can't set config" << errstr;
81 	//	TEST_FAIL(errstring.str().c_str());
82 	//}
83 
84 	TEST_SAY("Topic %s.\n", topic_str);
85 
86 	const RdKafka::Conf::ConfResult confBrokerResult = conf->set("metadata.broker.list", "localhost:9092", errstr);
87 	if(confBrokerResult != RdKafka::Conf::CONF_OK){
88 		std::stringstream errstring;
89 		errstring << "Can't set broker" << errstr;
90 		TEST_FAIL(errstring.str().c_str());
91 	}
92 
93 	/* Create a producer to fetch metadata */
94 	RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
95 	if (!producer) {
96 		std::stringstream errstring;
97 		errstring << "Can't create producer" << errstr;
98 		TEST_FAIL(errstring.str().c_str());
99 	}
100 
101 	/*
102 	* Create topic handle.
103 	*/
104 	RdKafka::Topic *topic = NULL;
105 	topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
106 	if (!topic) {
107 		std::stringstream errstring;
108 		errstring << "Can't create topic" << errstr;
109 		exit(1);
110 	}
111 
112 	/* First request of metadata: It have to fail */
113 	err = producer->metadata(topic!=NULL, topic,
114 	                      &metadata, 5000);
115 	if (err != RdKafka::ERR_NO_ERROR) {
116 		std::stringstream errstring;
117 		errstring << "Can't request first metadata: " << errstr;
118 		TEST_FAIL(errstring.str().c_str());
119 	}
120 
121 	/* It's a new topic, it should have no partitions */
122 	if(metadata->topics()->at(0)->partitions()->size() != 0){
123 		TEST_FAIL("ISRS != 0");
124 	}
125 
126 	sleep(1);
127 
128 	/* Second request of metadata: It have to success */
129 	err = producer->metadata(topic!=NULL, topic,
130 	                      &metadata, 5000);
131 
132 	/* It should have now partitions */
133 	if(metadata->topics()->at(0)->partitions()->size() == 0){
134 		TEST_FAIL("ISRS == 0");
135 	}
136 
137 
138 	delete topic;
139 	delete producer;
140 	delete tconf;
141 	delete conf;
142 
143 	/* Wait for everything to be cleaned up since broker destroys are
144 	 * handled in its own thread. */
145 	test_wait_exit(10);
146 
147 	/* If we havent failed at this point then
148 	 * there were no threads leaked */
149 	return;
150 }
151 
152 int main (int argc, char **argv) {
153 	test_conf_init (NULL, NULL, 20);
154 	test_metadata_cpp();
155 	return 0;
156 }
157