1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30 #include "rdkafka.h"
31
32 /**
33 * Message Headers end-to-end tests
34 */
35
36
37
/**
 * Message id the next verified message is expected to carry.
 * Shared by the producer, delivery report and consumer phases of the test.
 */
static int exp_msgid = 0;

/**
 * One expected message header.
 * Tables of these are terminated by an entry whose \c name is NULL.
 */
struct expect {
        const char *name;   /* Header name */
        const char *value;  /* Expected header value, NULL for a NULL value */
};
44
45
46
/**
 * @brief Verify the headers of \p rkmessage against \p expected.
 *
 * The message payload must be an int-sized message id.
 * Message id 0 is expected to carry no headers at all. For every other
 * message the headers are walked in order and each header's name and value
 * are compared to the corresponding entry in \p expected.
 *
 * A header named "msgid" is handled specially: its value is compared to the
 * message payload and to the global \c exp_msgid instead of to the
 * expected value.
 *
 * When \p what is "handle_consumed_msg", \p is_const is zero and the msgid
 * is a multiple of three, the headers are additionally detached from the
 * message, a fresh headers object is obtained and the check is run once
 * more on it.
 *
 * @param what      Caller name, used as prefix in logs and failure messages.
 * @param expected  Expected headers, terminated by an entry with NULL name.
 * @param rkmessage Message to verify.
 * @param is_const  Non-zero to skip the detach_headers() test.
 */
