1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2013, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 /**
30 * Produce messages, then consume them.
31 * Consume both through the standard interface and through the queue interface.
32 */
33
34
35 #include "test.h"
36
37 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
38 * is built from within the librdkafka source tree and thus differs. */
39 #include "rdkafka.h" /* for Kafka driver */
40
41
/* Number of produced messages still awaiting a delivery report:
 * set to the message count by produce_messages() and decremented
 * once per report in dr_cb(). */
static int prod_msg_remains = 0;
/* Failure counter checked by produce_messages() once the out queue has
 * drained. NOTE(review): nothing visible in this file increments it. */
static int fails = 0;
44
45 /**
46 * Delivery reported callback.
47 * Called for each message once to signal its delivery status.
48 */
dr_cb(rd_kafka_t * rk,void * payload,size_t len,rd_kafka_resp_err_t err,void * opaque,void * msg_opaque)49 static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
50 rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) {
51
52 if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
53 TEST_FAIL("Message delivery failed: %s\n",
54 rd_kafka_err2str(err));
55
56 if (prod_msg_remains == 0)
57 TEST_FAIL("Too many messages delivered (prod_msg_remains %i)",
58 prod_msg_remains);
59
60 prod_msg_remains--;
61 }
62
63
64 /**
65 * Produces 'msgcnt' messages split over 'partition_cnt' partitions.
66 */
produce_messages(uint64_t testid,const char * topic,int partition_cnt,int msgcnt)67 static void produce_messages (uint64_t testid, const char *topic,
68 int partition_cnt, int msgcnt) {
69 int r;
70 rd_kafka_t *rk;
71 rd_kafka_topic_t *rkt;
72 rd_kafka_conf_t *conf;
73 rd_kafka_topic_conf_t *topic_conf;
74 char errstr[512];
75 char msg[128];
76 int failcnt = 0;
77 int i;
78 rd_kafka_message_t *rkmessages;
79 int32_t partition;
80 int msgid = 0;
81
82 test_conf_init(&conf, &topic_conf, 20);
83
84 rd_kafka_conf_set_dr_cb(conf, dr_cb);
85
86 /* Make sure all replicas are in-sync after producing
87 * so that consume test wont fail. */
88 rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1",
89 errstr, sizeof(errstr));
90
91 /* Create kafka instance */
92 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
93
94 rkt = rd_kafka_topic_new(rk, topic, topic_conf);
95 if (!rkt)
96 TEST_FAIL("Failed to create topic: %s\n",
97 rd_strerror(errno));
98
99 /* Create messages. */
100 prod_msg_remains = msgcnt;
101 rkmessages = calloc(sizeof(*rkmessages), msgcnt / partition_cnt);
102 for (partition = 0 ; partition < partition_cnt ; partition++) {
103 int batch_cnt = msgcnt / partition_cnt;
104
105 for (i = 0 ; i < batch_cnt ; i++) {
106 rd_snprintf(msg, sizeof(msg),
107 "testid=%"PRIu64", partition=%i, msg=%i",
108 testid, (int)partition, msgid);
109 rkmessages[i].payload = rd_strdup(msg);
110 rkmessages[i].len = strlen(msg);
111 msgid++;
112 }
113
114 TEST_SAY("Start produce to partition %i: msgs #%d..%d\n",
115 (int)partition, msgid-batch_cnt, msgid);
116 /* Produce batch for this partition */
117 r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE,
118 rkmessages, batch_cnt);
119 if (r == -1)
120 TEST_FAIL("Failed to produce "
121 "batch for partition %i: %s",
122 (int)partition,
123 rd_kafka_err2str(rd_kafka_last_error()));
124
125 /* Scan through messages to check for errors. */
126 for (i = 0 ; i < batch_cnt ; i++) {
127 if (rkmessages[i].err) {
128 failcnt++;
129 if (failcnt < 100)
130 TEST_SAY("Message #%i failed: %s\n",
131 i,
132 rd_kafka_err2str(rkmessages[i].
133 err));
134 }
135 }
136
137 /* All messages should've been produced. */
138 if (r < batch_cnt) {
139 TEST_SAY("Not all messages were accepted "
140 "by produce_batch(): %i < %i\n", r, batch_cnt);
141
142 if (batch_cnt - r != failcnt)
143 TEST_SAY("Discrepency between failed "
144 "messages (%i) "
145 "and return value %i (%i - %i)\n",
146 failcnt, batch_cnt - r, batch_cnt, r);
147 TEST_FAIL("%i/%i messages failed\n",
148 batch_cnt - r, batch_cnt);
149 }
150
151 TEST_SAY("Produced %i messages to partition %i, "
152 "waiting for deliveries\n", r, partition);
153 }
154
155
156 free(rkmessages);
157
158 /* Wait for messages to be delivered */
159 while (rd_kafka_outq_len(rk) > 0)
160 rd_kafka_poll(rk, 100);
161
162 if (fails)
163 TEST_FAIL("%i failures, see previous errors", fails);
164
165 if (prod_msg_remains != 0)
166 TEST_FAIL("Still waiting for %i messages to be produced",
167 prod_msg_remains);
168
169 /* Destroy topic */
170 rd_kafka_topic_destroy(rkt);
171
172 /* Destroy rdkafka instance */
173 TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
174 rd_kafka_destroy(rk);
175 }
176
177
178
/* Message numbers recorded by verify_consumed_msg0(); allocated by
 * verify_consumed_msg_reset() with every slot initialised to -1. */
