1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2014, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
/**
 * - Generate a unique topic name (there is a C function for that in test.h which you should use)
 * - Query metadata for that topic
 * - Wait one second
 * - Query again, it should now have ISRs and everything
 * Note: The test requires auto.create.topics.enable = true in the Kafka server properties.
 */
36
37
38 #define _GNU_SOURCE
destroy_flags_rebalance_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * parts,void * opaque)39 #include <sys/time.h>
40 #include <time.h>
41 #include <string>
42 #include <sstream>
43 #include <iostream>
44
45
46 extern "C" {
47 #include "test.h"
48 }
49
50 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
51 * is built from within the librdkafka source tree and thus differs. */
52 #include "rdkafkacpp.h" /* for Kafka driver */
53
54 /**
55 * Generate unique topic name (there is a C function for that in test.h wihch you should use)
56 * Query metadata for that topic
57 * Wait one second
58 * Query again, it should now have isrs and everything
59 */
60 static void test_metadata_cpp (void) {
61 RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); /* @TODO: Do we need to merge with C test_conf_init()? */
62 RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); /* @TODO: Same of prev */
63
64 RdKafka::Metadata *metadata;
65 RdKafka::ErrorCode err;
66 int msgcnt = test_on_ci ? 1000 : 10000;
67 int partition_cnt = 2;
68 int i;
do_test_destroy_flags(const char * topic,int destroy_flags,int local_mode,const struct df_args * args)69 uint64_t testid;
70 int msg_base = 0;
71 std::string errstr;
72 const char *topic_str = test_mk_topic_name("0013", 1);
73 /* if(!topic){
74 TEST_FAIL()
75 }*/
76
77 //const RdKafka::Conf::ConfResult confResult = conf->set("debug","all",errstr);
78 //if(confResult != RdKafka::Conf::CONF_OK){
79 // std::stringstream errstring;
80 // errstring << "Can't set config" << errstr;
81 // TEST_FAIL(errstring.str().c_str());
82 //}
83
84 TEST_SAY("Topic %s.\n", topic_str);
85
86 const RdKafka::Conf::ConfResult confBrokerResult = conf->set("metadata.broker.list", "localhost:9092", errstr);
87 if(confBrokerResult != RdKafka::Conf::CONF_OK){
88 std::stringstream errstring;
89 errstring << "Can't set broker" << errstr;
90 TEST_FAIL(errstring.str().c_str());
91 }
92
93 /* Create a producer to fetch metadata */
94 RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
95 if (!producer) {
96 std::stringstream errstring;
97 errstring << "Can't create producer" << errstr;
98 TEST_FAIL(errstring.str().c_str());
99 }
100
101 /*
102 * Create topic handle.
103 */
104 RdKafka::Topic *topic = NULL;
105 topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
106 if (!topic) {
107 std::stringstream errstring;
108 errstring << "Can't create topic" << errstr;
109 exit(1);
110 }
111
112 /* First request of metadata: It have to fail */
113 err = producer->metadata(topic!=NULL, topic,
114 &metadata, 5000);
115 if (err != RdKafka::ERR_NO_ERROR) {
116 std::stringstream errstring;
117 errstring << "Can't request first metadata: " << errstr;
118 TEST_FAIL(errstring.str().c_str());
119 }
120
121 /* It's a new topic, it should have no partitions */
122 if(metadata->topics()->at(0)->partitions()->size() != 0){
123 TEST_FAIL("ISRS != 0");
124 }
125
126 sleep(1);
127
128 /* Second request of metadata: It have to success */
129 err = producer->metadata(topic!=NULL, topic,
130 &metadata, 5000);
131
132 /* It should have now partitions */
133 if(metadata->topics()->at(0)->partitions()->size() == 0){
134 TEST_FAIL("ISRS == 0");
135 }
136
137
138 delete topic;
139 delete producer;
140 delete tconf;
141 delete conf;
142
143 /* Wait for everything to be cleaned up since broker destroys are
144 * handled in its own thread. */
145 test_wait_exit(10);
146
147 /* If we havent failed at this point then
148 * there were no threads leaked */
149 return;
150 }
151
152 int main (int argc, char **argv) {
153 test_conf_init (NULL, NULL, 20);
154 test_metadata_cpp();
155 return 0;
156 }
157