1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
32 * is built from within the librdkafka source tree and thus differs. */
33 #include "rdkafka.h" /* for Kafka driver */
34
35 static int prod_msg_remains = 0;
36 static int fails = 0;
37
38 /**
39 * Delivery reported callback.
40 * Called for each message once to signal its delivery status.
41 */
static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
		   rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) {

	/* Any delivery failure aborts the test immediately. */
	if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
		TEST_FAIL("Message delivery failed: %s\n",
			  rd_kafka_err2str(err));

	/* prod_msg_remains counts messages still awaiting a delivery
	 * report; reaching zero here means we got more reports than
	 * messages produced (duplicate report or accounting bug). */
	if (prod_msg_remains == 0)
		TEST_FAIL("Too many messages delivered (prod_msg_remains %i)",
			  prod_msg_remains);

	prod_msg_remains--;
}
55
56
57 /**
58 * Produces 'msgcnt' messages split over 'partition_cnt' partitions.
59 */
produce_messages(uint64_t testid,const char * topic,int partition_cnt,int msg_base,int msgcnt)60 static void produce_messages (uint64_t testid, const char *topic,
61 int partition_cnt, int msg_base, int msgcnt) {
62 int r;
63 rd_kafka_t *rk;
64 rd_kafka_topic_t *rkt;
65 rd_kafka_conf_t *conf;
66 rd_kafka_topic_conf_t *topic_conf;
67 char errstr[512];
68 int i;
69 int32_t partition;
70 int msgid = msg_base;
71
72 test_conf_init(&conf, &topic_conf, 20);
73
74 rd_kafka_conf_set_dr_cb(conf, dr_cb);
75
76 /* Make sure all replicas are in-sync after producing
77 * so that consume test wont fail. */
78 rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1",
79 errstr, sizeof(errstr));
80
81 /* Create kafka instance */
82 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
83
84 rkt = rd_kafka_topic_new(rk, topic, topic_conf);
85 if (!rkt)
86 TEST_FAIL("Failed to create topic: %s\n",
87 rd_kafka_err2str(rd_kafka_last_error()));
88
89 /* Produce messages */
90 prod_msg_remains = msgcnt;
91 for (partition = 0 ; partition < partition_cnt ; partition++) {
92 int batch_cnt = msgcnt / partition_cnt;
93
94 for (i = 0 ; i < batch_cnt ; i++) {
95 char key[128];
96 char buf[128];
97 rd_snprintf(key, sizeof(key),
98 "testid=%"PRIu64", partition=%i, msg=%i",
99 testid, (int)partition, msgid);
100 rd_snprintf(buf, sizeof(buf),
101 "data: testid=%"PRIu64", partition=%i, msg=%i",
102 testid, (int)partition, msgid);
103
104 r = rd_kafka_produce(rkt, partition,
105 RD_KAFKA_MSG_F_COPY,
106 buf, strlen(buf),
107 key, strlen(key),
108 NULL);
109 if (r == -1)
110 TEST_FAIL("Failed to produce message %i "
111 "to partition %i: %s",
112 msgid, (int)partition,
113 rd_kafka_err2str(rd_kafka_last_error()));
114 msgid++;
115 }
116 }
117
118
119 /* Wait for messages to be delivered */
120 while (rd_kafka_outq_len(rk) > 0)
121 rd_kafka_poll(rk, 100);
122
123 if (fails)
124 TEST_FAIL("%i failures, see previous errors", fails);
125
126 if (prod_msg_remains != 0)
127 TEST_FAIL("Still waiting for %i messages to be produced",
128 prod_msg_remains);
129
130 /* Destroy topic */
131 rd_kafka_topic_destroy(rkt);
132
133 /* Destroy rdkafka instance */
134 TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
135 rd_kafka_destroy(rk);
136 }
137
138
139
140 static int *cons_msgs;
141 static int cons_msgs_size;
142 static int cons_msgs_cnt;
143 static int cons_msg_next;
144 static int cons_msg_stop = -1;
145 static int64_t cons_last_offset = -1; /* last offset received */
146
verify_consumed_msg_reset(int msgcnt)147 static void verify_consumed_msg_reset (int msgcnt) {
148 if (cons_msgs) {
149 free(cons_msgs);
150 cons_msgs = NULL;
151 }
152
153 if (msgcnt) {
154 int i;
155
156 cons_msgs = malloc(sizeof(*cons_msgs) * msgcnt);
157 for (i = 0 ; i < msgcnt ; i++)
158 cons_msgs[i] = -1;
159 }
160
161 cons_msgs_size = msgcnt;
162 cons_msgs_cnt = 0;
163 cons_msg_next = 0;
164 cons_msg_stop = -1;
165 cons_last_offset = -1;
166
167 TEST_SAY("Reset consumed_msg stats, making room for %d new messages\n",
168 msgcnt);
169 }
170
171
/**
 * qsort() comparator for message ids: ascending order, with -1
 * (non-received msgs) sorted at the end.
 *
 * Must impose a consistent total order (in particular cmp(x,x) == 0),
 * otherwise qsort() behavior is undefined. The previous version used
 * two different "end" sentinels (100000000 vs 10000000), so comparing
 * two -1 entries returned non-zero; it could also overflow the
 * subtraction for extreme inputs.
 */
