1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2020, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 #include "../src/rdkafka_proto.h"
32 
33 
/**
 * @name Verify that the high-level consumer times out the session on its own
 *       if heartbeats are not successful (issue #2631).
 */
38 
/* Commit mode of the running sub-test ("auto", "sync" or "async").
 * Set by do_test_session_timeout() and read by rebalance_cb(). */
static const char *commit_type;
/* Number of times rebalance_cb() has been invoked in the current sub-test. */
static int rebalance_cnt;
/* Rebalance event that rebalance_cb() asserts its next invocation to carry. */
static rd_kafka_resp_err_t rebalance_exp_event;
/* Result that rebalance_cb() expects from a manual synchronous commit. */
static rd_kafka_resp_err_t commit_exp_err;
43 
/**
 * @brief Rebalance callback used by the session timeout sub-tests.
 *
 * Bumps \c rebalance_cnt, asserts that \p err matches the event stored in
 * \c rebalance_exp_event and then acts on the event:
 *  - for an assignment event the partitions in \p parts are assigned,
 *  - for any other event the assignment is removed; unless \c commit_type
 *    is "auto" the position of \p parts is manually committed first and the
 *    commit result is verified.
 *
 * @param rk     Consumer instance the callback is served for.
 * @param err    Rebalance event being served.
 * @param parts  Partitions affected by the event.
 * @param opaque Unused.
 */