static void expect_check (const char *what, const struct expect *expected,
                          rd_kafka_message_t *rkmessage, int is_const) {
        const struct expect *exp;
        rd_kafka_resp_err_t err;
        size_t idx = 0;
        const char *name;
        const char *value;
        size_t size;
        rd_kafka_headers_t *hdrs;
        int msgid;

        if (rkmessage->len != sizeof(msgid))
                TEST_FAIL("%s: expected message len %"PRIusz" == sizeof(int)",
                          what, rkmessage->len);

        /* Copy to avoid unaligned access to the payload */
        memcpy(&msgid, rkmessage->payload, rkmessage->len);

        if ((err = rd_kafka_message_headers(rkmessage, &hdrs))) {
                if (msgid == 0) {
                        rd_kafka_resp_err_t err2;
                        TEST_SAYL(3, "%s: Msg #%d: no headers, good\n",
                                  what, msgid);

                        /* Both accessors must agree on the error */
                        err2 = rd_kafka_message_detach_headers(rkmessage,
                                                               &hdrs);
                        TEST_ASSERT(err == err2,
                                    "expected detach_headers() error %s "
                                    "to match headers() error %s",
                                    rd_kafka_err2str(err2),
                                    rd_kafka_err2str(err));

                        return; /* No headers expected for first message */
                }

                TEST_FAIL("%s: Expected headers in message %d: %s", what, msgid,
                          rd_kafka_err2str(err));
        } else {
                TEST_ASSERT(msgid != 0,
                            "%s: first message should have no headers", what);
        }

        test_headers_dump(what, 3, hdrs);

        for (idx = 0, exp = expected ;
             !rd_kafka_header_get_all(hdrs, idx, &name,
                                      (const void **)&value, &size) ;
             idx++, exp++) {

                /* The message has more headers than the caller expects:
                 * fail with a readable error rather than handing the
                 * terminating entry's NULL name to printf("%s") and
                 * strcmp() below. */
                if (!exp->name)
                        TEST_FAIL("%s: Msg #%d: "
                                  "Unexpected header '%s' at idx #%"PRIusz
                                  ": no more headers expected",
                                  what, msgid, name, idx);

                TEST_SAYL(3, "%s: Msg #%d: "
                          "Header #%"PRIusz": %s='%s' (expecting %s='%s')\n",
                          what, msgid, idx, name, value ? value : "(NULL)",
                          exp->name, exp->value ? exp->value : "(NULL)");

                if (strcmp(name, exp->name))
                        TEST_FAIL("%s: Msg #%d: "
                                  "Expected header %s at idx #%"PRIusz
                                  ", not '%s' (%"PRIusz")",
                                  what, msgid, exp->name, idx, name,
                                  strlen(name));

                if (!strcmp(name, "msgid")) {
                        int vid;

                        /* Special handling: compare msgid header value
                         * to message body, should be identical */
                        if (size != rkmessage->len || size != sizeof(int))
                                TEST_FAIL("%s: "
                                          "Expected msgid/int-sized payload "
                                          "%"PRIusz", got %"PRIusz,
                                          what, size, rkmessage->len);

                        /* Copy to avoid unaligned access (by cast) */
                        memcpy(&vid, value, size);

                        if (vid != msgid)
                                TEST_FAIL("%s: Header msgid %d != payload %d",
                                          what, vid, msgid);

                        if (exp_msgid != vid)
                                TEST_FAIL("%s: Expected msgid %d, not %d",
                                          what, exp_msgid, vid);
                        continue;
                }

                if (!exp->value) {
                        /* Expected NULL value */
                        TEST_ASSERT(!value,
                                    "%s: Expected NULL value for %s, got %s",
                                    what, exp->name, value);

                } else {
                        TEST_ASSERT(value,
                                    "%s: "
                                    "Expected non-NULL value for %s, got NULL",
                                    what, exp->name);

                        TEST_ASSERT(size == strlen(exp->value),
                                    "%s: Expected size %"PRIusz" for %s, "
                                    "not %"PRIusz,
                                    what, strlen(exp->value), exp->name, size);

                        TEST_ASSERT(value[size] == '\0',
                                    "%s: "
                                    "Expected implicit null-terminator for %s",
                                    what, exp->name);

                        TEST_ASSERT(!strcmp(exp->value, value),
                                    "%s: "
                                    "Expected value %s for %s, not %s",
                                    what, exp->value, exp->name, value);
                }
        }

        /* All expected headers must have been consumed by the loop */
        TEST_ASSERT(exp->name == NULL,
                    "%s: Expected the expected, but stuck at %s which was "
                    "unexpected",
                    what, exp->name);

        if (!strcmp(what, "handle_consumed_msg") && !is_const &&
            (msgid % 3) == 0) {
                rd_kafka_headers_t *dhdrs;

                err = rd_kafka_message_detach_headers(rkmessage, &dhdrs);
                TEST_ASSERT(!err,
                            "detach_headers() should not fail, got %s",
                            rd_kafka_err2str(err));
                TEST_ASSERT(hdrs == dhdrs);

                /* Verify that a new headers object can be obtained */
                err = rd_kafka_message_headers(rkmessage, &hdrs);
                TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR);
                TEST_ASSERT(hdrs != dhdrs);
                rd_kafka_headers_destroy(dhdrs);

                /* Re-run the check on the newly obtained headers. The
                 * different \p what keeps this from recursing further. */
                expect_check("post_detach_headers", expected,
                             rkmessage, is_const);
        }
}
184
185
186 /**
187 * @brief Final (as in no more header modifications) message check.
188 */
msg_final_check(const char * what,rd_kafka_message_t * rkmessage,int is_const)189 static void msg_final_check (const char *what,
190 rd_kafka_message_t *rkmessage, int is_const) {
191 const struct expect expected[] = {
192 { "msgid", NULL }, /* special handling */
193 { "static", "hey" },
194 { "null", NULL },
195 { "empty", "" },
196 { "send1", "1" },
197 { "multi", "multi5" },
198 { NULL }
199 };
200
201 expect_check(what, expected, rkmessage, is_const);
202
203 exp_msgid++;
204
205
206 }
207
208 /**
209 * @brief Handle consumed message, must be identical to dr_msg_cb
210 */
handle_consumed_msg(rd_kafka_message_t * rkmessage)211 static void handle_consumed_msg (rd_kafka_message_t *rkmessage) {
212 msg_final_check(__FUNCTION__, rkmessage, 0);
213 }
214
215 /**
216 * @brief Delivery report callback
217 */
dr_msg_cb(rd_kafka_t * rk,const rd_kafka_message_t * rkmessage,void * opaque)218 static void dr_msg_cb (rd_kafka_t *rk,
219 const rd_kafka_message_t *rkmessage, void *opaque) {
220 TEST_ASSERT(!rkmessage->err,
221 "Message delivery failed: %s",
222 rd_kafka_err2str(rkmessage->err));
223
224 msg_final_check(__FUNCTION__, (rd_kafka_message_t *)rkmessage, 1);
225 }
226
227
228 /**
229 * @brief First on_send() interceptor
230 */
on_send1(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)231 static rd_kafka_resp_err_t on_send1 (rd_kafka_t *rk,
232 rd_kafka_message_t *rkmessage,
233 void *ic_opaque) {
234 const struct expect expected[] = {
235 { "msgid", NULL }, /* special handling */
236 { "static", "hey" },
237 { "multi", "multi1" },
238 { "multi", "multi2" },
239 { "multi", "multi3" },
240 { "null", NULL },
241 { "empty", "" },
242 { NULL }
243 };
244 rd_kafka_headers_t *hdrs;
245 rd_kafka_resp_err_t err;
246
247 expect_check(__FUNCTION__, expected, rkmessage, 0);
248
249 err = rd_kafka_message_headers(rkmessage, &hdrs);
250 if (err) /* First message has no headers. */
251 return RD_KAFKA_RESP_ERR_NO_ERROR;
252
253 rd_kafka_header_add(hdrs, "multi", -1, "multi4", -1);
254 rd_kafka_header_add(hdrs, "send1", -1, "1", -1);
255 rd_kafka_header_remove(hdrs, "multi");
256 rd_kafka_header_add(hdrs, "multi", -1, "multi5", -1);
257
258 return RD_KAFKA_RESP_ERR_NO_ERROR;
259 }
260
261
262 /**
263 * @brief Second on_send() interceptor
264 */
on_send2(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)265 static rd_kafka_resp_err_t on_send2 (rd_kafka_t *rk,
266 rd_kafka_message_t *rkmessage,
267 void *ic_opaque) {
268 const struct expect expected[] = {
269 { "msgid", NULL }, /* special handling */
270 { "static", "hey" },
271 { "null", NULL },
272 { "empty", "" },
273 { "send1", "1" },
274 { "multi", "multi5" },
275 { NULL }
276 };
277
278 expect_check(__FUNCTION__, expected, rkmessage, 0);
279
280 return RD_KAFKA_RESP_ERR_NO_ERROR;
281 }
282
283 /**
284 * @brief on_new() interceptor to set up message interceptors
285 * from rd_kafka_new().
286 */
on_new(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)287 static rd_kafka_resp_err_t on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf,
288 void *ic_opaque,
289 char *errstr, size_t errstr_size) {
290 rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send1, NULL);
291 rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send2, NULL);
292 return RD_KAFKA_RESP_ERR_NO_ERROR;
293 }
294
295
do_produce(const char * topic,int msgcnt)296 static void do_produce (const char *topic, int msgcnt) {
297 rd_kafka_t *rk;
298 rd_kafka_conf_t *conf;
299 int i;
300 rd_kafka_resp_err_t err;
301
302 test_conf_init(&conf, NULL, 0);
303 test_conf_set(conf, "acks", "all");
304 rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
305
306 rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, NULL);
307
308 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
309
310 /* First message is without headers (negative testing) */
311 i = 0;
312 err = rd_kafka_producev(
313 rk,
314 RD_KAFKA_V_TOPIC(topic),
315 RD_KAFKA_V_PARTITION(0),
316 RD_KAFKA_V_VALUE(&i, sizeof(i)),
317 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
318 RD_KAFKA_V_END);
319 TEST_ASSERT(!err,
320 "producev() failed: %s", rd_kafka_err2str(err));
321 exp_msgid++;
322
323 for (i = 1 ; i < msgcnt ; i++, exp_msgid++) {
324 err = rd_kafka_producev(
325 rk,
326 RD_KAFKA_V_TOPIC(topic),
327 RD_KAFKA_V_PARTITION(0),
328 RD_KAFKA_V_VALUE(&i, sizeof(i)),
329 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
330 RD_KAFKA_V_HEADER("msgid", &i, sizeof(i)),
331 RD_KAFKA_V_HEADER("static", "hey", -1),
332 RD_KAFKA_V_HEADER("multi", "multi1", -1),
333 RD_KAFKA_V_HEADER("multi", "multi2", 6),
334 RD_KAFKA_V_HEADER("multi", "multi3", strlen("multi3")),
335 RD_KAFKA_V_HEADER("null", NULL, 0),
336 RD_KAFKA_V_HEADER("empty", "", 0),
337 RD_KAFKA_V_END);
338 TEST_ASSERT(!err,
339 "producev() failed: %s", rd_kafka_err2str(err));
340 }
341
342 /* Reset expected message id for dr */
343 exp_msgid = 0;
344
345 /* Wait for timeouts and delivery reports */
346 rd_kafka_flush(rk, tmout_multip(5000));
347
348 rd_kafka_destroy(rk);
349 }
350
do_consume(const char * topic,int msgcnt)351 static void do_consume (const char *topic, int msgcnt) {
352 rd_kafka_t *rk;
353 rd_kafka_topic_partition_list_t *parts;
354
355 rk = test_create_consumer(topic, NULL, NULL, NULL);
356
357 parts = rd_kafka_topic_partition_list_new(1);
358 rd_kafka_topic_partition_list_add(parts, topic, 0)->offset =
359 RD_KAFKA_OFFSET_BEGINNING;
360
361 test_consumer_assign("assign", rk, parts);
362
363 rd_kafka_topic_partition_list_destroy(parts);
364
365 exp_msgid = 0;
366
367 while (exp_msgid < msgcnt) {
368 rd_kafka_message_t *rkm;
369
370 rkm = rd_kafka_consumer_poll(rk, 1000);
371 if (!rkm)
372 continue;
373
374 if (rkm->err)
375 TEST_FAIL("consume error while expecting msgid %d/%d: "
376 "%s",
377 exp_msgid, msgcnt,
378 rd_kafka_message_errstr(rkm));
379
380 handle_consumed_msg(rkm);
381
382 rd_kafka_message_destroy(rkm);
383 }
384
385 test_consumer_close(rk);
386 rd_kafka_destroy(rk);
387 }
388
389
/**
 * @brief Test entry point: produce messages with headers to a test topic,
 *        then consume them back and verify the headers.
 *
 * @returns 0
 */
int main_0073_headers (int argc, char **argv) {
        /* Skip the "main_" prefix of this function's name; the remainder
         * is what the topic name is derived from. */
        const char *test_name = __FUNCTION__ + (sizeof("main_") - 1);
        const char *topic = test_mk_topic_name(test_name, 1);
        const int msgcnt = 10;

        (void)argc;
        (void)argv;

        do_produce(topic, msgcnt);
        do_consume(topic, msgcnt);

        return 0;
}
399