static int int_cmp (const void *_a, const void *_b) {
	int a = *(const int *)_a;
	int b = *(const int *)_b;
	/* Sort -1 (non-received msgs) at the end */
	int a_last = (a == -1);
	int b_last = (b == -1);

	if (a_last != b_last)
		return a_last - b_last;

	/* Comparison-based result avoids signed overflow of a - b. */
	return (a > b) - (a < b);
}
178
verify_consumed_msg_check0(const char * func,int line,const char * desc,int expected_cnt)179 static void verify_consumed_msg_check0 (const char *func, int line,
180 const char *desc,
181 int expected_cnt) {
182 int i;
183 int fails = 0;
184 int not_recvd = 0;
185
186 TEST_SAY("%s: received %d/%d/%d messages\n",
187 desc, cons_msgs_cnt, expected_cnt, cons_msgs_size);
188 if (expected_cnt > cons_msgs_size)
189 TEST_FAIL("expected_cnt %d > cons_msgs_size %d\n",
190 expected_cnt, cons_msgs_size);
191
192 if (cons_msgs_cnt < expected_cnt) {
193 TEST_SAY("%s: Missing %i messages in consumer\n",
194 desc,expected_cnt - cons_msgs_cnt);
195 fails++;
196 }
197
198 qsort(cons_msgs, cons_msgs_size, sizeof(*cons_msgs), int_cmp);
199
200 for (i = 0 ; i < expected_cnt ; i++) {
201 if (cons_msgs[i] != i) {
202 if (cons_msgs[i] == -1) {
203 not_recvd++;
204 TEST_SAY("%s: msg %d/%d not received\n",
205 desc, i, expected_cnt);
206 } else
207 TEST_SAY("%s: Consumed message #%i is wrong, "
208 "expected #%i\n",
209 desc, cons_msgs[i], i);
210 fails++;
211 }
212 }
213
214 if (not_recvd)
215 TEST_SAY("%s: %d messages not received at all\n",
216 desc, not_recvd);
217
218 if (fails)
219 TEST_FAIL("%s: See above error(s)", desc);
220 else
221 TEST_SAY("%s: message range check: %d/%d messages consumed: "
222 "succeeded\n", desc, cons_msgs_cnt, expected_cnt);
223
224 }
225
226
227 #define verify_consumed_msg_check(desc,expected_cnt) \
228 verify_consumed_msg_check0(__FUNCTION__,__LINE__, desc, expected_cnt)
229
230
231
/**
 * Verifies that a consumed message originated from this test run:
 * parses the key ("testid=.., partition=.., msg=..") and checks it
 * against the expected testid/partition/msgnum (-1 = don't care),
 * then records the message id and offset in the tracking state.
 * Fails the test on any mismatch or on tracking-array overflow.
 */
static void verify_consumed_msg0 (const char *func, int line,
				  uint64_t testid, int32_t partition,
				  int msgnum,
				  rd_kafka_message_t *rkmessage) {
	uint64_t in_testid;
	int in_part;
	int in_msgnum;
	char buf[128];

	/* Keys produced by this test always fit in buf; anything larger
	 * must have been produced by someone else. */
	if (rkmessage->key_len +1 >= sizeof(buf))
		TEST_FAIL("Incoming message key too large (%i): "
			  "not sourced by this test",
			  (int)rkmessage->key_len);

	/* Copy the (non NUL-terminated) key into a NUL-terminated buffer. */
	rd_snprintf(buf, sizeof(buf), "%.*s",
		    (int)rkmessage->key_len, (char *)rkmessage->key);

	if (sscanf(buf, "testid=%"SCNu64", partition=%i, msg=%i",
		   &in_testid, &in_part, &in_msgnum) != 3)
		TEST_FAIL("Incorrect key format: %s", buf);

	if (test_level > 2) {
		TEST_SAY("%s:%i: Our testid %"PRIu64", part %i (%i), "
			 "msg %i/%i, key's: \"%s\"\n",
			 func, line,
			 testid, (int)partition, (int)rkmessage->partition,
			 msgnum, cons_msgs_size, buf);
	}

	/* Reject messages from other test runs, wrong partitions,
	 * unexpected msg ids, or ids outside the tracked range. */
	if (testid != in_testid ||
	    (partition != -1 && partition != in_part) ||
	    (msgnum != -1 && msgnum != in_msgnum) ||
	    (in_msgnum < 0 || in_msgnum > cons_msgs_size))
		goto fail_match;

	if (cons_msgs_cnt == cons_msgs_size) {
		TEST_SAY("Too many messages in cons_msgs (%i) while reading "
			 "message key \"%s\"\n",
			 cons_msgs_cnt, buf);
		verify_consumed_msg_check("?", cons_msgs_size);
		TEST_FAIL("See above error(s)");
	}

	/* Record the received id and remember the last seen offset so the
	 * caller can resume consumption right after it. */
	cons_msgs[cons_msgs_cnt++] = in_msgnum;
	cons_last_offset = rkmessage->offset;

	return;

 fail_match:
	TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i, msg %i/%i did "
		  "not match message's key: \"%s\"\n",
		  func, line,
		  testid, (int)partition, msgnum, cons_msgs_size, buf);
}
286
287 #define verify_consumed_msg(testid,part,msgnum,rkmessage) \
288 verify_consumed_msg0(__FUNCTION__,__LINE__,testid,part,msgnum,rkmessage)
289
290
/**
 * Per-message consume callback: verifies the message and yields
 * from the consume loop once the batch stop point is reached.
 */
