1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2016, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 
30 #include "testcpp.h"
31 #include <cstring>
32 
33 /**
34  * Verification of difference between empty and null Key and Value
35  */
36 
37 
check_equal(const char * exp,const char * actual,size_t len,std::string what)38 static int check_equal (const char *exp,
39                          const char *actual, size_t len,
40                          std::string what) {
41   size_t exp_len = exp ? strlen(exp) : 0;
42   int failures = 0;
43 
44   if (!actual && len != 0) {
45     Test::FailLater(tostr() << what << ": expected length 0 for Null, not " << len);
46     failures++;
47   }
48 
49   if (exp) {
50     if (!actual) {
51       Test::FailLater(tostr() << what << ": expected \"" << exp << "\", not Null");
52       failures++;
53 
54     } else if (len != exp_len || strncmp(exp, actual, exp_len)) {
55       Test::FailLater(tostr() << what << ": expected \"" << exp << "\", not \"" << actual << "\" (" << len << " bytes)");
56       failures++;
57     }
58 
59   } else {
60     if (actual) {
61       Test::FailLater(tostr() << what << ": expected Null, not \"" << actual << "\" (" << len << " bytes)");
62       failures++;
63     }
64   }
65 
66   if (!failures)
67     Test::Say(3, tostr() << what << ": matched expectation\n");
68 
69   return failures;
70 }
71 
72 
do_test_null_empty(bool api_version_request)73 static void do_test_null_empty (bool api_version_request) {
74   std::string topic = Test::mk_topic_name("0070_null_empty", 1);
75   const int partition = 0;
76 
77   Test::Say(tostr() << "Testing with api.version.request=" << api_version_request << " on topic " << topic << " partition " << partition << "\n");
78 
79   RdKafka::Conf *conf;
80   Test::conf_init(&conf, NULL, 0);
81   Test::conf_set(conf, "api.version.request",
82                  api_version_request ? "true" : "false");
83   Test::conf_set(conf, "acks", "all");
84 
85 
86   std::string errstr;
87   RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
88   if (!p)
89     Test::Fail("Failed to create Producer: " + errstr);
90   delete conf;
91 
92   const int msgcnt = 8;
93   static const char *msgs[msgcnt*2] = {
94     NULL, NULL,
95     "key2", NULL,
96     "key3", "val3",
97     NULL, "val4",
98     "", NULL,
99     NULL, "",
100     "", ""
101   };
102 
103   RdKafka::ErrorCode err;
104 
105   for (int i = 0 ; i < msgcnt * 2 ; i += 2) {
106     Test::Say(3, tostr() << "Produce message #" << (i/2) <<
107               ": key=\"" << (msgs[i] ? msgs[i] : "Null") <<
108               "\", value=\"" << (msgs[i+1] ? msgs[i+1] : "Null") << "\"\n");
109     err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY,
110                      /* Value */
111                      (void *)msgs[i+1], msgs[i+1] ? strlen(msgs[i+1]) : 0,
112                      /* Key */
113                      (void *)msgs[i], msgs[i] ? strlen(msgs[i]) : 0,
114                      0, NULL);
115     if (err != RdKafka::ERR_NO_ERROR)
116       Test::Fail("Produce failed: " + RdKafka::err2str(err));
117   }
118 
119   if (p->flush(tmout_multip(3*5000)) != 0)
120     Test::Fail("Not all messages flushed");
121 
122   Test::Say(tostr() << "Produced " << msgcnt << " messages to " << topic << "\n");
123 
124   delete p;
125 
126   /*
127    * Now consume messages from the beginning, making sure they match
128    * what was produced.
129    */
130 
131   /* Create consumer */
132   Test::conf_init(&conf, NULL, 10);
133   Test::conf_set(conf, "group.id", topic);
134   Test::conf_set(conf, "api.version.request",
135                  api_version_request ? "true" : "false");
136   Test::conf_set(conf, "enable.auto.commit", "false");
137 
138   RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
139   if (!c)
140     Test::Fail("Failed to create KafkaConsumer: " + errstr);
141   delete conf;
142 
143   /* Assign the partition */
144   std::vector<RdKafka::TopicPartition*> parts;
145   parts.push_back(RdKafka::TopicPartition::create(topic, partition,
146                                                  RdKafka::Topic::OFFSET_BEGINNING));
147   err = c->assign(parts);
148   if (err != RdKafka::ERR_NO_ERROR)
149     Test::Fail("assign() failed: " + RdKafka::err2str(err));
150   RdKafka::TopicPartition::destroy(parts);
151 
152   /* Start consuming */
153   int failures = 0;
154   for (int i = 0 ; i < msgcnt * 2 ; i += 2) {
155     RdKafka::Message *msg = c->consume(tmout_multip(5000));
156     if (msg->err())
157       Test::Fail(tostr() << "consume() failed at message " << (i/2) << ": " <<
158                  msg->errstr());
159 
160     /* verify key */
161     failures += check_equal(msgs[i], msg->key() ? msg->key()->c_str() : NULL, msg->key_len(),
162                             tostr() << "message #" << (i/2) << " (offset " << msg->offset() << ") key");
163     /* verify key_pointer() API as too */
164     failures += check_equal(msgs[i], (const char *)msg->key_pointer(), msg->key_len(),
165                 tostr() << "message #" << (i/2) << " (offset " << msg->offset() << ") key");
166 
167     /* verify value */
168     failures += check_equal(msgs[i+1], (const char *)msg->payload(), msg->len(),
169                 tostr() << "message #" << (i/2) << " (offset " << msg->offset() << ") value");
170     delete msg;
171   }
172 
173   Test::Say(tostr() << "Done consuming, closing. " << failures << " test failures\n");
174   if (failures)
175     Test::Fail(tostr() << "See " << failures << "  previous test failure(s)");
176 
177   c->close();
178   delete c;
179 }
180 
181 
182 extern "C" {
main_0070_null_empty(int argc,char ** argv)183   int main_0070_null_empty (int argc, char **argv) {
184     if (test_broker_version >= TEST_BRKVER(0,10,0,0))
185       do_test_null_empty(true);
186     do_test_null_empty(false);
187     return 0;
188   }
189 }
190