1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2018, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 #include "rdkafka.h"
31 
32 
33 /**
34  * Verify that long-processing consumer does not leave the group during
35  * processing when processing time < max.poll.interval.ms but
36  * max.poll.interval.ms > socket.timeout.ms.
37  *
38  * MO:
 *  - produce more than N messages per consumer to a two-partition topic
40  *  - create two consumers, c0 and c1.
41  *  - subscribe c0, wait for rebalance, poll first message.
42  *  - subscribe c1
43  *  - have both consumers poll messages and spend T seconds processing
44  *    each message.
45  *  - wait until both consumers have received N messages each.
46  *  - check that no errors (disconnects, etc) or extra rebalances were raised.
47  */
48 
49 
/* Nominal processing time of 31 seconds, expressed in microseconds.
 * do_test_with_subscribe() converts it back to seconds when it computes
 * the value it hands to test_conf_init(). */
const int64_t processing_time = 31 * 1000 * 1000;
51 
52 struct _consumer {
53         rd_kafka_t *rk;
54         int64_t last;
55         int cnt;
56         int rebalance_cnt;
57         int max_rebalance_cnt;
58 };
59 
do_consume(struct _consumer * cons,int timeout_s)60 static void do_consume (struct _consumer *cons, int timeout_s) {
61         rd_kafka_message_t *rkm;
62 
63         rkm = rd_kafka_consumer_poll(cons->rk, timeout_s*1000);
64         if (!rkm)
65                 return;
66 
67         TEST_ASSERT(!rkm->err,
68                     "%s consumer error: %s (last poll was %dms ago)",
69                     rd_kafka_name(cons->rk),
70                     rd_kafka_message_errstr(rkm),
71                     (int)((test_clock() - cons->last)/1000));
72 
73         TEST_SAY("%s: processing message #%d from "
74                  "partition %"PRId32" at offset %"PRId64"\n",
75                  rd_kafka_name(cons->rk), cons->cnt,
76                  rkm->partition, rkm->offset);
77 
78         rd_kafka_message_destroy(rkm);
79 
80         cons->cnt++;
81         cons->last = test_clock();
82 
83         TEST_SAY("%s: simulate processing by sleeping for %ds\n",
84                  rd_kafka_name(cons->rk), timeout_s);
85         rd_sleep(timeout_s);
86 }
87 
88 
/**
 * @brief Rebalance callback: counts rebalances per consumer and applies
 *        the resulting assignment.
 *
 * Increments the consumer's rebalance counter, logs the event and asserts
 * that the counter has not gone past the consumer's configured maximum.
 * On RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS the given partition list is
 * assigned; for any other \p err value the assignment is cleared by
 * assigning NULL.
 *
 * @param rk     Consumer handle the callback is invoked for.
 * @param err    Rebalance event type.
 * @param parts  Partition list accompanying the event.
 * @param opaque The struct _consumer registered as the conf opaque.
 */