static void consume_cb (rd_kafka_message_t *rkmessage, void *opaque) {
	int64_t testid = *(int64_t *)opaque;
	rd_kafka_resp_err_t err = rkmessage->err;

	if (test_level > 2)
		TEST_SAY("Consumed message #%d? at offset %"PRId64": %s\n",
			 cons_msg_next, rkmessage->offset,
			 rd_kafka_err2str(err));

	/* End of partition is informational, not an error. */
	if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
		TEST_SAY("EOF at offset %"PRId64"\n", rkmessage->offset);
		return;
	}

	if (err)
		TEST_FAIL("Consume message from partition %i "
			  "has error: %s",
			  (int)rkmessage->partition,
			  rd_kafka_err2str(err));

	verify_consumed_msg(testid, rkmessage->partition,
			    cons_msg_next, rkmessage);

	/* Break out of rd_kafka_consume_callback() once the configured
	 * stop message is reached. */
	if (cons_msg_next == cons_msg_stop)
		rd_kafka_yield(NULL/*FIXME*/);

	cons_msg_next++;
}
319
/**
 * Consumes 'msg_cnt' messages per iteration from 'topic' [partition]
 * using the callback consumer interface, repeating for 'iterations'
 * back-to-back consume_start/consume_stop cycles.
 *
 * Consumption state (cons_msg_next/cons_msg_stop/cons_last_offset) is
 * kept in file-scope globals shared with consume_cb()/verify_consumed_msg0().
 *
 * offset_store_method: "broker" or "file" (broker requires a group.id).
 * initial_offset: starting offset; RD_KAFKA_OFFSET_STORED resumes from the
 *                 stored offset and is then NOT advanced between iterations.
 */
