1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
32  * is built from within the librdkafka source tree and thus differs. */
33 #include "rdkafka.h"  /* for Kafka driver */
34 
35 
36 /**
37  * Consumer: pause and resume.
38  * Make sure no messages are lost or duplicated.
39  */
40 
41 
42 
consume_pause(void)43 static int consume_pause (void) {
44 	const char *topic = test_mk_topic_name(__FUNCTION__, 1);
45         const int partition_cnt = 3;
46 	rd_kafka_t *rk;
47         rd_kafka_conf_t *conf;
48 	rd_kafka_topic_conf_t *tconf;
49 	rd_kafka_topic_partition_list_t *topics;
50 	rd_kafka_resp_err_t err;
51         const int msgcnt = 1000;
52         uint64_t testid;
53 	int it, iterations = 3;
54 	int msg_base = 0;
55 	int fails = 0;
56         char group_id[32];
57 
58         test_conf_init(&conf, &tconf,
59                        60 + (test_session_timeout_ms * 3 / 1000));
60         test_conf_set(conf, "enable.partition.eof", "true");
61 	test_topic_conf_set(tconf, "auto.offset.reset", "smallest");
62 
63         test_create_topic(NULL, topic, partition_cnt, 1);
64 
65         /* Produce messages */
66         testid = test_produce_msgs_easy(topic, 0,
67                                         RD_KAFKA_PARTITION_UA, msgcnt);
68 
69 	topics = rd_kafka_topic_partition_list_new(1);
70 	rd_kafka_topic_partition_list_add(topics, topic, -1);
71 
72 	for (it = 0 ; it < iterations ; it++) {
73 		const int pause_cnt = 5;
74 		int per_pause_msg_cnt = msgcnt / pause_cnt;
75                 const int pause_time = 1200 /* 1.2s */;
76                 int eof_cnt = -1;
77 		int pause;
78 		rd_kafka_topic_partition_list_t *parts;
79 		test_msgver_t mv_all;
80 		int j;
81 
82 		test_msgver_init(&mv_all, testid); /* All messages */
83 
84                 /* On the last iteration reuse the previous group.id
85                  * to make consumer start at committed offsets which should
86                  * also be EOF. This to trigger #1307. */
87                 if (it < iterations-1)
88                         test_str_id_generate(group_id, sizeof(group_id));
89                 else {
90                         TEST_SAY("Reusing previous group.id %s\n", group_id);
91                         per_pause_msg_cnt = 0;
92                         eof_cnt = partition_cnt;
93                 }
94 
95 		TEST_SAY("Iteration %d/%d, using group.id %s, "
96                          "expecting %d messages/pause and %d EOFs\n",
97                          it, iterations-1, group_id,
98                          per_pause_msg_cnt, eof_cnt);
99 
100                 rk = test_create_consumer(group_id, NULL,
101                                           rd_kafka_conf_dup(conf),
102                                           rd_kafka_topic_conf_dup(tconf));
103 
104 
105 		TEST_SAY("Subscribing to %d topic(s): %s\n",
106 			 topics->cnt, topics->elems[0].topic);
107 		if ((err = rd_kafka_subscribe(rk, topics)))
108 			TEST_FAIL("Failed to subscribe: %s\n",
109 				  rd_kafka_err2str(err));
110 
111 
112 		for (pause = 0 ; pause < pause_cnt ; pause++) {
113 			int rcnt;
114 			test_timing_t t_assignment;
115 			test_msgver_t mv;
116 
117 			test_msgver_init(&mv, testid);
118 			mv.fwd = &mv_all;
119 
120 			/* Consume sub-part of the messages. */
121 			TEST_SAY("Pause-Iteration #%d: Consume %d messages at "
122 				 "msg_base %d\n", pause, per_pause_msg_cnt,
123 				 msg_base);
124 			rcnt = test_consumer_poll("consume.part", rk, testid,
125                                                   eof_cnt,
126 						  msg_base,
127                                                   per_pause_msg_cnt == 0 ?
128                                                   -1 : per_pause_msg_cnt,
129 						  &mv);
130 
131 			TEST_ASSERT(rcnt == per_pause_msg_cnt,
132 				    "expected %d messages, got %d",
133 				    per_pause_msg_cnt, rcnt);
134 
135 			test_msgver_verify("pause.iteration",
136 					   &mv, TEST_MSGVER_PER_PART,
137 					   msg_base, per_pause_msg_cnt);
138 			test_msgver_clear(&mv);
139 
140 			msg_base += per_pause_msg_cnt;
141 
142 			TIMING_START(&t_assignment, "rd_kafka_assignment()");
143 			if ((err = rd_kafka_assignment(rk, &parts)))
144 				TEST_FAIL("failed to get assignment: %s\n",
145 					  rd_kafka_err2str(err));
146 			TIMING_STOP(&t_assignment);
147 
148 			TEST_ASSERT(parts->cnt > 0,
149 				    "parts->cnt %d, expected > 0", parts->cnt);
150 
151 			TEST_SAY("Now pausing %d partition(s) for %dms\n",
152 				 parts->cnt, pause_time);
153 			if ((err = rd_kafka_pause_partitions(rk, parts)))
154 				TEST_FAIL("Failed to pause: %s\n",
155 					  rd_kafka_err2str(err));
156 
157 			/* Check per-partition errors */
158 			for (j = 0 ; j < parts->cnt ; j++) {
159 				if (parts->elems[j].err) {
160 					TEST_WARN("pause failure for "
161 						  "%s %"PRId32"]: %s\n",
162 						  parts->elems[j].topic,
163 						  parts->elems[j].partition,
164 						  rd_kafka_err2str(
165 							  parts->elems[j].err));
166 					fails++;
167 				}
168 			}
169 			TEST_ASSERT(fails == 0, "See previous warnings\n");
170 
171 			TEST_SAY("Waiting for %dms, should not receive any "
172 				 "messages during this time\n", pause_time);
173 
174 			test_consumer_poll_no_msgs("silence.while.paused",
175 						   rk, testid, pause_time);
176 
177 			TEST_SAY("Resuming %d partitions\n", parts->cnt);
178 			if ((err = rd_kafka_resume_partitions(rk, parts)))
179 				TEST_FAIL("Failed to resume: %s\n",
180 					  rd_kafka_err2str(err));
181 
182 			/* Check per-partition errors */
183 			for (j = 0 ; j < parts->cnt ; j++) {
184 				if (parts->elems[j].err) {
185 					TEST_WARN("resume failure for "
186 						  "%s %"PRId32"]: %s\n",
187 						  parts->elems[j].topic,
188 						  parts->elems[j].partition,
189 						  rd_kafka_err2str(
190 							  parts->elems[j].err));
191 					fails++;
192 				}
193 			}
194 			TEST_ASSERT(fails == 0, "See previous warnings\n");
195 
196 			rd_kafka_topic_partition_list_destroy(parts);
197 		}
198 
199                 if (per_pause_msg_cnt > 0)
200                         test_msgver_verify("all.msgs", &mv_all,
201                                            TEST_MSGVER_ALL_PART, 0, msgcnt);
202                 else
203                         test_msgver_verify("all.msgs", &mv_all,
204                                            TEST_MSGVER_ALL_PART, 0, 0);
205                         test_msgver_clear(&mv_all);
206 
207 		/* Should now not see any more messages. */
208 		test_consumer_poll_no_msgs("end.exp.no.msgs", rk, testid, 3000);
209 
210 		test_consumer_close(rk);
211 
212 		/* Hangs if bug isn't fixed */
213 		rd_kafka_destroy(rk);
214 	}
215 
216 	rd_kafka_topic_partition_list_destroy(topics);
217         rd_kafka_conf_destroy(conf);
218 	rd_kafka_topic_conf_destroy(tconf);
219 
220         return 0;
221 }
222 
223 
224 
225 /**
226  * @brief Verify that the paused partition state is not used after
227  *        the partition has been re-assigned.
228  *
229  * 1. Produce N messages
230  * 2. Consume N/4 messages
231  * 3. Pause partitions
232  * 4. Manually commit offset N/2
233  * 5. Unassign partitions
234  * 6. Assign partitions again
235  * 7. Verify that consumption starts at N/2 and not N/4
236  */
consume_pause_resume_after_reassign(void)237 static int consume_pause_resume_after_reassign (void) {
238         const char *topic = test_mk_topic_name(__FUNCTION__, 1);
239         const int32_t partition = 0;
240         const int msgcnt = 4000;
241         rd_kafka_t *rk;
242         rd_kafka_conf_t *conf;
243         rd_kafka_topic_partition_list_t *partitions, *pos;
244         rd_kafka_resp_err_t err;
245         int exp_msg_cnt;
246         uint64_t testid;
247         int r;
248         int msg_base = 0;
249         test_msgver_t mv;
250         rd_kafka_topic_partition_t *toppar;
251 
252         test_conf_init(&conf, NULL, 60);
253 
254         test_create_topic(NULL, topic, (int)partition+1, 1);
255 
256         /* Produce messages */
257         testid = test_produce_msgs_easy(topic, 0, partition, msgcnt);
258 
259         /* Set start offset to beginning */
260         partitions = rd_kafka_topic_partition_list_new(1);
261         toppar = rd_kafka_topic_partition_list_add(partitions, topic,
262                                                    partition);
263         toppar->offset = RD_KAFKA_OFFSET_BEGINNING;
264 
265 
266         /**
267          * Create consumer.
268          */
269         test_conf_set(conf, "enable.partition.eof", "true");
270         rk = test_create_consumer(topic, NULL, conf, NULL);
271 
272         test_consumer_assign("assign", rk, partitions);
273 
274 
275         exp_msg_cnt = msgcnt/4;
276         TEST_SAY("Consuming first quarter (%d) of messages\n", exp_msg_cnt);
277         test_msgver_init(&mv, testid);
278         r = test_consumer_poll("consume.first.quarter", rk, testid, 0,
279                                msg_base, exp_msg_cnt, &mv);
280         TEST_ASSERT(r == exp_msg_cnt,
281                     "expected %d messages, got %d", exp_msg_cnt, r);
282 
283 
284         TEST_SAY("Pausing partitions\n");
285         if ((err = rd_kafka_pause_partitions(rk, partitions)))
286                 TEST_FAIL("Failed to pause: %s", rd_kafka_err2str(err));
287 
288         TEST_SAY("Verifying pause, should see no new messages...\n");
289         test_consumer_poll_no_msgs("silence.while.paused", rk, testid, 3000);
290 
291         test_msgver_verify("first.quarter", &mv, TEST_MSGVER_ALL_PART,
292                            msg_base, exp_msg_cnt);
293         test_msgver_clear(&mv);
294 
295 
296         /* Check position */
297         pos = rd_kafka_topic_partition_list_copy(partitions);
298         if ((err = rd_kafka_position(rk, pos)))
299                 TEST_FAIL("position() failed: %s", rd_kafka_err2str(err));
300 
301         TEST_ASSERT(!pos->elems[0].err,
302                     "position() returned error for our partition: %s",
303                     rd_kafka_err2str(pos->elems[0].err));
304         TEST_SAY("Current application consume position is %"PRId64"\n",
305                  pos->elems[0].offset);
306         TEST_ASSERT(pos->elems[0].offset == (int64_t)exp_msg_cnt,
307                     "expected position %"PRId64", not %"PRId64,
308                     (int64_t)exp_msg_cnt, pos->elems[0].offset);
309         rd_kafka_topic_partition_list_destroy(pos);
310 
311 
312         toppar->offset = (int64_t)(msgcnt/2);
313         TEST_SAY("Committing (yet unread) offset %"PRId64"\n", toppar->offset);
314         if ((err = rd_kafka_commit(rk, partitions, 0/*sync*/)))
315                 TEST_FAIL("Commit failed: %s", rd_kafka_err2str(err));
316 
317 
318         TEST_SAY("Unassigning\n");
319         test_consumer_unassign("Unassign", rk);
320 
321         /* Set start offset to INVALID so that the standard start offset
322          * logic kicks in. */
323         toppar->offset = RD_KAFKA_OFFSET_INVALID;
324 
325         TEST_SAY("Reassigning\n");
326         test_consumer_assign("Reassign", rk, partitions);
327 
328 
329         TEST_SAY("Resuming partitions\n");
330         if ((err = rd_kafka_resume_partitions(rk, partitions)))
331                 TEST_FAIL("Failed to resume: %s", rd_kafka_err2str(err));
332 
333         msg_base = msgcnt / 2;
334         exp_msg_cnt = msgcnt / 2;
335         TEST_SAY("Consuming second half (%d) of messages at msg_base %d\n",
336                  exp_msg_cnt, msg_base);
337         test_msgver_init(&mv, testid);
338         r = test_consumer_poll("consume.second.half", rk, testid, 1/*exp eof*/,
339                                msg_base, exp_msg_cnt, &mv);
340         TEST_ASSERT(r == exp_msg_cnt,
341                     "expected %d messages, got %d", exp_msg_cnt, r);
342 
343         test_msgver_verify("second.half", &mv, TEST_MSGVER_ALL_PART,
344                            msg_base, exp_msg_cnt);
345         test_msgver_clear(&mv);
346 
347 
348         rd_kafka_topic_partition_list_destroy(partitions);
349 
350         test_consumer_close(rk);
351 
352         rd_kafka_destroy(rk);
353 
354         return 0;
355 }
356 
357 
/**
 * @brief Rebalance callback: on assignment, assign \p parts with the start
 *        offset set to BEGINNING and then immediately pause and resume
 *        them. On any other event the current assignment is dropped.
 *
 * @param rk      Consumer instance.
 * @param err     Rebalance event.
 * @param parts   Partitions being assigned or revoked; the offsets are
 *                overwritten on assignment.
 * @param opaque  Unused.
 */