static void rebalance_cb (rd_kafka_t *rk,
                          rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *parts,
                          void *opaque) {
        struct _consumer *cons = opaque;
        const char *name;

        cons->rebalance_cnt++;

        name = rd_kafka_name(cons->rk);

        TEST_SAY(_C_BLU "%s rebalance #%d/%d: %s: %d partition(s)\n",
                 name,
                 cons->rebalance_cnt, cons->max_rebalance_cnt,
                 rd_kafka_err2name(err),
                 parts->cnt);

        /* More rebalances than the test scenario allows is a failure. */
        TEST_ASSERT(cons->rebalance_cnt <= cons->max_rebalance_cnt,
                    "%s rebalanced %d times, max was %d",
                    name,
                    cons->rebalance_cnt, cons->max_rebalance_cnt);

        /* Assign on ASSIGN_PARTITIONS, otherwise drop the assignment. */
        rd_kafka_assign(rk,
                        err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
                        parts : NULL);
}
113 
114 
115 #define _CONSUMER_CNT 2
do_test_with_subscribe(const char * topic)116 static void do_test_with_subscribe (const char *topic) {
117         int64_t testid;
118         const int msgcnt = 3;
119         struct _consumer c[_CONSUMER_CNT] = RD_ZERO_INIT;
120         rd_kafka_conf_t *conf;
121 
122         TEST_SAY(_C_MAG "[ Test max.poll.interval.ms with subscribe() ]\n");
123 
124         testid = test_id_generate();
125 
126         test_conf_init(&conf, NULL,
127                        10 + (int)(processing_time/1000000) * msgcnt);
128 
129         /* Produce extra messages since we can't fully rely on the
130          * random partitioner to provide exact distribution. */
131         test_produce_msgs_easy(topic, testid, -1, msgcnt * _CONSUMER_CNT * 2);
132         test_produce_msgs_easy(topic, testid, 1, msgcnt/2);
133 
134         test_conf_set(conf, "session.timeout.ms", "6000");
135         test_conf_set(conf, "max.poll.interval.ms", "20000" /*20s*/);
136         test_conf_set(conf, "socket.timeout.ms", "15000" /*15s*/);
137         test_conf_set(conf, "auto.offset.reset", "earliest");
138         test_conf_set(conf, "enable.partition.eof", "false");
139         /* Trigger other requests often */
140         test_conf_set(conf, "topic.metadata.refresh.interval.ms", "1000");
141         rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
142 
143         rd_kafka_conf_set_opaque(conf, &c[0]);
144         c[0].rk = test_create_consumer(topic, NULL,
145                                        rd_kafka_conf_dup(conf), NULL);
146 
147         rd_kafka_conf_set_opaque(conf, &c[1]);
148         c[1].rk = test_create_consumer(topic, NULL, conf, NULL);
149 
150         test_consumer_subscribe(c[0].rk, topic);
151 
152         /* c0: assign, (c1 joins) revoke, assign */
153         c[0].max_rebalance_cnt = 3;
154         /* c1: assign */
155         c[1].max_rebalance_cnt = 1;
156 
157         /* Wait for assignment */
158         while (1) {
159                 rd_kafka_topic_partition_list_t *parts = NULL;
160 
161                 do_consume(&c[0], 1/*1s*/);
162 
163                 if (rd_kafka_assignment(c[0].rk, &parts) !=
164                     RD_KAFKA_RESP_ERR_NO_ERROR ||
165                     !parts || parts->cnt == 0) {
166                         if (parts)
167                                 rd_kafka_topic_partition_list_destroy(parts);
168                         continue;
169                 }
170 
171                 TEST_SAY("%s got assignment of %d partition(s)\n",
172                          rd_kafka_name(c[0].rk), parts->cnt);
173                 rd_kafka_topic_partition_list_destroy(parts);
174                 break;
175         }
176 
177         test_consumer_subscribe(c[1].rk, topic);
178 
179         /* Poll until both consumers have finished reading N messages */
180         while (c[0].cnt < msgcnt && c[1].cnt < msgcnt) {
181                 do_consume(&c[0], 0);
182                 do_consume(&c[1], 10/*10s*/);
183         }
184 
185         /* Allow the extra revoke rebalance on close() */
186         c[0].max_rebalance_cnt++;
187         c[1].max_rebalance_cnt++;
188 
189         test_consumer_close(c[0].rk);
190         test_consumer_close(c[1].rk);
191 
192         rd_kafka_destroy(c[0].rk);
193         rd_kafka_destroy(c[1].rk);
194 
195         TEST_SAY(_C_GRN
196                  "[ Test max.poll.interval.ms with subscribe(): PASS ]\n");
197 }
198 
199 
200 /**
201  * @brief Verify that max.poll.interval.ms does NOT kick in
202  *        when just using assign() and not subscribe().
203  */
do_test_with_assign(const char * topic)204 static void do_test_with_assign (const char *topic) {
205         rd_kafka_t *rk;
206         rd_kafka_conf_t *conf;
207         rd_kafka_message_t *rkm;
208 
209         TEST_SAY(_C_MAG "[ Test max.poll.interval.ms with assign() ]\n");
210 
211         test_conf_init(&conf, NULL, 60);
212 
213         test_create_topic(NULL, topic, 2, 1);
214 
215         test_conf_set(conf, "session.timeout.ms", "6000");
216         test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/);
217 
218         rk = test_create_consumer(topic, NULL, conf, NULL);
219 
220         test_consumer_assign_partition("ASSIGN", rk, topic, 0,
221                                        RD_KAFKA_OFFSET_END);
222 
223 
224         /* Sleep for longer than max.poll.interval.ms */
225         rd_sleep(10);
226 
227         /* Make sure no error was raised */
228         while ((rkm = rd_kafka_consumer_poll(rk, 0))) {
229                 TEST_ASSERT(!rkm->err,
230                             "Unexpected consumer error: %s: %s",
231                             rd_kafka_err2name(rkm->err),
232                             rd_kafka_message_errstr(rkm));
233 
234                 rd_kafka_message_destroy(rkm);
235         }
236 
237 
238         test_consumer_close(rk);
239         rd_kafka_destroy(rk);
240 
241         TEST_SAY(_C_GRN
242                  "[ Test max.poll.interval.ms with assign(): PASS ]\n");
243 }
244 
245 
246 /**
247  * @brief Verify that max.poll.interval.ms kicks in even if
248  *        the application hasn't called poll once.
249  */
do_test_no_poll(const char * topic)250 static void do_test_no_poll (const char *topic) {
251         rd_kafka_t *rk;
252         rd_kafka_conf_t *conf;
253         rd_kafka_message_t *rkm;
254         rd_bool_t raised = rd_false;
255 
256         TEST_SAY(_C_MAG "[ Test max.poll.interval.ms without calling poll ]\n");
257 
258         test_conf_init(&conf, NULL, 60);
259 
260         test_create_topic(NULL, topic, 2, 1);
261 
262         test_conf_set(conf, "session.timeout.ms", "6000");
263         test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/);
264 
265         rk = test_create_consumer(topic, NULL, conf, NULL);
266 
267         test_consumer_subscribe(rk, topic);
268 
269         /* Sleep for longer than max.poll.interval.ms */
270         rd_sleep(10);
271 
272         /* Make sure the error is raised */
273         while ((rkm = rd_kafka_consumer_poll(rk, 0))) {
274                 if (rkm->err == RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED)
275                         raised = rd_true;
276 
277                 rd_kafka_message_destroy(rkm);
278         }
279 
280         TEST_ASSERT(raised, "Expected to have seen ERR__MAX_POLL_EXCEEDED");
281 
282         test_consumer_close(rk);
283         rd_kafka_destroy(rk);
284 
285         TEST_SAY(_C_GRN
286                  "[ Test max.poll.interval.ms without calling poll: PASS ]\n");
287 }
288 
289 
main_0091_max_poll_interval_timeout(int argc,char ** argv)290 int main_0091_max_poll_interval_timeout (int argc, char **argv) {
291         const char *topic = test_mk_topic_name("0091_max_poll_interval_tmout",
292                                                1);
293 
294         test_create_topic(NULL, topic, 2, 1);
295 
296         do_test_with_subscribe(topic);
297 
298         do_test_with_assign(topic);
299 
300         do_test_no_poll(topic);
301 
302         return 0;
303 }
304