static void consume_messages_callback_multi (const char *desc,
					     uint64_t testid, const char *topic,
					     int32_t partition,
					     const char *offset_store_method,
					     int msg_base,
					     int msg_cnt,
					     int64_t initial_offset,
					     int iterations) {
	rd_kafka_t *rk;
	rd_kafka_topic_t *rkt;
	rd_kafka_conf_t *conf;
	rd_kafka_topic_conf_t *topic_conf;
	int i;

	TEST_SAY("%s: Consume messages %d+%d from %s [%"PRId32"] "
		 "from offset %"PRId64" in %d iterations\n",
		 desc, msg_base, msg_cnt, topic, partition,
		 initial_offset, iterations);

	test_conf_init(&conf, &topic_conf, 20);

	test_topic_conf_set(topic_conf, "offset.store.method",
			    offset_store_method);

	if (!strcmp(offset_store_method, "broker")) {
		/* Broker based offset storage requires a group.id */
		test_conf_set(conf, "group.id", topic);
	}

	/* Emit EOF messages so consume_cb can log partition ends. */
	test_conf_set(conf, "enable.partition.eof", "true");

	/* Create kafka instance */
	rk = test_create_handle(RD_KAFKA_CONSUMER, conf);

	rd_kafka_topic_conf_set(topic_conf, "auto.offset.reset", "smallest",
				NULL, 0);

	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
	if (!rkt)
		TEST_FAIL("%s: Failed to create topic: %s\n",
			  desc, rd_kafka_err2str(rd_kafka_last_error()));

	/* Message number (inclusive) at which consume_cb will yield. */
	cons_msg_stop = cons_msg_next + msg_cnt - 1;

	/* Consume the same batch of messages multiple times to
	 * make sure back-to-back start&stops work. */
	for (i = 0 ; i < iterations ; i++) {
		int cnta;
		test_timing_t t_stop;

		TEST_SAY("%s: Iteration #%i: Consuming from "
			 "partition %i at offset %"PRId64", "
			 "msgs range %d..%d\n",
			 desc, i, partition, initial_offset,
			 cons_msg_next, cons_msg_stop);

		/* Consume messages */
		if (rd_kafka_consume_start(rkt, partition, initial_offset) == -1)
			TEST_FAIL("%s: consume_start(%i) failed: %s",
				  desc, (int)partition,
				  rd_kafka_err2str(rd_kafka_last_error()));


		/* Stop consuming messages when this number of messages
		 * is reached. */
		cnta = cons_msg_next;
		do {
			/* consume_cb advances cons_msg_next and yields
			 * once cons_msg_stop is reached. */
			rd_kafka_consume_callback(rkt, partition, 1000,
						  consume_cb, &testid);
		} while (cons_msg_next < cons_msg_stop);

		TEST_SAY("%s: Iteration #%i: consumed %i messages\n",
			 desc, i, cons_msg_next - cnta);

		TIMING_START(&t_stop, "rd_kafka_consume_stop()");
		rd_kafka_consume_stop(rkt, partition);
		TIMING_STOP(&t_stop);

		/* Advance next offset so we dont reconsume
		 * messages on the next run. */
		if (initial_offset != RD_KAFKA_OFFSET_STORED) {
			initial_offset = cons_last_offset+1;
			cons_msg_stop = cons_msg_next + msg_cnt - 1;
		}
	}

	/* Destroy topic */
	rd_kafka_topic_destroy(rkt);

	/* Destroy rdkafka instance */
	TEST_SAY("%s: Destroying kafka instance %s\n", desc, rd_kafka_name(rk));
	rd_kafka_destroy(rk);
}
413
414
415
test_produce_consume(const char * offset_store_method)416 static void test_produce_consume (const char *offset_store_method) {
417 int msgcnt = 100;
418 int partition_cnt = 1;
419 int i;
420 uint64_t testid;
421 int msg_base = 0;
422 const char *topic;
423
424 /* Generate a testid so we can differentiate messages
425 * from other tests */
426 testid = test_id_generate();
427
428 /* Read test.conf to configure topic name */
429 test_conf_init(NULL, NULL, 20);
430 topic = test_mk_topic_name("0014", 1/*random*/);
431
432 TEST_SAY("Topic %s, testid %"PRIu64", offset.store.method=%s\n",
433 topic, testid, offset_store_method);
434
435 /* Produce messages */
436 produce_messages(testid, topic, partition_cnt, msg_base, msgcnt);
437
438 /* 100% of messages */
439 verify_consumed_msg_reset(msgcnt);
440
441 /* Consume 50% of messages with callbacks: stored offsets with no prior
442 * offset stored. */
443 for (i = 0 ; i < partition_cnt ; i++)
444 consume_messages_callback_multi("STORED.1/2", testid, topic, i,
445 offset_store_method,
446 msg_base,
447 (msgcnt / partition_cnt) / 2,
448 RD_KAFKA_OFFSET_STORED,
449 1);
450 verify_consumed_msg_check("STORED.1/2", msgcnt / 2);
451
452 /* Consume the rest using the now stored offset */
453 for (i = 0 ; i < partition_cnt ; i++)
454 consume_messages_callback_multi("STORED.2/2", testid, topic, i,
455 offset_store_method,
456 msg_base,
457 (msgcnt / partition_cnt) / 2,
458 RD_KAFKA_OFFSET_STORED,
459 1);
460 verify_consumed_msg_check("STORED.2/2", msgcnt);
461
462
463 /* Consume messages with callbacks: logical offsets */
464 verify_consumed_msg_reset(msgcnt);
465 for (i = 0 ; i < partition_cnt ; i++) {
466 int p_msg_cnt = msgcnt / partition_cnt;
467 int64_t initial_offset = RD_KAFKA_OFFSET_TAIL(p_msg_cnt);
468 const int iterations = 4;
469 consume_messages_callback_multi("TAIL+", testid, topic, i,
470 offset_store_method,
471 /* start here (msgid) */
472 msg_base,
473 /* consume this many messages
474 * per iteration. */
475 p_msg_cnt / iterations,
476 /* start here (offset) */
477 initial_offset,
478 iterations);
479 }
480
481 verify_consumed_msg_check("TAIL+", msgcnt);
482
483 verify_consumed_msg_reset(0);
484
485 return;
486 }
487
488
489
490
int main_0014_reconsume_191 (int argc, char **argv) {
	/* Broker-based offset storage requires broker version >= 0.8.2.0. */
	if (test_broker_version >= TEST_BRKVER(0,8,2,0))
		test_produce_consume("broker");
	/* File-based offset storage works against any broker version. */
	test_produce_consume("file");
	return 0;
}
497