1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 #include "rdkafka.h"
31 
32 /**
33  * Message Headers end-to-end tests
34  */
35 
36 
37 
/* Message id that the next verified message is expected to carry.
 * Reset to 0 before each verification pass (delivery reports and
 * consumption) and advanced as messages are produced or checked. */
static int exp_msgid = 0;
39 
/**
 * @brief One expected header: its name and its value as a C string.
 *
 * Arrays of this struct are terminated by an entry whose name is NULL.
 */
struct expect {
        const char *name;  /* Header name; NULL marks the end of an array. */
        const char *value; /* Expected value, or NULL when the header is
                            * expected to have a NULL value (not compared
                            * for the specially handled "msgid" header). */
};
44 
45 
46 
/**
 * @brief Verify that the headers of \p rkmessage match \p expected.
 *
 * The message payload is read as an int message id. Message id 0 is
 * expected to carry no headers at all; every other message must carry
 * exactly the headers listed in \p expected, in the same order.
 *
 * The "msgid" header is special-cased: its value must be int-sized and
 * equal to both the payload and the global exp_msgid.
 *
 * When \p what is "handle_consumed_msg", \p is_const is zero and the
 * message id is a multiple of 3, the headers are additionally detached
 * from the message, fetched again, and the whole check is repeated on
 * the newly obtained headers.
 *
 * @param what      Label prefixed to log and failure messages.
 * @param expected  Expected headers, terminated by an entry with a
 *                  NULL name.
 * @param rkmessage Message whose headers are verified.
 * @param is_const  Non-zero when the message must not be modified,
 *                  which skips the detach check.
 */
