1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2013, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 /**
30  * Produce messages, then consume them.
31  * Consume both through the standard interface and through the queue interface.
32  */
33 
34 
35 #include "test.h"
36 
37 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
38  * is built from within the librdkafka source tree and thus differs. */
39 #include "rdkafka.h"  /* for Kafka driver */
40 
41 
42 static int prod_msg_remains = 0;
43 static int fails = 0;
44 
45 /**
46  * Delivery reported callback.
47  * Called for each message once to signal its delivery status.
48  */
dr_cb(rd_kafka_t * rk,void * payload,size_t len,rd_kafka_resp_err_t err,void * opaque,void * msg_opaque)49 static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
50 		   rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) {
51 
52 	if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
53 		TEST_FAIL("Message delivery failed: %s\n",
54 			  rd_kafka_err2str(err));
55 
56 	if (prod_msg_remains == 0)
57 		TEST_FAIL("Too many messages delivered (prod_msg_remains %i)",
58 			  prod_msg_remains);
59 
60 	prod_msg_remains--;
61 }
62 
63 
64 /**
65  * Produces 'msgcnt' messages split over 'partition_cnt' partitions.
66  */
produce_messages(uint64_t testid,const char * topic,int partition_cnt,int msgcnt)67 static void produce_messages (uint64_t testid, const char *topic,
68 			      int partition_cnt, int msgcnt) {
69 	int r;
70 	rd_kafka_t *rk;
71 	rd_kafka_topic_t *rkt;
72 	rd_kafka_conf_t *conf;
73 	rd_kafka_topic_conf_t *topic_conf;
74 	char errstr[512];
75 	char msg[128];
76 	int failcnt = 0;
77 	int i;
78         rd_kafka_message_t *rkmessages;
79 	int32_t partition;
80 	int msgid = 0;
81 
82 	test_conf_init(&conf, &topic_conf, 20);
83 
84 	rd_kafka_conf_set_dr_cb(conf, dr_cb);
85 
86         /* Make sure all replicas are in-sync after producing
87          * so that consume test wont fail. */
88         rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1",
89                                 errstr, sizeof(errstr));
90 
91 	/* Create kafka instance */
92 	rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
93 
94 	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
95 	if (!rkt)
96 		TEST_FAIL("Failed to create topic: %s\n",
97 			  rd_strerror(errno));
98 
99         /* Create messages. */
100 	prod_msg_remains = msgcnt;
101         rkmessages = calloc(sizeof(*rkmessages), msgcnt / partition_cnt);
102 	for (partition = 0 ; partition < partition_cnt ; partition++) {
103 		int batch_cnt = msgcnt / partition_cnt;
104 
105 		for (i = 0 ; i < batch_cnt ; i++) {
106 			rd_snprintf(msg, sizeof(msg),
107 				 "testid=%"PRIu64", partition=%i, msg=%i",
108 				 testid, (int)partition, msgid);
109 			rkmessages[i].payload   = rd_strdup(msg);
110 			rkmessages[i].len       = strlen(msg);
111 			msgid++;
112 		}
113 
114 		TEST_SAY("Start produce to partition %i: msgs #%d..%d\n",
115 			 (int)partition, msgid-batch_cnt, msgid);
116 		/* Produce batch for this partition */
117 		r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE,
118 					    rkmessages, batch_cnt);
119 		if (r == -1)
120 			TEST_FAIL("Failed to produce "
121 				  "batch for partition %i: %s",
122 				  (int)partition,
123 				  rd_kafka_err2str(rd_kafka_last_error()));
124 
125 		/* Scan through messages to check for errors. */
126 		for (i = 0 ; i < batch_cnt ; i++) {
127 			if (rkmessages[i].err) {
128 				failcnt++;
129 				if (failcnt < 100)
130 					TEST_SAY("Message #%i failed: %s\n",
131 						 i,
132 						 rd_kafka_err2str(rkmessages[i].
133 								  err));
134 			}
135 		}
136 
137 		/* All messages should've been produced. */
138 		if (r < batch_cnt) {
139 			TEST_SAY("Not all messages were accepted "
140 				 "by produce_batch(): %i < %i\n", r, batch_cnt);
141 
142 			if (batch_cnt - r != failcnt)
143 				TEST_SAY("Discrepency between failed "
144 					 "messages (%i) "
145 					 "and return value %i (%i - %i)\n",
146 					 failcnt, batch_cnt - r, batch_cnt, r);
147 			TEST_FAIL("%i/%i messages failed\n",
148 				  batch_cnt - r, batch_cnt);
149 		}
150 
151 		TEST_SAY("Produced %i messages to partition %i, "
152 			 "waiting for deliveries\n", r, partition);
153 	}
154 
155 
156         free(rkmessages);
157 
158 	/* Wait for messages to be delivered */
159 	while (rd_kafka_outq_len(rk) > 0)
160 		rd_kafka_poll(rk, 100);
161 
162 	if (fails)
163 		TEST_FAIL("%i failures, see previous errors", fails);
164 
165 	if (prod_msg_remains != 0)
166 		TEST_FAIL("Still waiting for %i messages to be produced",
167 			  prod_msg_remains);
168 
169 	/* Destroy topic */
170 	rd_kafka_topic_destroy(rkt);
171 
172 	/* Destroy rdkafka instance */
173 	TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
174 	rd_kafka_destroy(rk);
175 }
176 
177 
178 
179 static int *cons_msgs;
180 static int  cons_msgs_size;
181 static int  cons_msgs_cnt;
182 
verify_consumed_msg_reset(int msgcnt)183 static void verify_consumed_msg_reset (int msgcnt) {
184 	TEST_SAY("Resetting consumed_msgs (msgcnt %d)\n", msgcnt);
185 	if (cons_msgs) {
186 		free(cons_msgs);
187 		cons_msgs = NULL;
188 	}
189 
190 	if (msgcnt) {
191 		int i;
192 
193 		cons_msgs = malloc(sizeof(*cons_msgs) * msgcnt);
194 		for (i = 0 ; i < msgcnt ; i++)
195 			cons_msgs[i] = -1;
196 	}
197 
198 	cons_msgs_size = msgcnt;
199 	cons_msgs_cnt = 0;
200 }
201 
202 
int_cmp(const void * _a,const void * _b)203 static int int_cmp (const void *_a, const void *_b) {
204 	int a = *(int *)_a;
205 	int b = *(int *)_b;
206 	return RD_CMP(a, b);
207 }
208 
verify_consumed_msg_check0(const char * func,int line)209 static void verify_consumed_msg_check0 (const char *func, int line) {
210 	int i;
211 	int fails = 0;
212 
213 	if (cons_msgs_cnt < cons_msgs_size) {
214 		TEST_SAY("Missing %i messages in consumer\n",
215 			 cons_msgs_size - cons_msgs_cnt);
216 		fails++;
217 	}
218 
219 	qsort(cons_msgs, cons_msgs_size, sizeof(*cons_msgs), int_cmp);
220 
221 	for (i = 0 ; i < cons_msgs_size ; i++) {
222 		if (cons_msgs[i] != i) {
223 			TEST_SAY("Consumed message #%i is wrong, "
224 				 "expected #%i\n",
225 				 cons_msgs[i], i);
226 			fails++;
227 		}
228 	}
229 
230 	if (fails)
231 		TEST_FAIL("See above error(s)");
232 
233 	verify_consumed_msg_reset(0);
234 }
235 
236 
237 #define verify_consumed_msg_check() \
238 	verify_consumed_msg_check0(__FUNCTION__,__LINE__)
239 
240 
241 
verify_consumed_msg0(const char * func,int line,uint64_t testid,int32_t partition,int msgnum,rd_kafka_message_t * rkmessage)242 static void verify_consumed_msg0 (const char *func, int line,
243 				  uint64_t testid, int32_t partition,
244 				  int msgnum,
245 				  rd_kafka_message_t *rkmessage) {
246 	uint64_t in_testid;
247 	int in_part;
248 	int in_msgnum;
249 	char buf[1024];
250 
251 	if (rkmessage->len +1 >= sizeof(buf))
252 		TEST_FAIL("Incoming message too large (%i): "
253 			  "not sourced by this test",
254 			  (int)rkmessage->len);
255 
256 	rd_snprintf(buf, sizeof(buf), "%.*s",
257 		 (int)rkmessage->len, (char *)rkmessage->payload);
258 
259 	if (sscanf(buf, "testid=%"SCNu64", partition=%i, msg=%i",
260 		   &in_testid, &in_part, &in_msgnum) != 3)
261 		TEST_FAIL("Incorrect message format: %s", buf);
262 
263 	if (test_level > 2) {
264 		TEST_SAY("%s:%i: Our testid %"PRIu64", part %i =? %i, "
265 			 "msg %i =? %i "
266 			 ", message's: \"%s\"\n",
267 			 func, line,
268 			 testid, (int)partition, (int)rkmessage->partition,
269 			 msgnum, in_msgnum, buf);
270 	}
271 
272 	if (testid != in_testid ||
273 	    (partition != -1 && partition != in_part) ||
274 	    (msgnum != -1 && msgnum != in_msgnum) ||
275 	    (in_msgnum < 0 || in_msgnum > cons_msgs_size))
276 		goto fail_match;
277 
278 	if (cons_msgs_cnt == cons_msgs_size) {
279 		TEST_SAY("Too many messages in cons_msgs (%i) while reading "
280 			 "message \"%s\"\n",
281 			 cons_msgs_cnt, buf);
282 		verify_consumed_msg_check();
283 		TEST_FAIL("See above error(s)");
284 	}
285 
286 	cons_msgs[cons_msgs_cnt++] = in_msgnum;
287 
288 	return;
289 
290  fail_match:
291 	TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i, msg %i/%i did "
292 		  "not match message's: \"%s\"\n",
293 		  func, line,
294 		  testid, (int)partition, msgnum, cons_msgs_size, buf);
295 }
296 
297 #define verify_consumed_msg(testid,part,msgnum,rkmessage) \
298 	verify_consumed_msg0(__FUNCTION__,__LINE__,testid,part,msgnum,rkmessage)
299 
300 
consume_messages(uint64_t testid,const char * topic,int32_t partition,int msg_base,int batch_cnt,int msgcnt)301 static void consume_messages (uint64_t testid, const char *topic,
302 			      int32_t partition, int msg_base, int batch_cnt,
303 			      int msgcnt) {
304 	rd_kafka_t *rk;
305 	rd_kafka_topic_t *rkt;
306 	rd_kafka_conf_t *conf;
307 	rd_kafka_topic_conf_t *topic_conf;
308 	int i;
309 
310 	test_conf_init(&conf, &topic_conf, 20);
311 
312 	/* Create kafka instance */
313 	rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
314 
315 	TEST_SAY("Created    kafka instance %s\n", rd_kafka_name(rk));
316 
317 	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
318 	if (!rkt)
319 		TEST_FAIL("Failed to create topic: %s\n",
320 			  rd_strerror(errno));
321 
322 	TEST_SAY("Consuming %i messages from partition %i\n",
323 		 batch_cnt, partition);
324 
325 	/* Consume messages */
326 	if (rd_kafka_consume_start(rkt, partition,
327 			     RD_KAFKA_OFFSET_TAIL(batch_cnt)) == -1)
328 		TEST_FAIL("consume_start(%i, -%i) failed: %s",
329 			  (int)partition, batch_cnt,
330 			  rd_kafka_err2str(rd_kafka_last_error()));
331 
332 	for (i = 0 ; i < batch_cnt ; ) {
333 		rd_kafka_message_t *rkmessage;
334 
335 		rkmessage = rd_kafka_consume(rkt, partition,
336                                              tmout_multip(5000));
337 		if (!rkmessage)
338 			TEST_FAIL("Failed to consume message %i/%i from "
339 				  "partition %i: %s",
340 				  i, batch_cnt, (int)partition,
341 				  rd_kafka_err2str(rd_kafka_last_error()));
342 		if (rkmessage->err) {
343                         if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF){
344                                 rd_kafka_message_destroy(rkmessage);
345                                 continue;
346                         }
347 			TEST_FAIL("Consume message %i/%i from partition %i "
348 				  "has error: %s: %s",
349 				  i, batch_cnt, (int)partition,
350 				  rd_kafka_err2str(rkmessage->err),
351                                   rd_kafka_message_errstr(rkmessage));
352                 }
353 
354 		verify_consumed_msg(testid, partition, msg_base+i, rkmessage);
355 
356 		rd_kafka_message_destroy(rkmessage);
357                 i++;
358 	}
359 
360 	rd_kafka_consume_stop(rkt, partition);
361 
362 	/* Destroy topic */
363 	rd_kafka_topic_destroy(rkt);
364 
365 	/* Destroy rdkafka instance */
366 	TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
367 	rd_kafka_destroy(rk);
368 }
369 
370 
consume_messages_with_queues(uint64_t testid,const char * topic,int partition_cnt,int msgcnt)371 static void consume_messages_with_queues (uint64_t testid, const char *topic,
372 					  int partition_cnt, int msgcnt) {
373 	rd_kafka_t *rk;
374 	rd_kafka_topic_t *rkt;
375 	rd_kafka_conf_t *conf;
376 	rd_kafka_topic_conf_t *topic_conf;
377 	rd_kafka_queue_t *rkqu;
378 	int i;
379 	int32_t partition;
380 	int batch_cnt = msgcnt / partition_cnt;
381 
382 	test_conf_init(&conf, &topic_conf, 20);
383 
384         test_conf_set(conf, "enable.partition.eof", "true");
385 
386 	/* Create kafka instance */
387 	rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
388 
389 	/* Create queue */
390 	rkqu = rd_kafka_queue_new(rk);
391 
392 
393 	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
394 	if (!rkt)
395 		TEST_FAIL("Failed to create topic: %s\n",
396 			  rd_strerror(errno));
397 
398 	TEST_SAY("Consuming %i messages from one queue serving %i partitions\n",
399 		 msgcnt, partition_cnt);
400 
401 	/* Start consuming each partition */
402 	for (partition = 0 ; partition < partition_cnt ; partition++) {
403 		/* Consume messages */
404 		TEST_SAY("Start consuming partition %i at offset -%i\n",
405 			 partition, batch_cnt);
406 		if (rd_kafka_consume_start_queue(rkt, partition,
407 						 RD_KAFKA_OFFSET_TAIL(batch_cnt),
408 						 rkqu) == -1)
409 			TEST_FAIL("consume_start_queue(%i) failed: %s",
410 				  (int)partition,
411 				  rd_kafka_err2str(rd_kafka_last_error()));
412 	}
413 
414 
415 	/* Consume messages from queue */
416 	for (i = 0 ; i < msgcnt ; ) {
417 		rd_kafka_message_t *rkmessage;
418 
419 		rkmessage = rd_kafka_consume_queue(rkqu, tmout_multip(5000));
420 		if (!rkmessage)
421 			TEST_FAIL("Failed to consume message %i/%i from "
422 				  "queue: %s",
423 				  i, msgcnt,
424 				  rd_kafka_err2str(rd_kafka_last_error()));
425 		if (rkmessage->err) {
426                         if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF){
427 				TEST_SAY("Topic %s [%"PRId32"] reached "
428 					 "EOF at offset %"PRId64"\n",
429 					 rd_kafka_topic_name(rkmessage->rkt),
430 					 rkmessage->partition,
431 					 rkmessage->offset);
432                                 rd_kafka_message_destroy(rkmessage);
433 				continue;
434                         }
435 			TEST_FAIL("Consume message %i/%i from queue "
436 				  "has error (offset %"PRId64
437                                   ", partition %"PRId32"): %s",
438 				  i, msgcnt,
439 				  rkmessage->offset, rkmessage->partition,
440 				  rd_kafka_err2str(rkmessage->err));
441                 }
442 
443 		verify_consumed_msg(testid, -1, -1, rkmessage);
444 
445 		rd_kafka_message_destroy(rkmessage);
446                 i++;
447 	}
448 
449 	/* Stop consuming each partition */
450 	for (partition = 0 ; partition < partition_cnt ; partition++)
451 		rd_kafka_consume_stop(rkt, partition);
452 
453 	/* Destroy queue */
454 	rd_kafka_queue_destroy(rkqu);
455 
456 	/* Destroy topic */
457 	rd_kafka_topic_destroy(rkt);
458 
459 	/* Destroy rdkafka instance */
460 	TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
461 	rd_kafka_destroy(rk);
462 }
463 
464 
465 /**
466  * Produce to two partitions.
467  * Consume with standard interface from both, one after the other.
468  * Consume with queue interface from both, simultanously.
469  */
test_produce_consume(void)470 static void test_produce_consume (void) {
471         int msgcnt = test_quick ? 100 : 1000;
472 	int partition_cnt = 2;
473 	int i;
474 	uint64_t testid;
475 	int msg_base = 0;
476         const char *topic;
477 
478 	/* Generate a testid so we can differentiate messages
479 	 * from other tests */
480 	testid = test_id_generate();
481 
482         /* Read test.conf to configure topic name */
483         test_conf_init(NULL, NULL, 20);
484         topic = test_mk_topic_name("0012", 1);
485 
486 	TEST_SAY("Topic %s, testid %"PRIu64"\n", topic, testid);
487 
488 	/* Produce messages */
489 	produce_messages(testid, topic, partition_cnt, msgcnt);
490 
491 
492 	/* Consume messages with standard interface */
493 	verify_consumed_msg_reset(msgcnt);
494 	for (i = 0 ; i < partition_cnt ; i++) {
495 		consume_messages(testid, topic, i,
496 				 msg_base, msgcnt / partition_cnt, msgcnt);
497 		msg_base += msgcnt / partition_cnt;
498 	}
499 	verify_consumed_msg_check();
500 
501 	/* Consume messages with queue interface */
502 	verify_consumed_msg_reset(msgcnt);
503 	consume_messages_with_queues(testid, topic, partition_cnt, msgcnt);
504 	verify_consumed_msg_check();
505 
506 	return;
507 }
508 
509 
510 
511 
main_0012_produce_consume(int argc,char ** argv)512 int main_0012_produce_consume (int argc, char **argv) {
513 	test_produce_consume();
514 	return 0;
515 }
516