1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
32 * is built from within the librdkafka source tree and thus differs. */
33 #include "rdkafka.h" /* for Kafka driver */
34
35
36 /**
37 * Consumer: pause and resume.
38 * Make sure no messages are lost or duplicated.
39 */
40
41
42
consume_pause(void)43 static int consume_pause (void) {
44 const char *topic = test_mk_topic_name(__FUNCTION__, 1);
45 const int partition_cnt = 3;
46 rd_kafka_t *rk;
47 rd_kafka_conf_t *conf;
48 rd_kafka_topic_conf_t *tconf;
49 rd_kafka_topic_partition_list_t *topics;
50 rd_kafka_resp_err_t err;
51 const int msgcnt = 1000;
52 uint64_t testid;
53 int it, iterations = 3;
54 int msg_base = 0;
55 int fails = 0;
56 char group_id[32];
57
58 test_conf_init(&conf, &tconf,
59 60 + (test_session_timeout_ms * 3 / 1000));
60 test_conf_set(conf, "enable.partition.eof", "true");
61 test_topic_conf_set(tconf, "auto.offset.reset", "smallest");
62
63 test_create_topic(NULL, topic, partition_cnt, 1);
64
65 /* Produce messages */
66 testid = test_produce_msgs_easy(topic, 0,
67 RD_KAFKA_PARTITION_UA, msgcnt);
68
69 topics = rd_kafka_topic_partition_list_new(1);
70 rd_kafka_topic_partition_list_add(topics, topic, -1);
71
72 for (it = 0 ; it < iterations ; it++) {
73 const int pause_cnt = 5;
74 int per_pause_msg_cnt = msgcnt / pause_cnt;
75 const int pause_time = 1200 /* 1.2s */;
76 int eof_cnt = -1;
77 int pause;
78 rd_kafka_topic_partition_list_t *parts;
79 test_msgver_t mv_all;
80 int j;
81
82 test_msgver_init(&mv_all, testid); /* All messages */
83
84 /* On the last iteration reuse the previous group.id
85 * to make consumer start at committed offsets which should
86 * also be EOF. This to trigger #1307. */
87 if (it < iterations-1)
88 test_str_id_generate(group_id, sizeof(group_id));
89 else {
90 TEST_SAY("Reusing previous group.id %s\n", group_id);
91 per_pause_msg_cnt = 0;
92 eof_cnt = partition_cnt;
93 }
94
95 TEST_SAY("Iteration %d/%d, using group.id %s, "
96 "expecting %d messages/pause and %d EOFs\n",
97 it, iterations-1, group_id,
98 per_pause_msg_cnt, eof_cnt);
99
100 rk = test_create_consumer(group_id, NULL,
101 rd_kafka_conf_dup(conf),
102 rd_kafka_topic_conf_dup(tconf));
103
104
105 TEST_SAY("Subscribing to %d topic(s): %s\n",
106 topics->cnt, topics->elems[0].topic);
107 if ((err = rd_kafka_subscribe(rk, topics)))
108 TEST_FAIL("Failed to subscribe: %s\n",
109 rd_kafka_err2str(err));
110
111
112 for (pause = 0 ; pause < pause_cnt ; pause++) {
113 int rcnt;
114 test_timing_t t_assignment;
115 test_msgver_t mv;
116
117 test_msgver_init(&mv, testid);
118 mv.fwd = &mv_all;
119
120 /* Consume sub-part of the messages. */
121 TEST_SAY("Pause-Iteration #%d: Consume %d messages at "
122 "msg_base %d\n", pause, per_pause_msg_cnt,
123 msg_base);
124 rcnt = test_consumer_poll("consume.part", rk, testid,
125 eof_cnt,
126 msg_base,
127 per_pause_msg_cnt == 0 ?
128 -1 : per_pause_msg_cnt,
129 &mv);
130
131 TEST_ASSERT(rcnt == per_pause_msg_cnt,
132 "expected %d messages, got %d",
133 per_pause_msg_cnt, rcnt);
134
135 test_msgver_verify("pause.iteration",
136 &mv, TEST_MSGVER_PER_PART,
137 msg_base, per_pause_msg_cnt);
138 test_msgver_clear(&mv);
139
140 msg_base += per_pause_msg_cnt;
141
142 TIMING_START(&t_assignment, "rd_kafka_assignment()");
143 if ((err = rd_kafka_assignment(rk, &parts)))
144 TEST_FAIL("failed to get assignment: %s\n",
145 rd_kafka_err2str(err));
146 TIMING_STOP(&t_assignment);
147
148 TEST_ASSERT(parts->cnt > 0,
149 "parts->cnt %d, expected > 0", parts->cnt);
150
151 TEST_SAY("Now pausing %d partition(s) for %dms\n",
152 parts->cnt, pause_time);
153 if ((err = rd_kafka_pause_partitions(rk, parts)))
154 TEST_FAIL("Failed to pause: %s\n",
155 rd_kafka_err2str(err));
156
157 /* Check per-partition errors */
158 for (j = 0 ; j < parts->cnt ; j++) {
159 if (parts->elems[j].err) {
160 TEST_WARN("pause failure for "
161 "%s %"PRId32"]: %s\n",
162 parts->elems[j].topic,
163 parts->elems[j].partition,
164 rd_kafka_err2str(
165 parts->elems[j].err));
166 fails++;
167 }
168 }
169 TEST_ASSERT(fails == 0, "See previous warnings\n");
170
171 TEST_SAY("Waiting for %dms, should not receive any "
172 "messages during this time\n", pause_time);
173
174 test_consumer_poll_no_msgs("silence.while.paused",
175 rk, testid, pause_time);
176
177 TEST_SAY("Resuming %d partitions\n", parts->cnt);
178 if ((err = rd_kafka_resume_partitions(rk, parts)))
179 TEST_FAIL("Failed to resume: %s\n",
180 rd_kafka_err2str(err));
181
182 /* Check per-partition errors */
183 for (j = 0 ; j < parts->cnt ; j++) {
184 if (parts->elems[j].err) {
185 TEST_WARN("resume failure for "
186 "%s %"PRId32"]: %s\n",
187 parts->elems[j].topic,
188 parts->elems[j].partition,
189 rd_kafka_err2str(
190 parts->elems[j].err));
191 fails++;
192 }
193 }
194 TEST_ASSERT(fails == 0, "See previous warnings\n");
195
196 rd_kafka_topic_partition_list_destroy(parts);
197 }
198
199 if (per_pause_msg_cnt > 0)
200 test_msgver_verify("all.msgs", &mv_all,
201 TEST_MSGVER_ALL_PART, 0, msgcnt);
202 else
203 test_msgver_verify("all.msgs", &mv_all,
204 TEST_MSGVER_ALL_PART, 0, 0);
205 test_msgver_clear(&mv_all);
206
207 /* Should now not see any more messages. */
208 test_consumer_poll_no_msgs("end.exp.no.msgs", rk, testid, 3000);
209
210 test_consumer_close(rk);
211
212 /* Hangs if bug isn't fixed */
213 rd_kafka_destroy(rk);
214 }
215
216 rd_kafka_topic_partition_list_destroy(topics);
217 rd_kafka_conf_destroy(conf);
218 rd_kafka_topic_conf_destroy(tconf);
219
220 return 0;
221 }
222
223
224
225 /**
226 * @brief Verify that the paused partition state is not used after
227 * the partition has been re-assigned.
228 *
229 * 1. Produce N messages
230 * 2. Consume N/4 messages
231 * 3. Pause partitions
232 * 4. Manually commit offset N/2
233 * 5. Unassign partitions
234 * 6. Assign partitions again
235 * 7. Verify that consumption starts at N/2 and not N/4
236 */
consume_pause_resume_after_reassign(void)237 static int consume_pause_resume_after_reassign (void) {
238 const char *topic = test_mk_topic_name(__FUNCTION__, 1);
239 const int32_t partition = 0;
240 const int msgcnt = 4000;
241 rd_kafka_t *rk;
242 rd_kafka_conf_t *conf;
243 rd_kafka_topic_partition_list_t *partitions, *pos;
244 rd_kafka_resp_err_t err;
245 int exp_msg_cnt;
246 uint64_t testid;
247 int r;
248 int msg_base = 0;
249 test_msgver_t mv;
250 rd_kafka_topic_partition_t *toppar;
251
252 test_conf_init(&conf, NULL, 60);
253
254 test_create_topic(NULL, topic, (int)partition+1, 1);
255
256 /* Produce messages */
257 testid = test_produce_msgs_easy(topic, 0, partition, msgcnt);
258
259 /* Set start offset to beginning */
260 partitions = rd_kafka_topic_partition_list_new(1);
261 toppar = rd_kafka_topic_partition_list_add(partitions, topic,
262 partition);
263 toppar->offset = RD_KAFKA_OFFSET_BEGINNING;
264
265
266 /**
267 * Create consumer.
268 */
269 test_conf_set(conf, "enable.partition.eof", "true");
270 rk = test_create_consumer(topic, NULL, conf, NULL);
271
272 test_consumer_assign("assign", rk, partitions);
273
274
275 exp_msg_cnt = msgcnt/4;
276 TEST_SAY("Consuming first quarter (%d) of messages\n", exp_msg_cnt);
277 test_msgver_init(&mv, testid);
278 r = test_consumer_poll("consume.first.quarter", rk, testid, 0,
279 msg_base, exp_msg_cnt, &mv);
280 TEST_ASSERT(r == exp_msg_cnt,
281 "expected %d messages, got %d", exp_msg_cnt, r);
282
283
284 TEST_SAY("Pausing partitions\n");
285 if ((err = rd_kafka_pause_partitions(rk, partitions)))
286 TEST_FAIL("Failed to pause: %s", rd_kafka_err2str(err));
287
288 TEST_SAY("Verifying pause, should see no new messages...\n");
289 test_consumer_poll_no_msgs("silence.while.paused", rk, testid, 3000);
290
291 test_msgver_verify("first.quarter", &mv, TEST_MSGVER_ALL_PART,
292 msg_base, exp_msg_cnt);
293 test_msgver_clear(&mv);
294
295
296 /* Check position */
297 pos = rd_kafka_topic_partition_list_copy(partitions);
298 if ((err = rd_kafka_position(rk, pos)))
299 TEST_FAIL("position() failed: %s", rd_kafka_err2str(err));
300
301 TEST_ASSERT(!pos->elems[0].err,
302 "position() returned error for our partition: %s",
303 rd_kafka_err2str(pos->elems[0].err));
304 TEST_SAY("Current application consume position is %"PRId64"\n",
305 pos->elems[0].offset);
306 TEST_ASSERT(pos->elems[0].offset == (int64_t)exp_msg_cnt,
307 "expected position %"PRId64", not %"PRId64,
308 (int64_t)exp_msg_cnt, pos->elems[0].offset);
309 rd_kafka_topic_partition_list_destroy(pos);
310
311
312 toppar->offset = (int64_t)(msgcnt/2);
313 TEST_SAY("Committing (yet unread) offset %"PRId64"\n", toppar->offset);
314 if ((err = rd_kafka_commit(rk, partitions, 0/*sync*/)))
315 TEST_FAIL("Commit failed: %s", rd_kafka_err2str(err));
316
317
318 TEST_SAY("Unassigning\n");
319 test_consumer_unassign("Unassign", rk);
320
321 /* Set start offset to INVALID so that the standard start offset
322 * logic kicks in. */
323 toppar->offset = RD_KAFKA_OFFSET_INVALID;
324
325 TEST_SAY("Reassigning\n");
326 test_consumer_assign("Reassign", rk, partitions);
327
328
329 TEST_SAY("Resuming partitions\n");
330 if ((err = rd_kafka_resume_partitions(rk, partitions)))
331 TEST_FAIL("Failed to resume: %s", rd_kafka_err2str(err));
332
333 msg_base = msgcnt / 2;
334 exp_msg_cnt = msgcnt / 2;
335 TEST_SAY("Consuming second half (%d) of messages at msg_base %d\n",
336 exp_msg_cnt, msg_base);
337 test_msgver_init(&mv, testid);
338 r = test_consumer_poll("consume.second.half", rk, testid, 1/*exp eof*/,
339 msg_base, exp_msg_cnt, &mv);
340 TEST_ASSERT(r == exp_msg_cnt,
341 "expected %d messages, got %d", exp_msg_cnt, r);
342
343 test_msgver_verify("second.half", &mv, TEST_MSGVER_ALL_PART,
344 msg_base, exp_msg_cnt);
345 test_msgver_clear(&mv);
346
347
348 rd_kafka_topic_partition_list_destroy(partitions);
349
350 test_consumer_close(rk);
351
352 rd_kafka_destroy(rk);
353
354 return 0;
355 }
356
357
rebalance_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * parts,void * opaque)358 static void rebalance_cb (rd_kafka_t *rk,
359 rd_kafka_resp_err_t err,
360 rd_kafka_topic_partition_list_t *parts,
361 void *opaque) {
362 rd_kafka_resp_err_t err2;
363
364 switch (err)
365 {
366 case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
367 /* Set start offset to beginning,
368 * while auto.offset.reset is default at `latest`. */
369
370 parts->elems[0].offset = RD_KAFKA_OFFSET_BEGINNING;
371 test_consumer_assign("rebalance", rk, parts);
372 TEST_SAY("Pausing partitions\n");
373 if ((err2 = rd_kafka_pause_partitions(rk, parts)))
374 TEST_FAIL("Failed to pause: %s",
375 rd_kafka_err2str(err2));
376 TEST_SAY("Resuming partitions\n");
377 if ((err2 = rd_kafka_resume_partitions(rk, parts)))
378 TEST_FAIL("Failed to pause: %s",
379 rd_kafka_err2str(err2));
380 break;
381 default:
382 test_consumer_unassign("rebalance", rk);
383 break;
384 }
385 }
386
387
388 /**
389 * @brief Verify that the assigned offset is used after pause+resume
390 * if no messages were consumed prior to pause. #2105
391 *
392 * We do this by setting the start offset to BEGINNING in the rebalance_cb
393 * and relying on auto.offset.reset=latest (default) to catch the failure case
394 * where the assigned offset was not honoured.
395 */
consume_subscribe_assign_pause_resume(void)396 static int consume_subscribe_assign_pause_resume (void) {
397 const char *topic = test_mk_topic_name(__FUNCTION__, 1);
398 const int32_t partition = 0;
399 const int msgcnt = 1;
400 rd_kafka_t *rk;
401 rd_kafka_conf_t *conf;
402 uint64_t testid;
403 int r;
404 test_msgver_t mv;
405
406 TEST_SAY(_C_CYA "[ %s ]\n", __FUNCTION__);
407
408 test_conf_init(&conf, NULL, 20);
409
410 test_create_topic(NULL, topic, (int)partition+1, 1);
411
412 /* Produce messages */
413 testid = test_produce_msgs_easy(topic, 0, partition, msgcnt);
414
415 /**
416 * Create consumer.
417 */
418 rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
419 test_conf_set(conf, "session.timeout.ms", "6000");
420 test_conf_set(conf, "enable.partition.eof", "true");
421 rk = test_create_consumer(topic, NULL, conf, NULL);
422
423 test_consumer_subscribe(rk, topic);
424
425 test_msgver_init(&mv, testid);
426 r = test_consumer_poll("consume", rk, testid, 1/*exp eof*/,
427 0, msgcnt, &mv);
428 TEST_ASSERT(r == msgcnt,
429 "expected %d messages, got %d", msgcnt, r);
430
431 test_msgver_verify("consumed", &mv, TEST_MSGVER_ALL_PART, 0, msgcnt);
432 test_msgver_clear(&mv);
433
434
435 test_consumer_close(rk);
436
437 rd_kafka_destroy(rk);
438
439 return 0;
440 }
441
442
main_0026_consume_pause(int argc,char ** argv)443 int main_0026_consume_pause (int argc, char **argv) {
444 int fails = 0;
445
446 if (test_can_create_topics(1)) {
447 fails += consume_pause();
448 fails += consume_pause_resume_after_reassign();
449 fails += consume_subscribe_assign_pause_resume();
450 }
451
452 if (fails > 0)
453 TEST_FAIL("See %d previous error(s)\n", fails);
454
455 return 0;
456 }
457