1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
32  * is built from within the librdkafka source tree and thus differs. */
33 #include "rdkafka.h"  /* for Kafka driver */
34 
35 static int prod_msg_remains = 0;
36 static int fails = 0;
37 
38 /**
39  * Delivery reported callback.
40  * Called for each message once to signal its delivery status.
41  */
dr_cb(rd_kafka_t * rk,void * payload,size_t len,rd_kafka_resp_err_t err,void * opaque,void * msg_opaque)42 static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
43 		   rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) {
44 
45 	if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
46 		TEST_FAIL("Message delivery failed: %s\n",
47 			  rd_kafka_err2str(err));
48 
49 	if (prod_msg_remains == 0)
50 		TEST_FAIL("Too many messages delivered (prod_msg_remains %i)",
51 			  prod_msg_remains);
52 
53 	prod_msg_remains--;
54 }
55 
56 
57 /**
58  * Produces 'msgcnt' messages split over 'partition_cnt' partitions.
59  */
produce_messages(uint64_t testid,const char * topic,int partition_cnt,int msg_base,int msgcnt)60 static void produce_messages (uint64_t testid, const char *topic,
61                               int partition_cnt, int msg_base, int msgcnt) {
62 	int r;
63 	rd_kafka_t *rk;
64 	rd_kafka_topic_t *rkt;
65 	rd_kafka_conf_t *conf;
66 	rd_kafka_topic_conf_t *topic_conf;
67 	char errstr[512];
68 	int i;
69 	int32_t partition;
70 	int msgid = msg_base;
71 
72 	test_conf_init(&conf, &topic_conf, 20);
73 
74 	rd_kafka_conf_set_dr_cb(conf, dr_cb);
75 
76         /* Make sure all replicas are in-sync after producing
77          * so that consume test wont fail. */
78         rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1",
79                                 errstr, sizeof(errstr));
80 
81 	/* Create kafka instance */
82 	rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
83 
84 	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
85 	if (!rkt)
86 		TEST_FAIL("Failed to create topic: %s\n",
87                           rd_kafka_err2str(rd_kafka_last_error()));
88 
89         /* Produce messages */
90 	prod_msg_remains = msgcnt;
91 	for (partition = 0 ; partition < partition_cnt ; partition++) {
92 		int batch_cnt = msgcnt / partition_cnt;
93 
94 		for (i = 0 ; i < batch_cnt ; i++) {
95                         char key[128];
96                         char buf[128];
97 			rd_snprintf(key, sizeof(key),
98 				 "testid=%"PRIu64", partition=%i, msg=%i",
99 				 testid, (int)partition, msgid);
100                         rd_snprintf(buf, sizeof(buf),
101                                  "data: testid=%"PRIu64", partition=%i, msg=%i",
102 				 testid, (int)partition, msgid);
103 
104                         r = rd_kafka_produce(rkt, partition,
105                                              RD_KAFKA_MSG_F_COPY,
106                                              buf, strlen(buf),
107                                              key, strlen(key),
108                                              NULL);
109                         if (r == -1)
110                                 TEST_FAIL("Failed to produce message %i "
111                                           "to partition %i: %s",
112                                           msgid, (int)partition,
113                                           rd_kafka_err2str(rd_kafka_last_error()));
114 			msgid++;
115 		}
116         }
117 
118 
119 	/* Wait for messages to be delivered */
120 	while (rd_kafka_outq_len(rk) > 0)
121 		rd_kafka_poll(rk, 100);
122 
123 	if (fails)
124 		TEST_FAIL("%i failures, see previous errors", fails);
125 
126 	if (prod_msg_remains != 0)
127 		TEST_FAIL("Still waiting for %i messages to be produced",
128 			  prod_msg_remains);
129 
130 	/* Destroy topic */
131 	rd_kafka_topic_destroy(rkt);
132 
133 	/* Destroy rdkafka instance */
134 	TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
135 	rd_kafka_destroy(rk);
136 }
137 
138 
139 
140 static int *cons_msgs;
141 static int  cons_msgs_size;
142 static int  cons_msgs_cnt;
143 static int  cons_msg_next;
144 static int  cons_msg_stop = -1;
145 static int64_t cons_last_offset = -1;  /* last offset received */
146 
verify_consumed_msg_reset(int msgcnt)147 static void verify_consumed_msg_reset (int msgcnt) {
148 	if (cons_msgs) {
149 		free(cons_msgs);
150 		cons_msgs = NULL;
151 	}
152 
153 	if (msgcnt) {
154 		int i;
155 
156 		cons_msgs = malloc(sizeof(*cons_msgs) * msgcnt);
157 		for (i = 0 ; i < msgcnt ; i++)
158 			cons_msgs[i] = -1;
159 	}
160 
161 	cons_msgs_size = msgcnt;
162 	cons_msgs_cnt = 0;
163         cons_msg_next = 0;
164         cons_msg_stop = -1;
165         cons_last_offset = -1;
166 
167         TEST_SAY("Reset consumed_msg stats, making room for %d new messages\n",
168                  msgcnt);
169 }
170 
171 
int_cmp(const void * _a,const void * _b)172 static int int_cmp (const void *_a, const void *_b) {
173 	int a = *(int *)_a;
174 	int b = *(int *)_b;
175         /* Sort -1 (non-received msgs) at the end */
176 	return (a == -1 ? 100000000 : a) - (b == -1 ? 10000000 : b);
177 }
178 
verify_consumed_msg_check0(const char * func,int line,const char * desc,int expected_cnt)179 static void verify_consumed_msg_check0 (const char *func, int line,
180                                         const char *desc,
181                                         int expected_cnt) {
182 	int i;
183 	int fails = 0;
184         int not_recvd = 0;
185 
186         TEST_SAY("%s: received %d/%d/%d messages\n",
187                  desc, cons_msgs_cnt, expected_cnt, cons_msgs_size);
188         if (expected_cnt > cons_msgs_size)
189                 TEST_FAIL("expected_cnt %d > cons_msgs_size %d\n",
190                           expected_cnt, cons_msgs_size);
191 
192 	if (cons_msgs_cnt < expected_cnt) {
193 		TEST_SAY("%s: Missing %i messages in consumer\n",
194 			 desc,expected_cnt - cons_msgs_cnt);
195 		fails++;
196 	}
197 
198 	qsort(cons_msgs, cons_msgs_size, sizeof(*cons_msgs), int_cmp);
199 
200 	for (i = 0 ; i < expected_cnt ; i++) {
201 		if (cons_msgs[i] != i) {
202                         if (cons_msgs[i] == -1) {
203                                 not_recvd++;
204                                 TEST_SAY("%s: msg %d/%d not received\n",
205                                          desc, i, expected_cnt);
206                         } else
207                                 TEST_SAY("%s: Consumed message #%i is wrong, "
208                                          "expected #%i\n",
209                                          desc, cons_msgs[i], i);
210 			fails++;
211 		}
212 	}
213 
214         if (not_recvd)
215                 TEST_SAY("%s: %d messages not received at all\n",
216                          desc, not_recvd);
217 
218 	if (fails)
219 		TEST_FAIL("%s: See above error(s)", desc);
220         else
221                 TEST_SAY("%s: message range check: %d/%d messages consumed: "
222                          "succeeded\n", desc, cons_msgs_cnt, expected_cnt);
223 
224 }
225 
226 
227 #define verify_consumed_msg_check(desc,expected_cnt)                        \
228 	verify_consumed_msg_check0(__FUNCTION__,__LINE__, desc, expected_cnt)
229 
230 
231 
verify_consumed_msg0(const char * func,int line,uint64_t testid,int32_t partition,int msgnum,rd_kafka_message_t * rkmessage)232 static void verify_consumed_msg0 (const char *func, int line,
233 				  uint64_t testid, int32_t partition,
234 				  int msgnum,
235 				  rd_kafka_message_t *rkmessage) {
236 	uint64_t in_testid;
237 	int in_part;
238 	int in_msgnum;
239 	char buf[128];
240 
241 	if (rkmessage->key_len +1 >= sizeof(buf))
242 		TEST_FAIL("Incoming message key too large (%i): "
243 			  "not sourced by this test",
244 			  (int)rkmessage->key_len);
245 
246 	rd_snprintf(buf, sizeof(buf), "%.*s",
247 		 (int)rkmessage->key_len, (char *)rkmessage->key);
248 
249 	if (sscanf(buf, "testid=%"SCNu64", partition=%i, msg=%i",
250 		   &in_testid, &in_part, &in_msgnum) != 3)
251 		TEST_FAIL("Incorrect key format: %s", buf);
252 
253         if (test_level > 2) {
254 		TEST_SAY("%s:%i: Our testid %"PRIu64", part %i (%i), "
255 			 "msg %i/%i, key's: \"%s\"\n",
256 			 func, line,
257 			 testid, (int)partition, (int)rkmessage->partition,
258 			 msgnum, cons_msgs_size, buf);
259 	}
260 
261 	if (testid != in_testid ||
262 	    (partition != -1 && partition != in_part) ||
263 	    (msgnum != -1 && msgnum != in_msgnum) ||
264 	    (in_msgnum < 0 || in_msgnum > cons_msgs_size))
265 		goto fail_match;
266 
267 	if (cons_msgs_cnt == cons_msgs_size) {
268 		TEST_SAY("Too many messages in cons_msgs (%i) while reading "
269 			 "message key \"%s\"\n",
270 			 cons_msgs_cnt, buf);
271 		verify_consumed_msg_check("?", cons_msgs_size);
272 		TEST_FAIL("See above error(s)");
273 	}
274 
275 	cons_msgs[cons_msgs_cnt++] = in_msgnum;
276         cons_last_offset = rkmessage->offset;
277 
278 	return;
279 
280  fail_match:
281 	TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i, msg %i/%i did "
282 		  "not match message's key: \"%s\"\n",
283 		  func, line,
284 		  testid, (int)partition, msgnum, cons_msgs_size, buf);
285 }
286 
287 #define verify_consumed_msg(testid,part,msgnum,rkmessage) \
288 	verify_consumed_msg0(__FUNCTION__,__LINE__,testid,part,msgnum,rkmessage)
289 
290 
consume_cb(rd_kafka_message_t * rkmessage,void * opaque)291 static void consume_cb (rd_kafka_message_t *rkmessage, void *opaque) {
292         int64_t testid = *(int64_t *)opaque;
293 
294 	if (test_level > 2)
295 		TEST_SAY("Consumed message #%d? at offset %"PRId64": %s\n",
296 			 cons_msg_next, rkmessage->offset,
297 			 rd_kafka_err2str(rkmessage->err));
298 
299         if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
300                 TEST_SAY("EOF at offset %"PRId64"\n", rkmessage->offset);
301                 return;
302         }
303 
304         if (rkmessage->err)
305                 TEST_FAIL("Consume message from partition %i "
306                           "has error: %s",
307                           (int)rkmessage->partition,
308                           rd_kafka_err2str(rkmessage->err));
309 
310         verify_consumed_msg(testid, rkmessage->partition,
311                             cons_msg_next, rkmessage);
312 
313         if (cons_msg_next == cons_msg_stop) {
314                 rd_kafka_yield(NULL/*FIXME*/);
315         }
316 
317         cons_msg_next++;
318 }
319 
consume_messages_callback_multi(const char * desc,uint64_t testid,const char * topic,int32_t partition,const char * offset_store_method,int msg_base,int msg_cnt,int64_t initial_offset,int iterations)320 static void consume_messages_callback_multi (const char *desc,
321                                              uint64_t testid, const char *topic,
322                                              int32_t partition,
323                                              const char *offset_store_method,
324                                              int msg_base,
325                                              int msg_cnt,
326                                              int64_t initial_offset,
327                                              int iterations) {
328 	rd_kafka_t *rk;
329 	rd_kafka_topic_t *rkt;
330 	rd_kafka_conf_t *conf;
331 	rd_kafka_topic_conf_t *topic_conf;
332 	int i;
333 
334         TEST_SAY("%s: Consume messages %d+%d from %s [%"PRId32"] "
335                  "from offset %"PRId64" in %d iterations\n",
336                  desc, msg_base, msg_cnt, topic, partition,
337                  initial_offset, iterations);
338 
339 	test_conf_init(&conf, &topic_conf, 20);
340 
341         test_topic_conf_set(topic_conf, "offset.store.method",
342                             offset_store_method);
343 
344         if (!strcmp(offset_store_method, "broker")) {
345                 /* Broker based offset storage requires a group.id */
346                 test_conf_set(conf, "group.id", topic);
347         }
348 
349         test_conf_set(conf, "enable.partition.eof", "true");
350 
351 	/* Create kafka instance */
352 	rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
353 
354         rd_kafka_topic_conf_set(topic_conf, "auto.offset.reset", "smallest",
355                                 NULL, 0);
356 
357 	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
358 	if (!rkt)
359 		TEST_FAIL("%s: Failed to create topic: %s\n",
360                           desc, rd_kafka_err2str(rd_kafka_last_error()));
361 
362 	cons_msg_stop = cons_msg_next + msg_cnt - 1;
363 
364         /* Consume the same batch of messages multiple times to
365          * make sure back-to-back start&stops work. */
366         for (i = 0 ; i < iterations ; i++) {
367                 int cnta;
368                 test_timing_t t_stop;
369 
370                 TEST_SAY("%s: Iteration #%i: Consuming from "
371                          "partition %i at offset %"PRId64", "
372                          "msgs range %d..%d\n",
373                          desc, i, partition, initial_offset,
374                          cons_msg_next, cons_msg_stop);
375 
376                 /* Consume messages */
377                 if (rd_kafka_consume_start(rkt, partition, initial_offset) == -1)
378                         TEST_FAIL("%s: consume_start(%i) failed: %s",
379                                   desc, (int)partition,
380                                   rd_kafka_err2str(rd_kafka_last_error()));
381 
382 
383                 /* Stop consuming messages when this number of messages
384                  * is reached. */
385                 cnta = cons_msg_next;
386                 do {
387                         rd_kafka_consume_callback(rkt, partition, 1000,
388                                                   consume_cb, &testid);
389                 } while (cons_msg_next < cons_msg_stop);
390 
391                 TEST_SAY("%s: Iteration #%i: consumed %i messages\n",
392                          desc, i, cons_msg_next - cnta);
393 
394                 TIMING_START(&t_stop, "rd_kafka_consume_stop()");
395                 rd_kafka_consume_stop(rkt, partition);
396                 TIMING_STOP(&t_stop);
397 
398                 /* Advance next offset so we dont reconsume
399                  * messages on the next run. */
400                 if (initial_offset != RD_KAFKA_OFFSET_STORED) {
401                         initial_offset = cons_last_offset+1;
402 			cons_msg_stop = cons_msg_next + msg_cnt - 1;
403 		}
404         }
405 
406 	/* Destroy topic */
407 	rd_kafka_topic_destroy(rkt);
408 
409 	/* Destroy rdkafka instance */
410 	TEST_SAY("%s: Destroying kafka instance %s\n", desc, rd_kafka_name(rk));
411 	rd_kafka_destroy(rk);
412 }
413 
414 
415 
test_produce_consume(const char * offset_store_method)416 static void test_produce_consume (const char *offset_store_method) {
417 	int msgcnt = 100;
418         int partition_cnt = 1;
419 	int i;
420 	uint64_t testid;
421 	int msg_base = 0;
422         const char *topic;
423 
424 	/* Generate a testid so we can differentiate messages
425 	 * from other tests */
426 	testid = test_id_generate();
427 
428         /* Read test.conf to configure topic name */
429         test_conf_init(NULL, NULL, 20);
430         topic = test_mk_topic_name("0014", 1/*random*/);
431 
432 	TEST_SAY("Topic %s, testid %"PRIu64", offset.store.method=%s\n",
433                  topic, testid, offset_store_method);
434 
435 	/* Produce messages */
436 	produce_messages(testid, topic, partition_cnt, msg_base, msgcnt);
437 
438         /* 100% of messages */
439         verify_consumed_msg_reset(msgcnt);
440 
441 	/* Consume 50% of messages with callbacks: stored offsets with no prior
442          * offset stored. */
443 	for (i = 0 ; i < partition_cnt ; i++)
444 		consume_messages_callback_multi("STORED.1/2", testid, topic, i,
445                                                 offset_store_method,
446                                                 msg_base,
447                                                 (msgcnt / partition_cnt) / 2,
448                                                 RD_KAFKA_OFFSET_STORED,
449                                                 1);
450         verify_consumed_msg_check("STORED.1/2", msgcnt / 2);
451 
452         /* Consume the rest using the now stored offset */
453         for (i = 0 ; i < partition_cnt ; i++)
454 		consume_messages_callback_multi("STORED.2/2", testid, topic, i,
455                                                 offset_store_method,
456                                                 msg_base,
457                                                 (msgcnt / partition_cnt) / 2,
458                                                 RD_KAFKA_OFFSET_STORED,
459                                                 1);
460         verify_consumed_msg_check("STORED.2/2", msgcnt);
461 
462 
463 	/* Consume messages with callbacks: logical offsets */
464 	verify_consumed_msg_reset(msgcnt);
465 	for (i = 0 ; i < partition_cnt ; i++) {
466                 int p_msg_cnt = msgcnt / partition_cnt;
467                 int64_t initial_offset = RD_KAFKA_OFFSET_TAIL(p_msg_cnt);
468                 const int iterations = 4;
469 		consume_messages_callback_multi("TAIL+", testid, topic, i,
470                                                 offset_store_method,
471                                                 /* start here (msgid) */
472                                                 msg_base,
473                                                 /* consume this many messages
474                                                  * per iteration. */
475                                                 p_msg_cnt / iterations,
476                                                 /* start here (offset) */
477                                                 initial_offset,
478                                                 iterations);
479         }
480 
481         verify_consumed_msg_check("TAIL+", msgcnt);
482 
483         verify_consumed_msg_reset(0);
484 
485 	return;
486 }
487 
488 
489 
490 
main_0014_reconsume_191(int argc,char ** argv)491 int main_0014_reconsume_191 (int argc, char **argv) {
492 	if (test_broker_version >= TEST_BRKVER(0,8,2,0))
493 		test_produce_consume("broker");
494         test_produce_consume("file");
495 	return 0;
496 }
497