static void rebalance_cb (rd_kafka_t *rk,
                          rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *parts,
                          void *opaque) {
        rd_kafka_resp_err_t commit_err;
        rd_kafka_resp_err_t perr;
        int use_async;

        rebalance_cnt++;
        TEST_SAY("Rebalance #%d: %s: %d partition(s)\n",
                 rebalance_cnt, rd_kafka_err2name(err), parts->cnt);

        TEST_ASSERT(err == rebalance_exp_event,
                    "Expected rebalance event %s, not %s",
                    rd_kafka_err2name(rebalance_exp_event),
                    rd_kafka_err2name(err));

        if (err != RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                /* Manual commit modes ("sync" and "async") commit the
                 * current position before giving up the assignment. */
                if (strcmp(commit_type, "auto") != 0) {
                        use_async = strcmp(commit_type, "async") == 0;

                        TEST_SAY("Performing %s commit\n", commit_type);

                        perr = rd_kafka_position(rk, parts);
                        TEST_ASSERT(!perr, "Failed to acquire position: %s",
                                    rd_kafka_err2str(perr));

                        /* Give the broker a moment to time out the
                         * member as well. */
                        rd_sleep(1);

                        commit_err = rd_kafka_commit(rk, parts, use_async);

                        if (use_async) {
                                TEST_ASSERT(!commit_err,
                                            "Async commit should not fail, "
                                            "but it returned %s",
                                            rd_kafka_err2name(commit_err));
                        } else {
                                /* A successful commit is also allowed to
                                 * report that there was no offset. */
                                TEST_ASSERT(commit_err == commit_exp_err ||
                                            (!commit_exp_err &&
                                             commit_err ==
                                             RD_KAFKA_RESP_ERR__NO_OFFSET),
                                            "Expected %s commit to return %s, "
                                            "not %s",
                                            commit_type,
                                            rd_kafka_err2name(commit_exp_err),
                                            rd_kafka_err2name(commit_err));
                        }
                }

                test_consumer_unassign("unassign", rk);
        } else {
                test_consumer_assign("assign", rk, parts);
        }

        /* Yield so that no more than one rebalance callback is served per
         * poll(), letting expect_rebalance() hand control back to the test
         * logic after each rebalance. */
        rd_kafka_yield(rk);
}
104 
105 
106 /**
107  * @brief Wait for an expected rebalance event, or fail.
108  */
expect_rebalance(const char * what,rd_kafka_t * c,rd_kafka_resp_err_t exp_event,int timeout_s)109 static void expect_rebalance (const char *what, rd_kafka_t *c,
110                               rd_kafka_resp_err_t exp_event,
111                               int timeout_s) {
112         int64_t tmout = test_clock() + (timeout_s * 1000000);
113         int start_cnt = rebalance_cnt;
114 
115         TEST_SAY("Waiting for %s (%s) for %ds\n",
116                  what, rd_kafka_err2name(exp_event), timeout_s);
117 
118         rebalance_exp_event = exp_event;
119 
120         while (tmout > test_clock() && rebalance_cnt == start_cnt) {
121                 if (test_consumer_poll_once(c, NULL, 1000))
122                         rd_sleep(1);
123         }
124 
125         if (rebalance_cnt == start_cnt + 1) {
126                 rebalance_exp_event = RD_KAFKA_RESP_ERR_NO_ERROR;
127                 return;
128         }
129 
130         TEST_FAIL("Timed out waiting for %s (%s)\n",
131                   what, rd_kafka_err2name(exp_event));
132 }
133 
134 
135 /**
136  * @brief Verify that session timeouts are handled by the consumer itself.
137  *
138  * @param use_commit_type "auto", "sync" (manual), "async" (manual)
139  */
do_test_session_timeout(const char * use_commit_type)140 static void do_test_session_timeout (const char *use_commit_type) {
141         const char *bootstraps;
142         rd_kafka_mock_cluster_t *mcluster;
143         rd_kafka_conf_t *conf;
144         rd_kafka_t *c;
145         const char *groupid = "mygroup";
146         const char *topic = "test";
147 
148         rebalance_cnt = 0;
149         commit_type = use_commit_type;
150 
151         SUB_TEST0(!strcmp(use_commit_type, "sync") /*quick*/,
152                   "Test session timeout with %s commit", use_commit_type);
153 
154         mcluster = test_mock_cluster_new(3, &bootstraps);
155 
156         rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
157 
158         /* Seed the topic with messages */
159         test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10,
160                                  "bootstrap.servers", bootstraps,
161                                  "batch.num.messages", "10",
162                                  NULL);
163 
164         test_conf_init(&conf, NULL, 30);
165         test_conf_set(conf, "bootstrap.servers", bootstraps);
166         test_conf_set(conf, "security.protocol", "PLAINTEXT");
167         test_conf_set(conf, "group.id", groupid);
168         test_conf_set(conf, "session.timeout.ms", "5000");
169         test_conf_set(conf, "heartbeat.interval.ms", "1000");
170         test_conf_set(conf, "auto.offset.reset", "earliest");
171         test_conf_set(conf, "enable.auto.commit",
172                       !strcmp(commit_type, "auto") ? "true" : "false");
173 
174         c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
175 
176         test_consumer_subscribe(c, topic);
177 
178         /* Let Heartbeats fail after a couple of successful ones */
179         rd_kafka_mock_push_request_errors(
180                 mcluster, RD_KAFKAP_Heartbeat,
181                 9,
182                 RD_KAFKA_RESP_ERR_NO_ERROR,
183                 RD_KAFKA_RESP_ERR_NO_ERROR,
184                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
185                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
186                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
187                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
188                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
189                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
190                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
191 
192         expect_rebalance("initial assignment", c,
193                          RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, 5+2);
194 
195         /* Consume a couple of messages so that we have something to commit */
196         test_consumer_poll("consume", c, 0, -1, 0, 10, NULL);
197 
198         /* The commit in the rebalance callback should fail when the
199          * member has timed out from the group. */
200         commit_exp_err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID;
201 
202         expect_rebalance("session timeout revoke", c,
203                          RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, 2+5+2);
204 
205         expect_rebalance("second assignment", c,
206                          RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, 5+2);
207 
208         /* Final rebalance in close().
209          * Its commit will work. */
210         rebalance_exp_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
211         commit_exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
212 
213         test_consumer_close(c);
214 
215         rd_kafka_destroy(c);
216 
217         test_mock_cluster_destroy(mcluster);
218 
219         SUB_TEST_PASS();
220 }
221 
222 
223 /**
224  * @brief Attempt manual commit when assignment has been lost (#3217)
225  */
do_test_commit_on_lost(void)226 static void do_test_commit_on_lost (void) {
227         const char *bootstraps;
228         rd_kafka_mock_cluster_t *mcluster;
229         rd_kafka_conf_t *conf;
230         rd_kafka_t *c;
231         const char *groupid = "mygroup";
232         const char *topic = "test";
233         rd_kafka_resp_err_t err;
234 
235         SUB_TEST();
236 
237         test_curr->is_fatal_cb = test_error_is_not_fatal_cb;
238 
239         mcluster = test_mock_cluster_new(3, &bootstraps);
240 
241         rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
242 
243         /* Seed the topic with messages */
244         test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10,
245                                  "bootstrap.servers", bootstraps,
246                                  "batch.num.messages", "10",
247                                  NULL);
248 
249         test_conf_init(&conf, NULL, 30);
250         test_conf_set(conf, "bootstrap.servers", bootstraps);
251         test_conf_set(conf, "security.protocol", "PLAINTEXT");
252         test_conf_set(conf, "group.id", groupid);
253         test_conf_set(conf, "session.timeout.ms", "5000");
254         test_conf_set(conf, "heartbeat.interval.ms", "1000");
255         test_conf_set(conf, "auto.offset.reset", "earliest");
256         test_conf_set(conf, "enable.auto.commit", "false");
257 
258         c = test_create_consumer(groupid, test_rebalance_cb, conf, NULL);
259 
260         test_consumer_subscribe(c, topic);
261 
262         /* Consume a couple of messages so that we have something to commit */
263         test_consumer_poll("consume", c, 0, -1, 0, 10, NULL);
264 
265         /* Make the coordinator unreachable, this will cause a local session
266          * timeout followed by a revoke and assignment lost. */
267         rd_kafka_mock_broker_set_down(mcluster, 1);
268 
269         /* Wait until the assignment is lost */
270         TEST_SAY("Waiting for assignment to be lost...\n");
271         while (!rd_kafka_assignment_lost(c))
272                 rd_sleep(1);
273 
274         TEST_SAY("Assignment is lost, committing\n");
275         /* Perform manual commit */
276         err = rd_kafka_commit(c, NULL, 0/*sync*/);
277         TEST_SAY("commit() returned: %s\n", rd_kafka_err2name(err));
278         TEST_ASSERT(err, "expected commit to fail");
279 
280         test_consumer_close(c);
281 
282         rd_kafka_destroy(c);
283 
284         test_mock_cluster_destroy(mcluster);
285 
286         test_curr->is_fatal_cb = NULL;
287 
288         SUB_TEST_PASS();
289 }
290 
291 
/**
 * @brief Test entry point: runs the session timeout sub-test once per commit
 *        type, followed by the commit-on-lost-assignment sub-test.
 *
 * The test is skipped when test_needs_auth() reports that authentication
 * is required.
 *
 * @returns 0
 */
int main_0106_cgrp_sess_timeout (int argc, char **argv) {
        static const char *const commit_types[] = { "sync", "async", "auto" };
        const size_t type_cnt = sizeof(commit_types) / sizeof(*commit_types);
        size_t i;

        if (test_needs_auth()) {
                TEST_SKIP("Mock cluster does not support SSL/SASL\n");
                return 0;
        }

        for (i = 0; i < type_cnt; i++)
                do_test_session_timeout(commit_types[i]);

        do_test_commit_on_lost();

        return 0;
}
307