1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30 #include "rdkafka.h"
31
32 /**
33 * Local (no broker) unit-like tests of Message Headers
34 */
35
36
37
/**
 * Message id the next checked message is expected to carry.
 * Advanced once per produced message while producing, then reset to 0 and
 * advanced once per delivery report.
 */
static int exp_msgid = 0;
39
/**
 * @brief One expected header (name and string value).
 *
 * Tables of these are terminated by an entry whose name is NULL.
 * A NULL value means the header is expected to carry a NULL value, except
 * for the "msgid" entry whose value is matched against the message payload
 * instead.
 */
struct expect {
        const char *name;   /* Header name, NULL terminates the table */
        const char *value;  /* Expected value, NULL for a NULL value */
};
44
45 /**
46 * @brief returns the message id
47 */
expect_check(const char * what,const struct expect * expected,const rd_kafka_message_t * rkmessage)48 static int expect_check (const char *what, const struct expect *expected,
49 const rd_kafka_message_t *rkmessage) {
50 const struct expect *exp;
51 rd_kafka_resp_err_t err;
52 size_t idx = 0;
53 const char *name;
54 const char *value;
55 size_t size;
56 rd_kafka_headers_t *hdrs;
57 int msgid;
58
59 if (rkmessage->len != sizeof(msgid))
60 TEST_FAIL("%s: expected message len %"PRIusz" == sizeof(int)",
61 what, rkmessage->len);
62
63 memcpy(&msgid, rkmessage->payload, rkmessage->len);
64
65 if ((err = rd_kafka_message_headers(rkmessage, &hdrs))) {
66 if (msgid == 0)
67 return 0; /* No headers expected for first message */
68
69 TEST_FAIL("%s: Expected headers in message %d: %s", what, msgid,
70 rd_kafka_err2str(err));
71 } else {
72 TEST_ASSERT(msgid != 0,
73 "%s: first message should have no headers", what);
74 }
75
76 /* msgid should always be first and has a variable value so hard to
77 * match with the expect struct. */
78 for (idx = 0, exp = expected ;
79 !rd_kafka_header_get_all(hdrs, idx, &name,
80 (const void **)&value, &size) ;
81 idx++, exp++) {
82
83 TEST_SAYL(3, "%s: Msg #%d: "
84 "Header #%"PRIusz": %s='%s' (expecting %s='%s')\n",
85 what, msgid, idx, name, value ? value : "(NULL)",
86 exp->name, exp->value ? exp->value : "(NULL)");
87
88 if (strcmp(name, exp->name))
89 TEST_FAIL("%s: Expected header %s at idx #%"PRIusz
90 ", not %s",
91 what, exp->name, idx-1, name);
92
93 if (!strcmp(name, "msgid")) {
94 int vid;
95
96 /* Special handling: compare msgid header value
97 * to message body, should be identical */
98 if (size != rkmessage->len || size != sizeof(int))
99 TEST_FAIL("%s: "
100 "Expected msgid/int-sized payload "
101 "%"PRIusz", got %"PRIusz,
102 what, size, rkmessage->len);
103
104 /* Copy to avoid unaligned access (by cast) */
105 memcpy(&vid, value, size);
106
107 if (vid != msgid)
108 TEST_FAIL("%s: Header msgid %d != payload %d",
109 what, vid, msgid);
110
111 if (exp_msgid != vid)
112 TEST_FAIL("%s: Expected msgid %d, not %d",
113 what, exp_msgid, vid);
114 continue;
115 }
116
117 if (!exp->value) {
118 /* Expected NULL value */
119 TEST_ASSERT(!value,
120 "%s: Expected NULL value for %s, got %s",
121 what, exp->name, value);
122
123 } else {
124 TEST_ASSERT(value,
125 "%s: "
126 "Expected non-NULL value for %s, got NULL",
127 what, exp->name);
128
129 TEST_ASSERT(size == strlen(exp->value),
130 "%s: Expected size %"PRIusz" for %s, "
131 "not %"PRIusz,
132 what, strlen(exp->value), exp->name, size);
133
134 TEST_ASSERT(value[size] == '\0',
135 "%s: "
136 "Expected implicit null-terminator for %s",
137 what, exp->name);
138
139 TEST_ASSERT(!strcmp(exp->value, value),
140 "%s: "
141 "Expected value %s for %s, not %s",
142 what, exp->value, exp->name, value);
143 }
144 }
145
146 TEST_ASSERT(exp->name == NULL,
147 "%s: Expected the expected, but stuck at %s which was "
148 "unexpected",
149 what, exp->name);
150
151 return msgid;
152 }
153
154
155 /**
156 * @brief Delivery report callback
157 */
dr_msg_cb(rd_kafka_t * rk,const rd_kafka_message_t * rkmessage,void * opaque)158 static void dr_msg_cb (rd_kafka_t *rk,
159 const rd_kafka_message_t *rkmessage, void *opaque) {
160 const struct expect expected[] = {
161 { "msgid", NULL }, /* special handling */
162 { "static", "hey" },
163 { "null", NULL },
164 { "empty", "" },
165 { "send1", "1" },
166 { "multi", "multi5" },
167 { NULL }
168 };
169 const struct expect replace_expected[] = {
170 { "msgid", NULL },
171 { "new", "one" },
172 { "this is the", NULL },
173 { "replaced headers\"", "" },
174 { "new", "right?" },
175 { NULL }
176 };
177 const struct expect *exp;
178 rd_kafka_headers_t *new_hdrs;
179 int msgid;
180
181 TEST_ASSERT(rkmessage->err == RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
182 "Expected message to fail with MSG_TIMED_OUT, not %s",
183 rd_kafka_err2str(rkmessage->err));
184
185 msgid = expect_check(__FUNCTION__, expected, rkmessage);
186
187 /* Replace entire headers list */
188 if (msgid > 0) {
189 new_hdrs = rd_kafka_headers_new(1);
190 rd_kafka_header_add(new_hdrs, "msgid", -1,
191 &msgid, sizeof(msgid));
192 for (exp = &replace_expected[1] ; exp->name ; exp++)
193 rd_kafka_header_add(new_hdrs,
194 exp->name, -1, exp->value, -1);
195
196 rd_kafka_message_set_headers((rd_kafka_message_t *)rkmessage,
197 new_hdrs);
198
199 expect_check(__FUNCTION__, replace_expected, rkmessage);
200 }
201
202 exp_msgid++;
203
204 }
205
expect_iter(const char * what,const rd_kafka_headers_t * hdrs,const char * name,const char ** expected,size_t cnt)206 static void expect_iter (const char *what,
207 const rd_kafka_headers_t *hdrs, const char *name,
208 const char **expected, size_t cnt) {
209 size_t idx;
210 rd_kafka_resp_err_t err;
211 const void *value;
212 size_t size;
213
214 for (idx = 0 ;
215 !(err = rd_kafka_header_get(hdrs, idx, name, &value, &size)) ;\
216 idx++) {
217 TEST_ASSERT(idx < cnt,
218 "%s: too many headers matching '%s', "
219 "expected %"PRIusz,
220 what, name, cnt);
221 TEST_SAYL(3, "%s: get(%"PRIusz", '%s') "
222 "expecting '%s' =? '%s'\n",
223 what, idx, name, expected[idx], (const char *)value);
224
225
226 TEST_ASSERT(!strcmp((const char *)value, expected[idx]),
227 "%s: get(%"PRIusz", '%s') expected '%s', not '%s'",
228 what, idx, name, expected[idx],
229 (const char *)value);
230 }
231
232 TEST_ASSERT(idx == cnt,
233 "%s: expected %"PRIusz" headers matching '%s', not %"PRIusz,
234 what, cnt, name, idx);
235 }
236
237
238
239 /**
240 * @brief First on_send() interceptor
241 */
on_send1(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)242 static rd_kafka_resp_err_t on_send1 (rd_kafka_t *rk,
243 rd_kafka_message_t *rkmessage,
244 void *ic_opaque) {
245 const struct expect expected[] = {
246 { "msgid", NULL }, /* special handling */
247 { "static", "hey" },
248 { "multi", "multi1" },
249 { "multi", "multi2" },
250 { "multi", "multi3" },
251 { "null", NULL },
252 { "empty", "" },
253 { NULL }
254 };
255 const char *expect_iter_multi[4] = {
256 "multi1",
257 "multi2",
258 "multi3",
259 "multi4" /* added below */
260 };
261 const char *expect_iter_static[1] = {
262 "hey"
263 };
264 rd_kafka_headers_t *hdrs;
265 size_t header_cnt;
266 rd_kafka_resp_err_t err;
267 const void *value;
268 size_t size;
269
270 expect_check(__FUNCTION__, expected, rkmessage);
271
272 err = rd_kafka_message_headers(rkmessage, &hdrs);
273 if (err) /* First message has no headers. */
274 return RD_KAFKA_RESP_ERR_NO_ERROR;
275
276 header_cnt = rd_kafka_header_cnt(hdrs);
277 TEST_ASSERT(header_cnt == 7,
278 "Expected 7 length got %"PRIusz"", header_cnt);
279
280 rd_kafka_header_add(hdrs, "multi", -1, "multi4", -1);
281
282 header_cnt = rd_kafka_header_cnt(hdrs);
283 TEST_ASSERT(header_cnt == 8,
284 "Expected 8 length got %"PRIusz"", header_cnt);
285
286 /* test iter() */
287 expect_iter(__FUNCTION__, hdrs, "multi", expect_iter_multi, 4);
288 expect_iter(__FUNCTION__, hdrs, "static", expect_iter_static, 1);
289 expect_iter(__FUNCTION__, hdrs, "notexists", NULL, 0);
290
291 rd_kafka_header_add(hdrs, "send1", -1, "1", -1);
292
293 header_cnt = rd_kafka_header_cnt(hdrs);
294 TEST_ASSERT(header_cnt == 9,
295 "Expected 9 length got %"PRIusz"", header_cnt);
296
297 rd_kafka_header_remove(hdrs, "multi");
298
299 header_cnt = rd_kafka_header_cnt(hdrs);
300 TEST_ASSERT(header_cnt == 5,
301 "Expected 5 length got %"PRIusz"", header_cnt);
302
303 rd_kafka_header_add(hdrs, "multi", -1, "multi5", -1);
304
305 header_cnt = rd_kafka_header_cnt(hdrs);
306 TEST_ASSERT(header_cnt == 6,
307 "Expected 6 length got %"PRIusz"", header_cnt);
308
309 /* test get_last() */
310 err = rd_kafka_header_get_last(hdrs, "multi", &value, &size);
311 TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
312 TEST_ASSERT(size == strlen("multi5") &&
313 !strcmp((const char *)value, "multi5"),
314 "expected 'multi5', not '%s'",
315 (const char *)value);
316
317 return RD_KAFKA_RESP_ERR_NO_ERROR;
318 }
319
320
321 /**
322 * @brief Second on_send() interceptor
323 */
on_send2(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)324 static rd_kafka_resp_err_t on_send2 (rd_kafka_t *rk,
325 rd_kafka_message_t *rkmessage,
326 void *ic_opaque) {
327 const struct expect expected[] = {
328 { "msgid", NULL }, /* special handling */
329 { "static", "hey" },
330 { "null", NULL },
331 { "empty", "" },
332 { "send1", "1" },
333 { "multi", "multi5" },
334 { NULL }
335 };
336
337 expect_check(__FUNCTION__, expected, rkmessage);
338
339 return RD_KAFKA_RESP_ERR_NO_ERROR;
340 }
341
342 /**
343 * @brief on_new() interceptor to set up message interceptors
344 * from rd_kafka_new().
345 */
on_new(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)346 static rd_kafka_resp_err_t on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf,
347 void *ic_opaque,
348 char *errstr, size_t errstr_size) {
349 rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send1, NULL);
350 rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send2, NULL);
351 return RD_KAFKA_RESP_ERR_NO_ERROR;
352 }
353
354
main_0072_headers_ut(int argc,char ** argv)355 int main_0072_headers_ut (int argc, char **argv) {
356 const char *topic = test_mk_topic_name(__FUNCTION__ + 5, 0);
357 rd_kafka_t *rk;
358 rd_kafka_conf_t *conf;
359 int i;
360 size_t header_cnt;
361 const int msgcnt = 10;
362 rd_kafka_resp_err_t err;
363
364 conf = rd_kafka_conf_new();
365 test_conf_set(conf, "message.timeout.ms", "1");
366 rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
367
368 rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, NULL);
369
370 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
371
372 /* First message is without headers (negative testing) */
373 i = 0;
374 err = rd_kafka_producev(
375 rk,
376 RD_KAFKA_V_TOPIC(topic),
377 RD_KAFKA_V_VALUE(&i, sizeof(i)),
378 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
379 RD_KAFKA_V_END);
380 TEST_ASSERT(!err,
381 "producev() failed: %s", rd_kafka_err2str(err));
382 exp_msgid++;
383
384 for (i = 1 ; i < msgcnt ; i++, exp_msgid++) {
385 /* Use headers list on one message */
386 if (i == 3) {
387 rd_kafka_headers_t *hdrs = rd_kafka_headers_new(4);
388
389 header_cnt = rd_kafka_header_cnt(hdrs);
390 TEST_ASSERT(header_cnt == 0,
391 "Expected 0 length got %"PRIusz"", header_cnt);
392
393 rd_kafka_headers_t *copied;
394
395 rd_kafka_header_add(hdrs, "msgid", -1, &i, sizeof(i));
396 rd_kafka_header_add(hdrs, "static", -1, "hey", -1);
397 rd_kafka_header_add(hdrs, "multi", -1, "multi1", -1);
398 rd_kafka_header_add(hdrs, "multi", -1, "multi2", 6);
399 rd_kafka_header_add(hdrs, "multi", -1, "multi3", strlen("multi3"));
400 rd_kafka_header_add(hdrs, "null", -1, NULL, 0);
401
402 /* Make a copy of the headers to verify copy() */
403 copied = rd_kafka_headers_copy(hdrs);
404
405 header_cnt = rd_kafka_header_cnt(hdrs);
406 TEST_ASSERT(header_cnt == 6,
407 "Expected 6 length got %"PRIusz"", header_cnt);
408
409 rd_kafka_headers_destroy(hdrs);
410
411 /* Last header ("empty") is added below */
412
413 /* Try unsupported _V_HEADER() and _V_HEADERS() mix,
414 * must fail with CONFLICT */
415 err = rd_kafka_producev(
416 rk,
417 RD_KAFKA_V_TOPIC(topic),
418 RD_KAFKA_V_VALUE(&i, sizeof(i)),
419 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
420 RD_KAFKA_V_HEADER("will_be_removed", "yep", -1),
421 RD_KAFKA_V_HEADERS(copied),
422 RD_KAFKA_V_HEADER("empty", "", 0),
423 RD_KAFKA_V_END);
424 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__CONFLICT,
425 "producev(): expected CONFLICT, got %s",
426 rd_kafka_err2str(err));
427
428 /* Proper call using only _V_HEADERS() */
429 rd_kafka_header_add(copied, "empty", -1, "", -1);
430 err = rd_kafka_producev(
431 rk,
432 RD_KAFKA_V_TOPIC(topic),
433 RD_KAFKA_V_VALUE(&i, sizeof(i)),
434 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
435 RD_KAFKA_V_HEADERS(copied),
436 RD_KAFKA_V_END);
437 TEST_ASSERT(!err, "producev() failed: %s",
438 rd_kafka_err2str(err));
439
440 } else {
441 err = rd_kafka_producev(
442 rk,
443 RD_KAFKA_V_TOPIC(topic),
444 RD_KAFKA_V_VALUE(&i, sizeof(i)),
445 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
446 RD_KAFKA_V_HEADER("msgid", &i, sizeof(i)),
447 RD_KAFKA_V_HEADER("static", "hey", -1),
448 RD_KAFKA_V_HEADER("multi", "multi1", -1),
449 RD_KAFKA_V_HEADER("multi", "multi2", 6),
450 RD_KAFKA_V_HEADER("multi", "multi3", strlen("multi3")),
451 RD_KAFKA_V_HEADER("null", NULL, 0),
452 RD_KAFKA_V_HEADER("empty", "", 0),
453 RD_KAFKA_V_END);
454 TEST_ASSERT(!err,
455 "producev() failed: %s", rd_kafka_err2str(err));
456 }
457 }
458
459 /* Reset expected message id for dr */
460 exp_msgid = 0;
461
462 /* Wait for timeouts and delivery reports */
463 rd_kafka_flush(rk, 5000);
464
465 rd_kafka_destroy(rk);
466
467 return 0;
468 }
469