static void expect_check (const char *what, const struct expect *expected,
                          rd_kafka_message_t *rkmessage, int is_const) {
        const struct expect *exp;
        rd_kafka_resp_err_t err;
        size_t idx = 0;
        const char *name;
        const char *value;
        size_t size;
        rd_kafka_headers_t *hdrs;
        int msgid;

        if (rkmessage->len != sizeof(msgid))
                TEST_FAIL("%s: expected message len %"PRIusz" == sizeof(int)",
                          what, rkmessage->len);

        /* Copy rather than cast to avoid unaligned access. */
        memcpy(&msgid, rkmessage->payload, rkmessage->len);

        if ((err = rd_kafka_message_headers(rkmessage, &hdrs))) {
                if (msgid == 0) {
                        rd_kafka_resp_err_t err2;
                        TEST_SAYL(3, "%s: Msg #%d: no headers, good\n",
                                  what, msgid);

                        /* Detaching non-existent headers must fail the
                         * same way as fetching them did. */
                        err2 = rd_kafka_message_detach_headers(rkmessage, &hdrs);
                        TEST_ASSERT(err == err2,
                                    "expected detach_headers() error %s "
                                    "to match headers() error %s",
                                    rd_kafka_err2str(err2),
                                    rd_kafka_err2str(err));

                        return; /* No headers expected for first message */
                }

                TEST_FAIL("%s: Expected headers in message %d: %s", what, msgid,
                          rd_kafka_err2str(err));
        } else {
                TEST_ASSERT(msgid != 0,
                            "%s: first message should have no headers", what);
        }

        test_headers_dump(what, 3, hdrs);

        /* Walk the message headers and the expected list in lock-step
         * until the message runs out of headers. */
        for (idx = 0, exp = expected ;
             !rd_kafka_header_get_all(hdrs, idx, &name,
                                      (const void **)&value, &size) ;
             idx++, exp++) {

                /* The message has more headers than expected: fail here
                 * instead of handing the sentinel's NULL name to printf
                 * and strcmp() below and then walking off the array.
                 * NOTE: like the other checks in this function, this
                 * assumes TEST_FAIL() does not return. */
                if (!exp->name)
                        TEST_FAIL("%s: Msg #%d: "
                                  "Unexpected extra header '%s' at idx "
                                  "#%"PRIusz,
                                  what, msgid, name, idx);

                TEST_SAYL(3, "%s: Msg #%d: "
                          "Header #%"PRIusz": %s='%s' (expecting %s='%s')\n",
                          what, msgid, idx, name, value ? value : "(NULL)",
                          exp->name, exp->value ? exp->value : "(NULL)");

                if (strcmp(name, exp->name))
                        TEST_FAIL("%s: Msg #%d: "
                                  "Expected header %s at idx #%"PRIusz
                                  ", not '%s' (%"PRIusz")",
                                  what, msgid, exp->name, idx, name,
                                  strlen(name));

                if (!strcmp(name, "msgid")) {
                        int vid;

                        /* Special handling: compare msgid header value
                         * to message body, should be identical */
                        if (size != rkmessage->len || size != sizeof(int))
                                TEST_FAIL("%s: "
                                          "Expected msgid/int-sized payload "
                                          "%"PRIusz", got %"PRIusz,
                                          what, size, rkmessage->len);

                        /* Copy to avoid unaligned access (by cast) */
                        memcpy(&vid, value, size);

                        if (vid != msgid)
                                TEST_FAIL("%s: Header msgid %d != payload %d",
                                          what, vid, msgid);

                        if (exp_msgid != vid)
                                TEST_FAIL("%s: Expected msgid %d, not %d",
                                          what, exp_msgid, vid);
                        continue;
                }

                if (!exp->value) {
                        /* Expected NULL value */
                        TEST_ASSERT(!value,
                                    "%s: Expected NULL value for %s, got %s",
                                    what, exp->name, value);

                } else {
                        TEST_ASSERT(value,
                                    "%s: "
                                    "Expected non-NULL value for %s, got NULL",
                                    what, exp->name);

                        TEST_ASSERT(size == strlen(exp->value),
                                    "%s: Expected size %"PRIusz" for %s, "
                                    "not %"PRIusz,
                                    what, strlen(exp->value), exp->name, size);

                        /* The byte following the value is expected to be
                         * a null-terminator. */
                        TEST_ASSERT(value[size] == '\0',
                                    "%s: "
                                    "Expected implicit null-terminator for %s",
                                    what, exp->name);

                        TEST_ASSERT(!strcmp(exp->value, value),
                                    "%s: "
                                    "Expected value %s for %s, not %s",
                                    what, exp->value, exp->name, value);
                }
        }

        /* All message headers were consumed: the expected list must be
         * exhausted too, otherwise headers are missing. */
        TEST_ASSERT(exp->name == NULL,
                    "%s: Expected the expected, but stuck at %s which was "
                    "unexpected",
                    what, exp->name);

        if (!strcmp(what, "handle_consumed_msg") && !is_const &&
            (msgid % 3) == 0) {
                rd_kafka_headers_t *dhdrs;

                err = rd_kafka_message_detach_headers(rkmessage, &dhdrs);
                TEST_ASSERT(!err,
                            "detach_headers() should not fail, got %s",
                            rd_kafka_err2str(err));
                TEST_ASSERT(hdrs == dhdrs);

                /* Verify that a new headers object can be obtained */
                err = rd_kafka_message_headers(rkmessage, &hdrs);
                TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR);
                TEST_ASSERT(hdrs != dhdrs);
                rd_kafka_headers_destroy(dhdrs);

                /* The label differs from "handle_consumed_msg", so this
                 * recursion cannot reach the detach branch again. */
                expect_check("post_detach_headers", expected,
                             rkmessage, is_const);
        }
}
184 
185 
186 /**
187  * @brief Final (as in no more header modifications) message check.
188  */
msg_final_check(const char * what,rd_kafka_message_t * rkmessage,int is_const)189 static void msg_final_check (const char *what,
190                              rd_kafka_message_t *rkmessage, int is_const) {
191         const struct expect expected[] = {
192                 { "msgid", NULL }, /* special handling */
193                 { "static", "hey" },
194                 { "null", NULL },
195                 { "empty", "" },
196                 { "send1", "1" },
197                 { "multi", "multi5" },
198                 { NULL }
199         };
200 
201         expect_check(what, expected, rkmessage, is_const);
202 
203         exp_msgid++;
204 
205 
206 }
207 
208 /**
209  * @brief Handle consumed message, must be identical to dr_msg_cb
210  */
handle_consumed_msg(rd_kafka_message_t * rkmessage)211 static void handle_consumed_msg (rd_kafka_message_t *rkmessage) {
212         msg_final_check(__FUNCTION__, rkmessage, 0);
213 }
214 
215 /**
216  * @brief Delivery report callback
217  */
dr_msg_cb(rd_kafka_t * rk,const rd_kafka_message_t * rkmessage,void * opaque)218 static void dr_msg_cb (rd_kafka_t *rk,
219                        const rd_kafka_message_t *rkmessage, void *opaque) {
220         TEST_ASSERT(!rkmessage->err,
221                     "Message delivery failed: %s",
222                     rd_kafka_err2str(rkmessage->err));
223 
224         msg_final_check(__FUNCTION__, (rd_kafka_message_t *)rkmessage, 1);
225 }
226 
227 
228 /**
229  * @brief First on_send() interceptor
230  */
on_send1(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)231 static rd_kafka_resp_err_t on_send1 (rd_kafka_t *rk,
232                                      rd_kafka_message_t *rkmessage,
233                                      void *ic_opaque) {
234         const struct expect expected[] = {
235                 { "msgid", NULL }, /* special handling */
236                 { "static", "hey" },
237                 { "multi", "multi1" },
238                 { "multi", "multi2" },
239                 { "multi", "multi3" },
240                 { "null", NULL },
241                 { "empty", "" },
242                 { NULL }
243         };
244         rd_kafka_headers_t *hdrs;
245         rd_kafka_resp_err_t err;
246 
247         expect_check(__FUNCTION__, expected, rkmessage, 0);
248 
249         err = rd_kafka_message_headers(rkmessage, &hdrs);
250         if (err) /* First message has no headers. */
251                 return RD_KAFKA_RESP_ERR_NO_ERROR;
252 
253         rd_kafka_header_add(hdrs, "multi", -1, "multi4", -1);
254         rd_kafka_header_add(hdrs, "send1", -1, "1", -1);
255         rd_kafka_header_remove(hdrs, "multi");
256         rd_kafka_header_add(hdrs, "multi", -1, "multi5", -1);
257 
258         return RD_KAFKA_RESP_ERR_NO_ERROR;
259 }
260 
261 
262 /**
263  * @brief Second on_send() interceptor
264  */
on_send2(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)265 static rd_kafka_resp_err_t on_send2 (rd_kafka_t *rk,
266                                      rd_kafka_message_t *rkmessage,
267                                      void *ic_opaque) {
268         const struct expect expected[] = {
269                 { "msgid", NULL }, /* special handling */
270                 { "static", "hey" },
271                 { "null", NULL },
272                 { "empty", "" },
273                 { "send1", "1" },
274                 { "multi", "multi5" },
275                 { NULL }
276         };
277 
278         expect_check(__FUNCTION__, expected, rkmessage, 0);
279 
280         return RD_KAFKA_RESP_ERR_NO_ERROR;
281 }
282 
283 /**
284  * @brief on_new() interceptor to set up message interceptors
285  *        from rd_kafka_new().
286  */
on_new(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)287 static rd_kafka_resp_err_t on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf,
288                                    void *ic_opaque,
289                                    char *errstr, size_t errstr_size) {
290         rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send1, NULL);
291         rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send2, NULL);
292         return RD_KAFKA_RESP_ERR_NO_ERROR;
293 }
294 
295 
do_produce(const char * topic,int msgcnt)296 static void do_produce (const char *topic, int msgcnt) {
297         rd_kafka_t *rk;
298         rd_kafka_conf_t *conf;
299         int i;
300         rd_kafka_resp_err_t err;
301 
302         test_conf_init(&conf, NULL, 0);
303         test_conf_set(conf, "acks", "all");
304         rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
305 
306         rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, NULL);
307 
308         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
309 
310         /* First message is without headers (negative testing) */
311         i = 0;
312         err = rd_kafka_producev(
313                 rk,
314                 RD_KAFKA_V_TOPIC(topic),
315                 RD_KAFKA_V_PARTITION(0),
316                 RD_KAFKA_V_VALUE(&i, sizeof(i)),
317                 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
318                 RD_KAFKA_V_END);
319         TEST_ASSERT(!err,
320                     "producev() failed: %s", rd_kafka_err2str(err));
321         exp_msgid++;
322 
323         for (i = 1 ; i < msgcnt ; i++, exp_msgid++) {
324                 err = rd_kafka_producev(
325                         rk,
326                         RD_KAFKA_V_TOPIC(topic),
327                         RD_KAFKA_V_PARTITION(0),
328                         RD_KAFKA_V_VALUE(&i, sizeof(i)),
329                         RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
330                         RD_KAFKA_V_HEADER("msgid", &i, sizeof(i)),
331                         RD_KAFKA_V_HEADER("static", "hey", -1),
332                         RD_KAFKA_V_HEADER("multi", "multi1", -1),
333                         RD_KAFKA_V_HEADER("multi", "multi2", 6),
334                         RD_KAFKA_V_HEADER("multi", "multi3", strlen("multi3")),
335                         RD_KAFKA_V_HEADER("null", NULL, 0),
336                         RD_KAFKA_V_HEADER("empty", "", 0),
337                         RD_KAFKA_V_END);
338                 TEST_ASSERT(!err,
339                             "producev() failed: %s", rd_kafka_err2str(err));
340         }
341 
342         /* Reset expected message id for dr */
343         exp_msgid = 0;
344 
345         /* Wait for timeouts and delivery reports */
346         rd_kafka_flush(rk, tmout_multip(5000));
347 
348         rd_kafka_destroy(rk);
349 }
350 
do_consume(const char * topic,int msgcnt)351 static void do_consume (const char *topic, int msgcnt) {
352         rd_kafka_t *rk;
353         rd_kafka_topic_partition_list_t *parts;
354 
355         rk = test_create_consumer(topic, NULL, NULL, NULL);
356 
357         parts = rd_kafka_topic_partition_list_new(1);
358         rd_kafka_topic_partition_list_add(parts, topic, 0)->offset =
359                 RD_KAFKA_OFFSET_BEGINNING;
360 
361         test_consumer_assign("assign", rk, parts);
362 
363         rd_kafka_topic_partition_list_destroy(parts);
364 
365         exp_msgid = 0;
366 
367         while (exp_msgid < msgcnt) {
368                 rd_kafka_message_t *rkm;
369 
370                 rkm = rd_kafka_consumer_poll(rk, 1000);
371                 if (!rkm)
372                         continue;
373 
374                 if (rkm->err)
375                         TEST_FAIL("consume error while expecting msgid %d/%d: "
376                                   "%s",
377                                   exp_msgid, msgcnt,
378                                   rd_kafka_message_errstr(rkm));
379 
380                 handle_consumed_msg(rkm);
381 
382                 rd_kafka_message_destroy(rkm);
383         }
384 
385         test_consumer_close(rk);
386         rd_kafka_destroy(rk);
387 }
388 
389 
/**
 * @brief Test entry point: produce messages with headers, then consume
 *        and verify them.
 *
 * @returns 0 always.
 */
int main_0073_headers (int argc, char **argv) {
        /* Skip the first five characters ("main_") of the function name
         * when naming the topic. */
        const char *name_suffix = __FUNCTION__ + 5;
        const char *topic = test_mk_topic_name(name_suffix, 1);
        const int msgcnt = 10;

        do_produce(topic, msgcnt);
        do_consume(topic, msgcnt);

        return 0;
}
399