1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2020, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 #include "../src/rdkafka_proto.h"
32
33
34 /**
35 * @name Verify that the high-level consumer times out itself if
36 * heartbeats are not successful (issue #2631).
37 */
38
/* Commit mode of the running sub-test: "auto", "sync" or "async".
 * Set by do_test_session_timeout() and read by rebalance_cb(). */
static const char *commit_type;
/* Number of rebalance callbacks served so far in the current sub-test;
 * reset by do_test_session_timeout(), incremented by rebalance_cb(). */
static int rebalance_cnt;
/* The rebalance event rebalance_cb() asserts that it receives. */
static rd_kafka_resp_err_t rebalance_exp_event;
/* Expected result of the synchronous manual commit performed by
 * rebalance_cb() on revoke. */
static rd_kafka_resp_err_t commit_exp_err;
43
/**
 * @brief Rebalance callback for the session timeout test.
 *
 * Counts the event, asserts that it matches \c rebalance_exp_event and then
 * either assigns the partitions or, for any other event, performs the
 * manual commit (unless \c commit_type is "auto") and unassigns.
 *
 * @param rk Consumer instance the callback was served on.
 * @param err Rebalance event.
 * @param parts Partitions being assigned or revoked.
 * @param opaque Unused.
 */
static void rebalance_cb (rd_kafka_t *rk,
                          rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *parts,
                          void *opaque) {

        ++rebalance_cnt;
        TEST_SAY("Rebalance #%d: %s: %d partition(s)\n",
                 rebalance_cnt, rd_kafka_err2name(err), parts->cnt);

        TEST_ASSERT(err == rebalance_exp_event,
                    "Expected rebalance event %s, not %s",
                    rd_kafka_err2name(rebalance_exp_event),
                    rd_kafka_err2name(err));

        if (err != RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                /* Anything but an assignment is handled as a revoke:
                 * optionally commit, then drop the assignment. */
                if (strcmp(commit_type, "auto") != 0) {
                        const int is_async = strcmp(commit_type, "async") == 0;
                        rd_kafka_resp_err_t perr;
                        rd_kafka_resp_err_t commit_err;

                        TEST_SAY("Performing %s commit\n", commit_type);

                        perr = rd_kafka_position(rk, parts);
                        TEST_ASSERT(!perr, "Failed to acquire position: %s",
                                    rd_kafka_err2str(perr));

                        /* Give the broker a moment so that it too has
                         * timed out the member before we commit. */
                        rd_sleep(1);

                        commit_err = rd_kafka_commit(rk, parts, is_async);

                        if (is_async) {
                                TEST_ASSERT(!commit_err,
                                            "Async commit should not fail, "
                                            "but it returned %s",
                                            rd_kafka_err2name(commit_err));
                        } else {
                                /* A sync commit must return the expected
                                 * error; when success is expected,
                                 * _NO_OFFSET is accepted as well. */
                                TEST_ASSERT(commit_err == commit_exp_err ||
                                            (!commit_exp_err &&
                                             commit_err ==
                                             RD_KAFKA_RESP_ERR__NO_OFFSET),
                                            "Expected %s commit to return %s, "
                                            "not %s",
                                            commit_type,
                                            rd_kafka_err2name(commit_exp_err),
                                            rd_kafka_err2name(commit_err));
                        }
                }

                test_consumer_unassign("unassign", rk);
        } else {
                test_consumer_assign("assign", rk, parts);
        }

        /* Yield so that each poll() serves at most one rebalance callback,
         * letting expect_rebalance() hand control back to the test logic
         * after every rebalance. */
        rd_kafka_yield(rk);
}
104
105
106 /**
107 * @brief Wait for an expected rebalance event, or fail.
108 */
expect_rebalance(const char * what,rd_kafka_t * c,rd_kafka_resp_err_t exp_event,int timeout_s)109 static void expect_rebalance (const char *what, rd_kafka_t *c,
110 rd_kafka_resp_err_t exp_event,
111 int timeout_s) {
112 int64_t tmout = test_clock() + (timeout_s * 1000000);
113 int start_cnt = rebalance_cnt;
114
115 TEST_SAY("Waiting for %s (%s) for %ds\n",
116 what, rd_kafka_err2name(exp_event), timeout_s);
117
118 rebalance_exp_event = exp_event;
119
120 while (tmout > test_clock() && rebalance_cnt == start_cnt) {
121 if (test_consumer_poll_once(c, NULL, 1000))
122 rd_sleep(1);
123 }
124
125 if (rebalance_cnt == start_cnt + 1) {
126 rebalance_exp_event = RD_KAFKA_RESP_ERR_NO_ERROR;
127 return;
128 }
129
130 TEST_FAIL("Timed out waiting for %s (%s)\n",
131 what, rd_kafka_err2name(exp_event));
132 }
133
134
135 /**
136 * @brief Verify that session timeouts are handled by the consumer itself.
137 *
138 * @param use_commit_type "auto", "sync" (manual), "async" (manual)
139 */
do_test_session_timeout(const char * use_commit_type)140 static void do_test_session_timeout (const char *use_commit_type) {
141 const char *bootstraps;
142 rd_kafka_mock_cluster_t *mcluster;
143 rd_kafka_conf_t *conf;
144 rd_kafka_t *c;
145 const char *groupid = "mygroup";
146 const char *topic = "test";
147
148 rebalance_cnt = 0;
149 commit_type = use_commit_type;
150
151 SUB_TEST0(!strcmp(use_commit_type, "sync") /*quick*/,
152 "Test session timeout with %s commit", use_commit_type);
153
154 mcluster = test_mock_cluster_new(3, &bootstraps);
155
156 rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
157
158 /* Seed the topic with messages */
159 test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10,
160 "bootstrap.servers", bootstraps,
161 "batch.num.messages", "10",
162 NULL);
163
164 test_conf_init(&conf, NULL, 30);
165 test_conf_set(conf, "bootstrap.servers", bootstraps);
166 test_conf_set(conf, "security.protocol", "PLAINTEXT");
167 test_conf_set(conf, "group.id", groupid);
168 test_conf_set(conf, "session.timeout.ms", "5000");
169 test_conf_set(conf, "heartbeat.interval.ms", "1000");
170 test_conf_set(conf, "auto.offset.reset", "earliest");
171 test_conf_set(conf, "enable.auto.commit",
172 !strcmp(commit_type, "auto") ? "true" : "false");
173
174 c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
175
176 test_consumer_subscribe(c, topic);
177
178 /* Let Heartbeats fail after a couple of successful ones */
179 rd_kafka_mock_push_request_errors(
180 mcluster, RD_KAFKAP_Heartbeat,
181 9,
182 RD_KAFKA_RESP_ERR_NO_ERROR,
183 RD_KAFKA_RESP_ERR_NO_ERROR,
184 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
185 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
186 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
187 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
188 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
189 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
190 RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
191
192 expect_rebalance("initial assignment", c,
193 RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, 5+2);
194
195 /* Consume a couple of messages so that we have something to commit */
196 test_consumer_poll("consume", c, 0, -1, 0, 10, NULL);
197
198 /* The commit in the rebalance callback should fail when the
199 * member has timed out from the group. */
200 commit_exp_err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID;
201
202 expect_rebalance("session timeout revoke", c,
203 RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, 2+5+2);
204
205 expect_rebalance("second assignment", c,
206 RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, 5+2);
207
208 /* Final rebalance in close().
209 * Its commit will work. */
210 rebalance_exp_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
211 commit_exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
212
213 test_consumer_close(c);
214
215 rd_kafka_destroy(c);
216
217 test_mock_cluster_destroy(mcluster);
218
219 SUB_TEST_PASS();
220 }
221
222
223 /**
224 * @brief Attempt manual commit when assignment has been lost (#3217)
225 */
do_test_commit_on_lost(void)226 static void do_test_commit_on_lost (void) {
227 const char *bootstraps;
228 rd_kafka_mock_cluster_t *mcluster;
229 rd_kafka_conf_t *conf;
230 rd_kafka_t *c;
231 const char *groupid = "mygroup";
232 const char *topic = "test";
233 rd_kafka_resp_err_t err;
234
235 SUB_TEST();
236
237 test_curr->is_fatal_cb = test_error_is_not_fatal_cb;
238
239 mcluster = test_mock_cluster_new(3, &bootstraps);
240
241 rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
242
243 /* Seed the topic with messages */
244 test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10,
245 "bootstrap.servers", bootstraps,
246 "batch.num.messages", "10",
247 NULL);
248
249 test_conf_init(&conf, NULL, 30);
250 test_conf_set(conf, "bootstrap.servers", bootstraps);
251 test_conf_set(conf, "security.protocol", "PLAINTEXT");
252 test_conf_set(conf, "group.id", groupid);
253 test_conf_set(conf, "session.timeout.ms", "5000");
254 test_conf_set(conf, "heartbeat.interval.ms", "1000");
255 test_conf_set(conf, "auto.offset.reset", "earliest");
256 test_conf_set(conf, "enable.auto.commit", "false");
257
258 c = test_create_consumer(groupid, test_rebalance_cb, conf, NULL);
259
260 test_consumer_subscribe(c, topic);
261
262 /* Consume a couple of messages so that we have something to commit */
263 test_consumer_poll("consume", c, 0, -1, 0, 10, NULL);
264
265 /* Make the coordinator unreachable, this will cause a local session
266 * timeout followed by a revoke and assignment lost. */
267 rd_kafka_mock_broker_set_down(mcluster, 1);
268
269 /* Wait until the assignment is lost */
270 TEST_SAY("Waiting for assignment to be lost...\n");
271 while (!rd_kafka_assignment_lost(c))
272 rd_sleep(1);
273
274 TEST_SAY("Assignment is lost, committing\n");
275 /* Perform manual commit */
276 err = rd_kafka_commit(c, NULL, 0/*sync*/);
277 TEST_SAY("commit() returned: %s\n", rd_kafka_err2name(err));
278 TEST_ASSERT(err, "expected commit to fail");
279
280 test_consumer_close(c);
281
282 rd_kafka_destroy(c);
283
284 test_mock_cluster_destroy(mcluster);
285
286 test_curr->is_fatal_cb = NULL;
287
288 SUB_TEST_PASS();
289 }
290
291
/**
 * @brief Test entry point: runs the session timeout test once per commit
 *        type, followed by the commit-on-lost-assignment test.
 *
 * The whole test is skipped when test_needs_auth() reports true.
 *
 * @returns 0 in all cases.
 */
int main_0106_cgrp_sess_timeout (int argc, char **argv) {
        /* Commit types in execution order, NULL-terminated. */
        static const char *const commit_types[] = {
                "sync", "async", "auto", NULL
        };
        const char *const *mode;

        if (test_needs_auth()) {
                TEST_SKIP("Mock cluster does not support SSL/SASL\n");
                return 0;
        }

        for (mode = commit_types; *mode != NULL; mode++)
                do_test_session_timeout(*mode);

        do_test_commit_on_lost();

        return 0;
}
307