1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2016, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30 #include "testcpp.h"
31 #include <cstring>
32
33 /**
34 * Verification of difference between empty and null Key and Value
35 */
36
37
check_equal(const char * exp,const char * actual,size_t len,std::string what)38 static int check_equal (const char *exp,
39 const char *actual, size_t len,
40 std::string what) {
41 size_t exp_len = exp ? strlen(exp) : 0;
42 int failures = 0;
43
44 if (!actual && len != 0) {
45 Test::FailLater(tostr() << what << ": expected length 0 for Null, not " << len);
46 failures++;
47 }
48
49 if (exp) {
50 if (!actual) {
51 Test::FailLater(tostr() << what << ": expected \"" << exp << "\", not Null");
52 failures++;
53
54 } else if (len != exp_len || strncmp(exp, actual, exp_len)) {
55 Test::FailLater(tostr() << what << ": expected \"" << exp << "\", not \"" << actual << "\" (" << len << " bytes)");
56 failures++;
57 }
58
59 } else {
60 if (actual) {
61 Test::FailLater(tostr() << what << ": expected Null, not \"" << actual << "\" (" << len << " bytes)");
62 failures++;
63 }
64 }
65
66 if (!failures)
67 Test::Say(3, tostr() << what << ": matched expectation\n");
68
69 return failures;
70 }
71
72
do_test_null_empty(bool api_version_request)73 static void do_test_null_empty (bool api_version_request) {
74 std::string topic = Test::mk_topic_name("0070_null_empty", 1);
75 const int partition = 0;
76
77 Test::Say(tostr() << "Testing with api.version.request=" << api_version_request << " on topic " << topic << " partition " << partition << "\n");
78
79 RdKafka::Conf *conf;
80 Test::conf_init(&conf, NULL, 0);
81 Test::conf_set(conf, "api.version.request",
82 api_version_request ? "true" : "false");
83 Test::conf_set(conf, "acks", "all");
84
85
86 std::string errstr;
87 RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
88 if (!p)
89 Test::Fail("Failed to create Producer: " + errstr);
90 delete conf;
91
92 const int msgcnt = 8;
93 static const char *msgs[msgcnt*2] = {
94 NULL, NULL,
95 "key2", NULL,
96 "key3", "val3",
97 NULL, "val4",
98 "", NULL,
99 NULL, "",
100 "", ""
101 };
102
103 RdKafka::ErrorCode err;
104
105 for (int i = 0 ; i < msgcnt * 2 ; i += 2) {
106 Test::Say(3, tostr() << "Produce message #" << (i/2) <<
107 ": key=\"" << (msgs[i] ? msgs[i] : "Null") <<
108 "\", value=\"" << (msgs[i+1] ? msgs[i+1] : "Null") << "\"\n");
109 err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY,
110 /* Value */
111 (void *)msgs[i+1], msgs[i+1] ? strlen(msgs[i+1]) : 0,
112 /* Key */
113 (void *)msgs[i], msgs[i] ? strlen(msgs[i]) : 0,
114 0, NULL);
115 if (err != RdKafka::ERR_NO_ERROR)
116 Test::Fail("Produce failed: " + RdKafka::err2str(err));
117 }
118
119 if (p->flush(tmout_multip(3*5000)) != 0)
120 Test::Fail("Not all messages flushed");
121
122 Test::Say(tostr() << "Produced " << msgcnt << " messages to " << topic << "\n");
123
124 delete p;
125
126 /*
127 * Now consume messages from the beginning, making sure they match
128 * what was produced.
129 */
130
131 /* Create consumer */
132 Test::conf_init(&conf, NULL, 10);
133 Test::conf_set(conf, "group.id", topic);
134 Test::conf_set(conf, "api.version.request",
135 api_version_request ? "true" : "false");
136 Test::conf_set(conf, "enable.auto.commit", "false");
137
138 RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
139 if (!c)
140 Test::Fail("Failed to create KafkaConsumer: " + errstr);
141 delete conf;
142
143 /* Assign the partition */
144 std::vector<RdKafka::TopicPartition*> parts;
145 parts.push_back(RdKafka::TopicPartition::create(topic, partition,
146 RdKafka::Topic::OFFSET_BEGINNING));
147 err = c->assign(parts);
148 if (err != RdKafka::ERR_NO_ERROR)
149 Test::Fail("assign() failed: " + RdKafka::err2str(err));
150 RdKafka::TopicPartition::destroy(parts);
151
152 /* Start consuming */
153 int failures = 0;
154 for (int i = 0 ; i < msgcnt * 2 ; i += 2) {
155 RdKafka::Message *msg = c->consume(tmout_multip(5000));
156 if (msg->err())
157 Test::Fail(tostr() << "consume() failed at message " << (i/2) << ": " <<
158 msg->errstr());
159
160 /* verify key */
161 failures += check_equal(msgs[i], msg->key() ? msg->key()->c_str() : NULL, msg->key_len(),
162 tostr() << "message #" << (i/2) << " (offset " << msg->offset() << ") key");
163 /* verify key_pointer() API as too */
164 failures += check_equal(msgs[i], (const char *)msg->key_pointer(), msg->key_len(),
165 tostr() << "message #" << (i/2) << " (offset " << msg->offset() << ") key");
166
167 /* verify value */
168 failures += check_equal(msgs[i+1], (const char *)msg->payload(), msg->len(),
169 tostr() << "message #" << (i/2) << " (offset " << msg->offset() << ") value");
170 delete msg;
171 }
172
173 Test::Say(tostr() << "Done consuming, closing. " << failures << " test failures\n");
174 if (failures)
175 Test::Fail(tostr() << "See " << failures << " previous test failure(s)");
176
177 c->close();
178 delete c;
179 }
180
181
182 extern "C" {
main_0070_null_empty(int argc,char ** argv)183 int main_0070_null_empty (int argc, char **argv) {
184 if (test_broker_version >= TEST_BRKVER(0,10,0,0))
185 do_test_null_empty(true);
186 do_test_null_empty(false);
187 return 0;
188 }
189 }
190