static int *cons_msgs;
/* Number of slots in cons_msgs, i.e. the expected message count. */
static int cons_msgs_size;
/* Number of slots of cons_msgs filled in so far. */
static int cons_msgs_cnt;
182
verify_consumed_msg_reset(int msgcnt)183 static void verify_consumed_msg_reset (int msgcnt) {
184 TEST_SAY("Resetting consumed_msgs (msgcnt %d)\n", msgcnt);
185 if (cons_msgs) {
186 free(cons_msgs);
187 cons_msgs = NULL;
188 }
189
190 if (msgcnt) {
191 int i;
192
193 cons_msgs = malloc(sizeof(*cons_msgs) * msgcnt);
194 for (i = 0 ; i < msgcnt ; i++)
195 cons_msgs[i] = -1;
196 }
197
198 cons_msgs_size = msgcnt;
199 cons_msgs_cnt = 0;
200 }
201
202
/**
 * qsort() comparator ordering two ints ascending.
 *
 * @param _a  Pointer to the first int.
 * @param _b  Pointer to the second int.
 *
 * @returns -1, 0 or 1 when *_a is less than, equal to or greater
 *          than *_b respectively.
 */
static int int_cmp (const void *_a, const void *_b) {
        /* Read through const-qualified pointers: the comparator must not
         * (and does not) modify the elements being sorted. */
        const int a = *(const int *)_a;
        const int b = *(const int *)_b;

        /* Overflow-free three-way comparison. */
        return (a > b) - (a < b);
}
208
verify_consumed_msg_check0(const char * func,int line)209 static void verify_consumed_msg_check0 (const char *func, int line) {
210 int i;
211 int fails = 0;
212
213 if (cons_msgs_cnt < cons_msgs_size) {
214 TEST_SAY("Missing %i messages in consumer\n",
215 cons_msgs_size - cons_msgs_cnt);
216 fails++;
217 }
218
219 qsort(cons_msgs, cons_msgs_size, sizeof(*cons_msgs), int_cmp);
220
221 for (i = 0 ; i < cons_msgs_size ; i++) {
222 if (cons_msgs[i] != i) {
223 TEST_SAY("Consumed message #%i is wrong, "
224 "expected #%i\n",
225 cons_msgs[i], i);
226 fails++;
227 }
228 }
229
230 if (fails)
231 TEST_FAIL("See above error(s)");
232
233 verify_consumed_msg_reset(0);
234 }
235
236
/* Runs verify_consumed_msg_check0(), passing the name of the calling
 * function and the line number of the call. */