static void rebalance_cb (rd_kafka_t *rk,
                          rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *parts,
                          void *opaque) {
        rd_kafka_resp_err_t err2;
        int i;

        (void)opaque;

        switch (err)
        {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                /* Set start offset to beginning,
                 * while auto.offset.reset is default at `latest`.
                 * Every assigned partition is updated, which also keeps
                 * an empty assignment from being indexed. */
                for (i = 0 ; i < parts->cnt ; i++)
                        parts->elems[i].offset = RD_KAFKA_OFFSET_BEGINNING;

                test_consumer_assign("rebalance", rk, parts);
                TEST_SAY("Pausing partitions\n");
                if ((err2 = rd_kafka_pause_partitions(rk, parts)))
                        TEST_FAIL("Failed to pause: %s",
                                  rd_kafka_err2str(err2));
                TEST_SAY("Resuming partitions\n");
                if ((err2 = rd_kafka_resume_partitions(rk, parts)))
                        TEST_FAIL("Failed to resume: %s",
                                  rd_kafka_err2str(err2));
                break;
        default:
                test_consumer_unassign("rebalance", rk);
                break;
        }
}
386 
387 
388 /**
389  * @brief Verify that the assigned offset is used after pause+resume
390  *        if no messages were consumed prior to pause. #2105
391  *
392  * We do this by setting the start offset to BEGINNING in the rebalance_cb
393  * and relying on auto.offset.reset=latest (default) to catch the failure case
394  * where the assigned offset was not honoured.
395  */
consume_subscribe_assign_pause_resume(void)396 static int consume_subscribe_assign_pause_resume (void) {
397         const char *topic = test_mk_topic_name(__FUNCTION__, 1);
398         const int32_t partition = 0;
399         const int msgcnt = 1;
400         rd_kafka_t *rk;
401         rd_kafka_conf_t *conf;
402         uint64_t testid;
403         int r;
404         test_msgver_t mv;
405 
406         TEST_SAY(_C_CYA "[ %s ]\n", __FUNCTION__);
407 
408         test_conf_init(&conf, NULL, 20);
409 
410         test_create_topic(NULL, topic, (int)partition+1, 1);
411 
412         /* Produce messages */
413         testid = test_produce_msgs_easy(topic, 0, partition, msgcnt);
414 
415         /**
416          * Create consumer.
417          */
418         rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
419         test_conf_set(conf, "session.timeout.ms", "6000");
420         test_conf_set(conf, "enable.partition.eof", "true");
421         rk = test_create_consumer(topic, NULL, conf, NULL);
422 
423         test_consumer_subscribe(rk, topic);
424 
425         test_msgver_init(&mv, testid);
426         r = test_consumer_poll("consume", rk, testid, 1/*exp eof*/,
427                                0, msgcnt, &mv);
428         TEST_ASSERT(r == msgcnt,
429                     "expected %d messages, got %d", msgcnt, r);
430 
431         test_msgver_verify("consumed", &mv, TEST_MSGVER_ALL_PART, 0, msgcnt);
432         test_msgver_clear(&mv);
433 
434 
435         test_consumer_close(rk);
436 
437         rd_kafka_destroy(rk);
438 
439         return 0;
440 }
441 
442 
/**
 * @brief Test entry point: runs the three pause/resume sub-tests when
 *        test_can_create_topics() allows it, otherwise runs nothing.
 *
 * @param argc  Unused.
 * @param argv  Unused.
 *
 * @returns 0; sub-test failures are reported through TEST_FAIL.
 */
int main_0026_consume_pause (int argc, char **argv) {
        int failures;

        /* All sub-tests need to create their own topic. */
        if (!test_can_create_topics(1))
                return 0;

        failures  = consume_pause();
        failures += consume_pause_resume_after_reassign();
        failures += consume_subscribe_assign_pause_resume();

        if (failures > 0)
                TEST_FAIL("See %d previous error(s)\n", failures);

        return 0;
}
457