1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2018, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30 #include "rdkafka.h"
31
32
/**
 * Verify that a long-processing consumer does not leave the group while
 * processing when the processing time is < max.poll.interval.ms and
 * max.poll.interval.ms is > socket.timeout.ms.
 *
 * MO:
 *  - produce msgcnt * 2 * 2 messages spread by the partitioner over the
 *    topic's two partitions, plus msgcnt / 2 extra messages to partition 1.
 *  - create two consumers, c0 and c1.
 *  - subscribe c0 and poll it until it has been assigned partitions.
 *  - subscribe c1.
 *  - alternately poll c0 (no simulated processing time) and c1 (10s of
 *    simulated processing time per message).
 *  - stop as soon as either consumer has processed msgcnt messages.
 *  - check that no errors (disconnects, etc) or extra rebalances were raised.
 */
48
49
/**
 * Nominal per-message processing time, in microseconds (31s).
 * Only used to size the subscribe() test's overall timeout; the simulated
 * processing itself is driven by the timeouts passed to do_consume().
 * Declared static to keep the symbol file-local (presumably several test
 * files are linked into one test binary, where a global could clash).
 */
static const int64_t processing_time = 31*1000*1000; /*31s*/
51
52 struct _consumer {
53 rd_kafka_t *rk;
54 int64_t last;
55 int cnt;
56 int rebalance_cnt;
57 int max_rebalance_cnt;
58 };
59
do_consume(struct _consumer * cons,int timeout_s)60 static void do_consume (struct _consumer *cons, int timeout_s) {
61 rd_kafka_message_t *rkm;
62
63 rkm = rd_kafka_consumer_poll(cons->rk, timeout_s*1000);
64 if (!rkm)
65 return;
66
67 TEST_ASSERT(!rkm->err,
68 "%s consumer error: %s (last poll was %dms ago)",
69 rd_kafka_name(cons->rk),
70 rd_kafka_message_errstr(rkm),
71 (int)((test_clock() - cons->last)/1000));
72
73 TEST_SAY("%s: processing message #%d from "
74 "partition %"PRId32" at offset %"PRId64"\n",
75 rd_kafka_name(cons->rk), cons->cnt,
76 rkm->partition, rkm->offset);
77
78 rd_kafka_message_destroy(rkm);
79
80 cons->cnt++;
81 cons->last = test_clock();
82
83 TEST_SAY("%s: simulate processing by sleeping for %ds\n",
84 rd_kafka_name(cons->rk), timeout_s);
85 rd_sleep(timeout_s);
86 }
87
88
/**
 * @brief Rebalance callback shared by every consumer in this test.
 *
 * \p opaque is the consumer's struct _consumer. Each invocation bumps its
 * rebalance counter and fails the test once the counter exceeds
 * max_rebalance_cnt. The new partition set is assigned on
 * ASSIGN_PARTITIONS; for any other event the assignment is cleared.
 */