#define verify_consumed_msg_check() \
        verify_consumed_msg_check0(__FUNCTION__,__LINE__)
239
240
241
/**
 * Verifies one consumed message against the expected values and records
 * its message number in cons_msgs.
 *
 * The payload must parse as "testid=<u64>, partition=<int>, msg=<int>".
 * The test is failed if the payload is too large or malformed, if any
 * field does not match, if the message number is outside
 * 0..cons_msgs_size-1, or if cons_msgs is already full.
 *
 * @param func       Name of the calling function, used in diagnostics.
 * @param line       Line of the call, used in diagnostics.
 * @param testid     Expected test id.
 * @param partition  Expected partition, or -1 to accept any partition.
 * @param msgnum     Expected message number, or -1 to accept any number.
 * @param rkmessage  The consumed message; its len, payload and partition
 *                   fields are read.
 */
static void verify_consumed_msg0 (const char *func, int line,
                                  uint64_t testid, int32_t partition,
                                  int msgnum,
                                  rd_kafka_message_t *rkmessage) {
        uint64_t in_testid;
        int in_part;
        int in_msgnum;
        char buf[1024];

        /* Reject payloads too large to be copied into buf. */
        if (rkmessage->len +1 >= sizeof(buf))
                TEST_FAIL("Incoming message too large (%i): "
                          "not sourced by this test",
                          (int)rkmessage->len);

        /* Copy the payload into buf as a NUL-terminated string
         * so that it can be parsed and printed. */
        rd_snprintf(buf, sizeof(buf), "%.*s",
                    (int)rkmessage->len, (char *)rkmessage->payload);

        if (sscanf(buf, "testid=%"SCNu64", partition=%i, msg=%i",
                   &in_testid, &in_part, &in_msgnum) != 3)
                TEST_FAIL("Incorrect message format: %s", buf);

        if (test_level > 2) {
                TEST_SAY("%s:%i: Our testid %"PRIu64", part %i =? %i, "
                         "msg %i =? %i "
                         ", message's: \"%s\"\n",
                         func, line,
                         testid, (int)partition, (int)rkmessage->partition,
                         msgnum, in_msgnum, buf);
        }

        /* Valid message numbers are 0..cons_msgs_size-1, so a number
         * equal to cons_msgs_size is out of range as well. */
        if (testid != in_testid ||
            (partition != -1 && partition != in_part) ||
            (msgnum != -1 && msgnum != in_msgnum) ||
            (in_msgnum < 0 || in_msgnum >= cons_msgs_size))
                goto fail_match;

        if (cons_msgs_cnt == cons_msgs_size) {
                TEST_SAY("Too many messages in cons_msgs (%i) while reading "
                         "message \"%s\"\n",
                         cons_msgs_cnt, buf);
                /* Log what has been collected so far before failing. */
                verify_consumed_msg_check();
                TEST_FAIL("See above error(s)");
        }

        cons_msgs[cons_msgs_cnt++] = in_msgnum;

        return;

 fail_match:
        TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i, msg %i/%i did "
                  "not match message's: \"%s\"\n",
                  func, line,
                  testid, (int)partition, msgnum, cons_msgs_size, buf);
}
296
/* Runs verify_consumed_msg0() on 'rkmessage', passing the name of the
 * calling function and the line number of the call ahead of the expected
 * test id, partition and message number. */
#define verify_consumed_msg(testid,part,msgnum,rkmessage) \
        verify_consumed_msg0(__FUNCTION__,__LINE__,testid,part,msgnum,rkmessage)
