1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 #include "rdkafka.h"
31 
32 /**
33  * Local (no broker) unit-like tests of Message Headers
34  */
35 
36 
37 
/**
 * Message id the next checked message is expected to carry.
 * Advanced by the producer loop and by dr_msg_cb(), and reset to 0 by
 * main_0072_headers_ut() before delivery reports are served.
 * File-scope objects are zero-initialized, so this starts out as 0.
 */
static int exp_msgid;
39 
/**
 * One expected message header.
 * Arrays of these are terminated by an entry whose name is NULL.
 */
struct expect {
        /* Header name; NULL marks the end of an expectation array. */
        const char *name;
        /* Expected header value as a C string, or NULL when the header
         * is expected to carry a NULL value. */
        const char *value;
};
44 
45 /**
46  * @brief returns the message id
47  */
expect_check(const char * what,const struct expect * expected,const rd_kafka_message_t * rkmessage)48 static int expect_check (const char *what, const struct expect *expected,
49                           const rd_kafka_message_t *rkmessage) {
50         const struct expect *exp;
51         rd_kafka_resp_err_t err;
52         size_t idx = 0;
53         const char *name;
54         const char *value;
55         size_t size;
56         rd_kafka_headers_t *hdrs;
57         int msgid;
58 
59         if (rkmessage->len != sizeof(msgid))
60                 TEST_FAIL("%s: expected message len %"PRIusz" == sizeof(int)",
61                           what, rkmessage->len);
62 
63         memcpy(&msgid, rkmessage->payload, rkmessage->len);
64 
65         if ((err = rd_kafka_message_headers(rkmessage, &hdrs))) {
66                 if (msgid == 0)
67                         return 0; /* No headers expected for first message */
68 
69                 TEST_FAIL("%s: Expected headers in message %d: %s", what, msgid,
70                           rd_kafka_err2str(err));
71         } else {
72                 TEST_ASSERT(msgid != 0,
73                             "%s: first message should have no headers", what);
74         }
75 
76         /* msgid should always be first and has a variable value so hard to
77          * match with the expect struct. */
78         for (idx = 0, exp = expected ;
79              !rd_kafka_header_get_all(hdrs, idx, &name,
80                                       (const void **)&value, &size) ;
81              idx++, exp++) {
82 
83                 TEST_SAYL(3, "%s: Msg #%d: "
84                           "Header #%"PRIusz": %s='%s' (expecting %s='%s')\n",
85                           what, msgid, idx, name, value ? value : "(NULL)",
86                           exp->name, exp->value ? exp->value : "(NULL)");
87 
88                 if (strcmp(name, exp->name))
89                         TEST_FAIL("%s: Expected header %s at idx #%"PRIusz
90                                   ", not %s",
91                                   what, exp->name, idx-1, name);
92 
93                 if (!strcmp(name, "msgid")) {
94                         int vid;
95 
96                         /* Special handling: compare msgid header value
97                          * to message body, should be identical */
98                         if (size != rkmessage->len || size != sizeof(int))
99                                 TEST_FAIL("%s: "
100                                           "Expected msgid/int-sized payload "
101                                           "%"PRIusz", got %"PRIusz,
102                                           what, size, rkmessage->len);
103 
104                         /* Copy to avoid unaligned access (by cast) */
105                         memcpy(&vid, value, size);
106 
107                         if (vid != msgid)
108                                 TEST_FAIL("%s: Header msgid %d != payload %d",
109                                           what, vid, msgid);
110 
111                         if (exp_msgid != vid)
112                                 TEST_FAIL("%s: Expected msgid %d, not %d",
113                                           what, exp_msgid, vid);
114                         continue;
115                 }
116 
117                 if (!exp->value) {
118                         /* Expected NULL value */
119                         TEST_ASSERT(!value,
120                                     "%s: Expected NULL value for %s, got %s",
121                                     what, exp->name, value);
122 
123                 } else {
124                         TEST_ASSERT(value,
125                                     "%s: "
126                                     "Expected non-NULL value for %s, got NULL",
127                                     what, exp->name);
128 
129                         TEST_ASSERT(size == strlen(exp->value),
130                                     "%s: Expected size %"PRIusz" for %s, "
131                                     "not %"PRIusz,
132                                     what, strlen(exp->value), exp->name, size);
133 
134                         TEST_ASSERT(value[size] == '\0',
135                                     "%s: "
136                                     "Expected implicit null-terminator for %s",
137                                     what, exp->name);
138 
139                         TEST_ASSERT(!strcmp(exp->value, value),
140                                     "%s: "
141                                     "Expected value %s for %s, not %s",
142                                     what, exp->value, exp->name, value);
143                 }
144         }
145 
146         TEST_ASSERT(exp->name == NULL,
147                     "%s: Expected the expected, but stuck at %s which was "
148                     "unexpected",
149                     what, exp->name);
150 
151         return msgid;
152 }
153 
154 
155 /**
156  * @brief Delivery report callback
157  */
dr_msg_cb(rd_kafka_t * rk,const rd_kafka_message_t * rkmessage,void * opaque)158 static void dr_msg_cb (rd_kafka_t *rk,
159                        const rd_kafka_message_t *rkmessage, void *opaque) {
160         const struct expect expected[] = {
161                 { "msgid", NULL }, /* special handling */
162                 { "static", "hey" },
163                 { "null", NULL },
164                 { "empty", "" },
165                 { "send1", "1" },
166                 { "multi", "multi5" },
167                 { NULL }
168         };
169         const struct expect replace_expected[] = {
170                 { "msgid", NULL },
171                 { "new", "one" },
172                 { "this is the", NULL },
173                 { "replaced headers\"", "" },
174                 { "new", "right?" },
175                 { NULL }
176         };
177         const struct expect *exp;
178         rd_kafka_headers_t *new_hdrs;
179         int msgid;
180 
181         TEST_ASSERT(rkmessage->err == RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
182                     "Expected message to fail with MSG_TIMED_OUT, not %s",
183                     rd_kafka_err2str(rkmessage->err));
184 
185         msgid = expect_check(__FUNCTION__, expected, rkmessage);
186 
187         /* Replace entire headers list */
188         if (msgid > 0) {
189                 new_hdrs = rd_kafka_headers_new(1);
190                 rd_kafka_header_add(new_hdrs, "msgid", -1,
191                                     &msgid, sizeof(msgid));
192                 for (exp = &replace_expected[1] ; exp->name ; exp++)
193                         rd_kafka_header_add(new_hdrs,
194                                             exp->name, -1, exp->value, -1);
195 
196                 rd_kafka_message_set_headers((rd_kafka_message_t *)rkmessage,
197                                              new_hdrs);
198 
199                 expect_check(__FUNCTION__, replace_expected, rkmessage);
200         }
201 
202         exp_msgid++;
203 
204 }
205 
expect_iter(const char * what,const rd_kafka_headers_t * hdrs,const char * name,const char ** expected,size_t cnt)206 static void expect_iter (const char *what,
207                          const rd_kafka_headers_t *hdrs, const char *name,
208                          const char **expected, size_t cnt) {
209         size_t idx;
210         rd_kafka_resp_err_t err;
211         const void *value;
212         size_t size;
213 
214         for (idx = 0 ;
215              !(err = rd_kafka_header_get(hdrs, idx, name, &value, &size)) ;\
216              idx++) {
217                 TEST_ASSERT(idx < cnt,
218                             "%s: too many headers matching '%s', "
219                             "expected %"PRIusz,
220                             what, name, cnt);
221                 TEST_SAYL(3, "%s: get(%"PRIusz", '%s') "
222                           "expecting '%s' =? '%s'\n",
223                           what, idx, name, expected[idx], (const char *)value);
224 
225 
226                 TEST_ASSERT(!strcmp((const char *)value, expected[idx]),
227                             "%s: get(%"PRIusz", '%s') expected '%s', not '%s'",
228                             what, idx, name, expected[idx],
229                             (const char *)value);
230         }
231 
232         TEST_ASSERT(idx == cnt,
233                     "%s: expected %"PRIusz" headers matching '%s', not %"PRIusz,
234                     what, cnt, name, idx);
235 }
236 
237 
238 
239 /**
240  * @brief First on_send() interceptor
241  */
on_send1(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)242 static rd_kafka_resp_err_t on_send1 (rd_kafka_t *rk,
243                                      rd_kafka_message_t *rkmessage,
244                                      void *ic_opaque) {
245         const struct expect expected[] = {
246                 { "msgid", NULL }, /* special handling */
247                 { "static", "hey" },
248                 { "multi", "multi1" },
249                 { "multi", "multi2" },
250                 { "multi", "multi3" },
251                 { "null", NULL },
252                 { "empty", "" },
253                 { NULL }
254         };
255         const char *expect_iter_multi[4] = {
256                 "multi1",
257                 "multi2",
258                 "multi3",
259                 "multi4" /* added below */
260         };
261         const char *expect_iter_static[1] = {
262                 "hey"
263         };
264         rd_kafka_headers_t *hdrs;
265         size_t header_cnt;
266         rd_kafka_resp_err_t err;
267         const void *value;
268         size_t size;
269 
270         expect_check(__FUNCTION__, expected, rkmessage);
271 
272         err = rd_kafka_message_headers(rkmessage, &hdrs);
273         if (err) /* First message has no headers. */
274                 return RD_KAFKA_RESP_ERR_NO_ERROR;
275 
276         header_cnt = rd_kafka_header_cnt(hdrs);
277         TEST_ASSERT(header_cnt == 7,
278                     "Expected 7 length got %"PRIusz"", header_cnt);
279 
280         rd_kafka_header_add(hdrs, "multi", -1, "multi4", -1);
281 
282         header_cnt = rd_kafka_header_cnt(hdrs);
283         TEST_ASSERT(header_cnt == 8,
284                     "Expected 8 length got %"PRIusz"", header_cnt);
285 
286         /* test iter() */
287         expect_iter(__FUNCTION__, hdrs, "multi", expect_iter_multi, 4);
288         expect_iter(__FUNCTION__, hdrs, "static", expect_iter_static, 1);
289         expect_iter(__FUNCTION__, hdrs, "notexists", NULL, 0);
290 
291         rd_kafka_header_add(hdrs, "send1", -1, "1", -1);
292 
293         header_cnt = rd_kafka_header_cnt(hdrs);
294         TEST_ASSERT(header_cnt == 9,
295                     "Expected 9 length got %"PRIusz"", header_cnt);
296 
297         rd_kafka_header_remove(hdrs, "multi");
298 
299         header_cnt = rd_kafka_header_cnt(hdrs);
300         TEST_ASSERT(header_cnt == 5,
301                     "Expected 5 length got %"PRIusz"", header_cnt);
302 
303         rd_kafka_header_add(hdrs, "multi", -1, "multi5", -1);
304 
305         header_cnt = rd_kafka_header_cnt(hdrs);
306         TEST_ASSERT(header_cnt == 6,
307                     "Expected 6 length got %"PRIusz"", header_cnt);
308 
309         /* test get_last() */
310         err = rd_kafka_header_get_last(hdrs, "multi", &value, &size);
311         TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
312         TEST_ASSERT(size == strlen("multi5") &&
313                     !strcmp((const char *)value, "multi5"),
314                     "expected 'multi5', not '%s'",
315                     (const char *)value);
316 
317         return RD_KAFKA_RESP_ERR_NO_ERROR;
318 }
319 
320 
321 /**
322  * @brief Second on_send() interceptor
323  */
on_send2(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)324 static rd_kafka_resp_err_t on_send2 (rd_kafka_t *rk,
325                                      rd_kafka_message_t *rkmessage,
326                                      void *ic_opaque) {
327         const struct expect expected[] = {
328                 { "msgid", NULL }, /* special handling */
329                 { "static", "hey" },
330                 { "null", NULL },
331                 { "empty", "" },
332                 { "send1", "1" },
333                 { "multi", "multi5" },
334                 { NULL }
335         };
336 
337         expect_check(__FUNCTION__, expected, rkmessage);
338 
339         return RD_KAFKA_RESP_ERR_NO_ERROR;
340 }
341 
342 /**
343  * @brief on_new() interceptor to set up message interceptors
344  *        from rd_kafka_new().
345  */
on_new(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)346 static rd_kafka_resp_err_t on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf,
347                                    void *ic_opaque,
348                                    char *errstr, size_t errstr_size) {
349         rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send1, NULL);
350         rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send2, NULL);
351         return RD_KAFKA_RESP_ERR_NO_ERROR;
352 }
353 
354 
main_0072_headers_ut(int argc,char ** argv)355 int main_0072_headers_ut (int argc, char **argv) {
356         const char *topic = test_mk_topic_name(__FUNCTION__ + 5, 0);
357         rd_kafka_t *rk;
358         rd_kafka_conf_t *conf;
359         int i;
360         size_t header_cnt;
361         const int msgcnt = 10;
362         rd_kafka_resp_err_t err;
363 
364         conf = rd_kafka_conf_new();
365         test_conf_set(conf, "message.timeout.ms", "1");
366         rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
367 
368         rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, NULL);
369 
370         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
371 
372         /* First message is without headers (negative testing) */
373         i = 0;
374         err = rd_kafka_producev(
375                 rk,
376                 RD_KAFKA_V_TOPIC(topic),
377                 RD_KAFKA_V_VALUE(&i, sizeof(i)),
378                 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
379                 RD_KAFKA_V_END);
380         TEST_ASSERT(!err,
381                     "producev() failed: %s", rd_kafka_err2str(err));
382         exp_msgid++;
383 
384         for (i = 1 ; i < msgcnt ; i++, exp_msgid++) {
385                 /* Use headers list on one message */
386                 if (i == 3) {
387                         rd_kafka_headers_t *hdrs = rd_kafka_headers_new(4);
388 
389                         header_cnt = rd_kafka_header_cnt(hdrs);
390                         TEST_ASSERT(header_cnt == 0,
391                                     "Expected 0 length got %"PRIusz"", header_cnt);
392 
393                         rd_kafka_headers_t *copied;
394 
395                         rd_kafka_header_add(hdrs, "msgid", -1, &i, sizeof(i));
396                         rd_kafka_header_add(hdrs, "static", -1, "hey", -1);
397                         rd_kafka_header_add(hdrs, "multi", -1, "multi1", -1);
398                         rd_kafka_header_add(hdrs, "multi", -1, "multi2", 6);
399                         rd_kafka_header_add(hdrs, "multi", -1, "multi3", strlen("multi3"));
400                         rd_kafka_header_add(hdrs, "null", -1, NULL, 0);
401 
402                         /* Make a copy of the headers to verify copy() */
403                         copied = rd_kafka_headers_copy(hdrs);
404 
405                         header_cnt = rd_kafka_header_cnt(hdrs);
406                         TEST_ASSERT(header_cnt == 6,
407                                     "Expected 6 length got %"PRIusz"", header_cnt);
408 
409                         rd_kafka_headers_destroy(hdrs);
410 
411                         /* Last header ("empty") is added below */
412 
413                         /* Try unsupported _V_HEADER() and _V_HEADERS() mix,
414                          * must fail with CONFLICT */
415                         err = rd_kafka_producev(
416                                 rk,
417                                 RD_KAFKA_V_TOPIC(topic),
418                                 RD_KAFKA_V_VALUE(&i, sizeof(i)),
419                                 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
420                                 RD_KAFKA_V_HEADER("will_be_removed", "yep", -1),
421                                 RD_KAFKA_V_HEADERS(copied),
422                                 RD_KAFKA_V_HEADER("empty", "", 0),
423                                 RD_KAFKA_V_END);
424                         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__CONFLICT,
425                                     "producev(): expected CONFLICT, got %s",
426                                     rd_kafka_err2str(err));
427 
428                         /* Proper call using only _V_HEADERS() */
429                         rd_kafka_header_add(copied, "empty", -1, "", -1);
430                         err = rd_kafka_producev(
431                                 rk,
432                                 RD_KAFKA_V_TOPIC(topic),
433                                 RD_KAFKA_V_VALUE(&i, sizeof(i)),
434                                 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
435                                 RD_KAFKA_V_HEADERS(copied),
436                                 RD_KAFKA_V_END);
437                         TEST_ASSERT(!err, "producev() failed: %s",
438                                     rd_kafka_err2str(err));
439 
440                 } else {
441                         err = rd_kafka_producev(
442                                 rk,
443                                 RD_KAFKA_V_TOPIC(topic),
444                                 RD_KAFKA_V_VALUE(&i, sizeof(i)),
445                                 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
446                                 RD_KAFKA_V_HEADER("msgid", &i, sizeof(i)),
447                                 RD_KAFKA_V_HEADER("static", "hey", -1),
448                                 RD_KAFKA_V_HEADER("multi", "multi1", -1),
449                                 RD_KAFKA_V_HEADER("multi", "multi2", 6),
450                                 RD_KAFKA_V_HEADER("multi", "multi3", strlen("multi3")),
451                                 RD_KAFKA_V_HEADER("null", NULL, 0),
452                                 RD_KAFKA_V_HEADER("empty", "", 0),
453                                 RD_KAFKA_V_END);
454                         TEST_ASSERT(!err,
455                                     "producev() failed: %s", rd_kafka_err2str(err));
456                 }
457         }
458 
459         /* Reset expected message id for dr */
460         exp_msgid = 0;
461 
462         /* Wait for timeouts and delivery reports */
463         rd_kafka_flush(rk, 5000);
464 
465         rd_kafka_destroy(rk);
466 
467         return 0;
468 }
469