static void rebalance_cb (rd_kafka_t *rk,
                          rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *parts,
                          void *opaque) {
        struct _consumer *state = opaque;
        const char *name;
        int seen, allowed;

        state->rebalance_cnt += 1;
        seen = state->rebalance_cnt;
        allowed = state->max_rebalance_cnt;
        name = rd_kafka_name(state->rk);

        TEST_SAY(_C_BLU "%s rebalance #%d/%d: %s: %d partition(s)\n",
                 name, seen, allowed,
                 rd_kafka_err2name(err), parts->cnt);

        TEST_ASSERT(seen <= allowed,
                    "%s rebalanced %d times, max was %d",
                    name, seen, allowed);

        /* Take the new assignment, or drop the current one. */
        rd_kafka_assign(rk,
                        err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
                        parts : NULL);
}
113
114
115 #define _CONSUMER_CNT 2
do_test_with_subscribe(const char * topic)116 static void do_test_with_subscribe (const char *topic) {
117 int64_t testid;
118 const int msgcnt = 3;
119 struct _consumer c[_CONSUMER_CNT] = RD_ZERO_INIT;
120 rd_kafka_conf_t *conf;
121
122 TEST_SAY(_C_MAG "[ Test max.poll.interval.ms with subscribe() ]\n");
123
124 testid = test_id_generate();
125
126 test_conf_init(&conf, NULL,
127 10 + (int)(processing_time/1000000) * msgcnt);
128
129 /* Produce extra messages since we can't fully rely on the
130 * random partitioner to provide exact distribution. */
131 test_produce_msgs_easy(topic, testid, -1, msgcnt * _CONSUMER_CNT * 2);
132 test_produce_msgs_easy(topic, testid, 1, msgcnt/2);
133
134 test_conf_set(conf, "session.timeout.ms", "6000");
135 test_conf_set(conf, "max.poll.interval.ms", "20000" /*20s*/);
136 test_conf_set(conf, "socket.timeout.ms", "15000" /*15s*/);
137 test_conf_set(conf, "auto.offset.reset", "earliest");
138 test_conf_set(conf, "enable.partition.eof", "false");
139 /* Trigger other requests often */
140 test_conf_set(conf, "topic.metadata.refresh.interval.ms", "1000");
141 rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
142
143 rd_kafka_conf_set_opaque(conf, &c[0]);
144 c[0].rk = test_create_consumer(topic, NULL,
145 rd_kafka_conf_dup(conf), NULL);
146
147 rd_kafka_conf_set_opaque(conf, &c[1]);
148 c[1].rk = test_create_consumer(topic, NULL, conf, NULL);
149
150 test_consumer_subscribe(c[0].rk, topic);
151
152 /* c0: assign, (c1 joins) revoke, assign */
153 c[0].max_rebalance_cnt = 3;
154 /* c1: assign */
155 c[1].max_rebalance_cnt = 1;
156
157 /* Wait for assignment */
158 while (1) {
159 rd_kafka_topic_partition_list_t *parts = NULL;
160
161 do_consume(&c[0], 1/*1s*/);
162
163 if (rd_kafka_assignment(c[0].rk, &parts) !=
164 RD_KAFKA_RESP_ERR_NO_ERROR ||
165 !parts || parts->cnt == 0) {
166 if (parts)
167 rd_kafka_topic_partition_list_destroy(parts);
168 continue;
169 }
170
171 TEST_SAY("%s got assignment of %d partition(s)\n",
172 rd_kafka_name(c[0].rk), parts->cnt);
173 rd_kafka_topic_partition_list_destroy(parts);
174 break;
175 }
176
177 test_consumer_subscribe(c[1].rk, topic);
178
179 /* Poll until both consumers have finished reading N messages */
180 while (c[0].cnt < msgcnt && c[1].cnt < msgcnt) {
181 do_consume(&c[0], 0);
182 do_consume(&c[1], 10/*10s*/);
183 }
184
185 /* Allow the extra revoke rebalance on close() */
186 c[0].max_rebalance_cnt++;
187 c[1].max_rebalance_cnt++;
188
189 test_consumer_close(c[0].rk);
190 test_consumer_close(c[1].rk);
191
192 rd_kafka_destroy(c[0].rk);
193 rd_kafka_destroy(c[1].rk);
194
195 TEST_SAY(_C_GRN
196 "[ Test max.poll.interval.ms with subscribe(): PASS ]\n");
197 }
198
199
200 /**
201 * @brief Verify that max.poll.interval.ms does NOT kick in
202 * when just using assign() and not subscribe().
203 */
do_test_with_assign(const char * topic)204 static void do_test_with_assign (const char *topic) {
205 rd_kafka_t *rk;
206 rd_kafka_conf_t *conf;
207 rd_kafka_message_t *rkm;
208
209 TEST_SAY(_C_MAG "[ Test max.poll.interval.ms with assign() ]\n");
210
211 test_conf_init(&conf, NULL, 60);
212
213 test_create_topic(NULL, topic, 2, 1);
214
215 test_conf_set(conf, "session.timeout.ms", "6000");
216 test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/);
217
218 rk = test_create_consumer(topic, NULL, conf, NULL);
219
220 test_consumer_assign_partition("ASSIGN", rk, topic, 0,
221 RD_KAFKA_OFFSET_END);
222
223
224 /* Sleep for longer than max.poll.interval.ms */
225 rd_sleep(10);
226
227 /* Make sure no error was raised */
228 while ((rkm = rd_kafka_consumer_poll(rk, 0))) {
229 TEST_ASSERT(!rkm->err,
230 "Unexpected consumer error: %s: %s",
231 rd_kafka_err2name(rkm->err),
232 rd_kafka_message_errstr(rkm));
233
234 rd_kafka_message_destroy(rkm);
235 }
236
237
238 test_consumer_close(rk);
239 rd_kafka_destroy(rk);
240
241 TEST_SAY(_C_GRN
242 "[ Test max.poll.interval.ms with assign(): PASS ]\n");
243 }
244
245
246 /**
247 * @brief Verify that max.poll.interval.ms kicks in even if
248 * the application hasn't called poll once.
249 */
do_test_no_poll(const char * topic)250 static void do_test_no_poll (const char *topic) {
251 rd_kafka_t *rk;
252 rd_kafka_conf_t *conf;
253 rd_kafka_message_t *rkm;
254 rd_bool_t raised = rd_false;
255
256 TEST_SAY(_C_MAG "[ Test max.poll.interval.ms without calling poll ]\n");
257
258 test_conf_init(&conf, NULL, 60);
259
260 test_create_topic(NULL, topic, 2, 1);
261
262 test_conf_set(conf, "session.timeout.ms", "6000");
263 test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/);
264
265 rk = test_create_consumer(topic, NULL, conf, NULL);
266
267 test_consumer_subscribe(rk, topic);
268
269 /* Sleep for longer than max.poll.interval.ms */
270 rd_sleep(10);
271
272 /* Make sure the error is raised */
273 while ((rkm = rd_kafka_consumer_poll(rk, 0))) {
274 if (rkm->err == RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED)
275 raised = rd_true;
276
277 rd_kafka_message_destroy(rkm);
278 }
279
280 TEST_ASSERT(raised, "Expected to have seen ERR__MAX_POLL_EXCEEDED");
281
282 test_consumer_close(rk);
283 rd_kafka_destroy(rk);
284
285 TEST_SAY(_C_GRN
286 "[ Test max.poll.interval.ms without calling poll: PASS ]\n");
287 }
288
289
main_0091_max_poll_interval_timeout(int argc,char ** argv)290 int main_0091_max_poll_interval_timeout (int argc, char **argv) {
291 const char *topic = test_mk_topic_name("0091_max_poll_interval_tmout",
292 1);
293
294 test_create_topic(NULL, topic, 2, 1);
295
296 do_test_with_subscribe(topic);
297
298 do_test_with_assign(topic);
299
300 do_test_no_poll(topic);
301
302 return 0;
303 }
304