299
300
consume_messages(uint64_t testid,const char * topic,int32_t partition,int msg_base,int batch_cnt,int msgcnt)301 static void consume_messages (uint64_t testid, const char *topic,
302 int32_t partition, int msg_base, int batch_cnt,
303 int msgcnt) {
304 rd_kafka_t *rk;
305 rd_kafka_topic_t *rkt;
306 rd_kafka_conf_t *conf;
307 rd_kafka_topic_conf_t *topic_conf;
308 int i;
309
310 test_conf_init(&conf, &topic_conf, 20);
311
312 /* Create kafka instance */
313 rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
314
315 TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk));
316
317 rkt = rd_kafka_topic_new(rk, topic, topic_conf);
318 if (!rkt)
319 TEST_FAIL("Failed to create topic: %s\n",
320 rd_strerror(errno));
321
322 TEST_SAY("Consuming %i messages from partition %i\n",
323 batch_cnt, partition);
324
325 /* Consume messages */
326 if (rd_kafka_consume_start(rkt, partition,
327 RD_KAFKA_OFFSET_TAIL(batch_cnt)) == -1)
328 TEST_FAIL("consume_start(%i, -%i) failed: %s",
329 (int)partition, batch_cnt,
330 rd_kafka_err2str(rd_kafka_last_error()));
331
332 for (i = 0 ; i < batch_cnt ; ) {
333 rd_kafka_message_t *rkmessage;
334
335 rkmessage = rd_kafka_consume(rkt, partition,
336 tmout_multip(5000));
337 if (!rkmessage)
338 TEST_FAIL("Failed to consume message %i/%i from "
339 "partition %i: %s",
340 i, batch_cnt, (int)partition,
341 rd_kafka_err2str(rd_kafka_last_error()));
342 if (rkmessage->err) {
343 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF){
344 rd_kafka_message_destroy(rkmessage);
345 continue;
346 }
347 TEST_FAIL("Consume message %i/%i from partition %i "
348 "has error: %s: %s",
349 i, batch_cnt, (int)partition,
350 rd_kafka_err2str(rkmessage->err),
351 rd_kafka_message_errstr(rkmessage));
352 }
353
354 verify_consumed_msg(testid, partition, msg_base+i, rkmessage);
355
356 rd_kafka_message_destroy(rkmessage);
357 i++;
358 }
359
360 rd_kafka_consume_stop(rkt, partition);
361
362 /* Destroy topic */
363 rd_kafka_topic_destroy(rkt);
364
365 /* Destroy rdkafka instance */
366 TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
367 rd_kafka_destroy(rk);
368 }
369
370
consume_messages_with_queues(uint64_t testid,const char * topic,int partition_cnt,int msgcnt)371 static void consume_messages_with_queues (uint64_t testid, const char *topic,
372 int partition_cnt, int msgcnt) {
373 rd_kafka_t *rk;
374 rd_kafka_topic_t *rkt;
375 rd_kafka_conf_t *conf;
376 rd_kafka_topic_conf_t *topic_conf;
377 rd_kafka_queue_t *rkqu;
378 int i;
379 int32_t partition;
380 int batch_cnt = msgcnt / partition_cnt;
381
382 test_conf_init(&conf, &topic_conf, 20);
383
384 test_conf_set(conf, "enable.partition.eof", "true");
385
386 /* Create kafka instance */
387 rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
388
389 /* Create queue */
390 rkqu = rd_kafka_queue_new(rk);
391
392
393 rkt = rd_kafka_topic_new(rk, topic, topic_conf);
394 if (!rkt)
395 TEST_FAIL("Failed to create topic: %s\n",
396 rd_strerror(errno));
397
398 TEST_SAY("Consuming %i messages from one queue serving %i partitions\n",
399 msgcnt, partition_cnt);
400
401 /* Start consuming each partition */
402 for (partition = 0 ; partition < partition_cnt ; partition++) {
403 /* Consume messages */
404 TEST_SAY("Start consuming partition %i at offset -%i\n",
405 partition, batch_cnt);
406 if (rd_kafka_consume_start_queue(rkt, partition,
407 RD_KAFKA_OFFSET_TAIL(batch_cnt),
408 rkqu) == -1)
409 TEST_FAIL("consume_start_queue(%i) failed: %s",
410 (int)partition,
411 rd_kafka_err2str(rd_kafka_last_error()));
412 }
413
414
415 /* Consume messages from queue */
416 for (i = 0 ; i < msgcnt ; ) {
417 rd_kafka_message_t *rkmessage;
418
419 rkmessage = rd_kafka_consume_queue(rkqu, tmout_multip(5000));
420 if (!rkmessage)
421 TEST_FAIL("Failed to consume message %i/%i from "
422 "queue: %s",
423 i, msgcnt,
424 rd_kafka_err2str(rd_kafka_last_error()));
425 if (rkmessage->err) {
426 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF){
427 TEST_SAY("Topic %s [%"PRId32"] reached "
428 "EOF at offset %"PRId64"\n",
429 rd_kafka_topic_name(rkmessage->rkt),
430 rkmessage->partition,
431 rkmessage->offset);
432 rd_kafka_message_destroy(rkmessage);
433 continue;
434 }
435 TEST_FAIL("Consume message %i/%i from queue "
436 "has error (offset %"PRId64
437 ", partition %"PRId32"): %s",
438 i, msgcnt,
439 rkmessage->offset, rkmessage->partition,
440 rd_kafka_err2str(rkmessage->err));
441 }
442
443 verify_consumed_msg(testid, -1, -1, rkmessage);
444
445 rd_kafka_message_destroy(rkmessage);
446 i++;
447 }
448
449 /* Stop consuming each partition */
450 for (partition = 0 ; partition < partition_cnt ; partition++)
451 rd_kafka_consume_stop(rkt, partition);
452
453 /* Destroy queue */
454 rd_kafka_queue_destroy(rkqu);
455
456 /* Destroy topic */
457 rd_kafka_topic_destroy(rkt);
458
459 /* Destroy rdkafka instance */
460 TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
461 rd_kafka_destroy(rk);
462 }
463
464
465 /**
466 * Produce to two partitions.
467 * Consume with standard interface from both, one after the other.
468 * Consume with queue interface from both, simultanously.
469 */
test_produce_consume(void)470 static void test_produce_consume (void) {
471 int msgcnt = test_quick ? 100 : 1000;
472 int partition_cnt = 2;
473 int i;
474 uint64_t testid;
475 int msg_base = 0;
476 const char *topic;
477
478 /* Generate a testid so we can differentiate messages
479 * from other tests */
480 testid = test_id_generate();
481
482 /* Read test.conf to configure topic name */
483 test_conf_init(NULL, NULL, 20);
484 topic = test_mk_topic_name("0012", 1);
485
486 TEST_SAY("Topic %s, testid %"PRIu64"\n", topic, testid);
487
488 /* Produce messages */
489 produce_messages(testid, topic, partition_cnt, msgcnt);
490
491
492 /* Consume messages with standard interface */
493 verify_consumed_msg_reset(msgcnt);
494 for (i = 0 ; i < partition_cnt ; i++) {
495 consume_messages(testid, topic, i,
496 msg_base, msgcnt / partition_cnt, msgcnt);
497 msg_base += msgcnt / partition_cnt;
498 }
499 verify_consumed_msg_check();
500
501 /* Consume messages with queue interface */
502 verify_consumed_msg_reset(msgcnt);
503 consume_messages_with_queues(testid, topic, partition_cnt, msgcnt);
504 verify_consumed_msg_check();
505
506 return;
507 }
508
509
510
511
/**
 * Entry point of this test: runs test_produce_consume() and returns 0.
 * The command line arguments are not used.
 */
int main_0012_produce_consume (int argc, char **argv) {
        (void)argc;
        (void)argv;

        test_produce_consume();

        return 0;
}
516