1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2020, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 #include "../src/rdkafka_proto.h"
32 
33 
34 /**
35  * @name Verify that the high-level consumer times out itself if
36  *       heartbeats are not successful (issue #2631).
37  */
38 
39 static const char *commit_type;
40 static int rebalance_cnt;
41 static rd_kafka_resp_err_t rebalance_exp_event;
42 static rd_kafka_resp_err_t commit_exp_err;
43 
rebalance_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * parts,void * opaque)44 static void rebalance_cb (rd_kafka_t *rk,
45                           rd_kafka_resp_err_t err,
46                           rd_kafka_topic_partition_list_t *parts,
47                           void *opaque) {
48 
49         rebalance_cnt++;
50         TEST_SAY("Rebalance #%d: %s: %d partition(s)\n",
51                  rebalance_cnt, rd_kafka_err2name(err), parts->cnt);
52 
53         TEST_ASSERT(err == rebalance_exp_event,
54                     "Expected rebalance event %s, not %s",
55                     rd_kafka_err2name(rebalance_exp_event),
56                     rd_kafka_err2name(err));
57 
58         if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
59                 test_consumer_assign("assign", rk, parts);
60         } else {
61                 rd_kafka_resp_err_t commit_err;
62 
63                 if (strcmp(commit_type, "auto")) {
64                         rd_kafka_resp_err_t perr;
65 
66                         TEST_SAY("Performing %s commit\n", commit_type);
67 
68                         perr = rd_kafka_position(rk, parts);
69                         TEST_ASSERT(!perr, "Failed to acquire position: %s",
70                                     rd_kafka_err2str(perr));
71 
72                         /* Sleep a short while so the broker times out the
73                          * member too. */
74                         rd_sleep(1);
75 
76                         commit_err = rd_kafka_commit(
77                                 rk, parts, !strcmp(commit_type, "async"));
78 
79                         if (!strcmp(commit_type, "async"))
80                                 TEST_ASSERT(!commit_err,
81                                             "Async commit should not fail, "
82                                             "but it returned %s",
83                                             rd_kafka_err2name(commit_err));
84                         else
85                                 TEST_ASSERT(commit_err == commit_exp_err ||
86                                             (!commit_exp_err &&
87                                              commit_err ==
88                                              RD_KAFKA_RESP_ERR__NO_OFFSET),
89                                             "Expected %s commit to return %s, "
90                                             "not %s",
91                                             commit_type,
92                                             rd_kafka_err2name(commit_exp_err),
93                                             rd_kafka_err2name(commit_err));
94                 }
95 
96                 test_consumer_unassign("unassign", rk);
97         }
98 }
99 
100 
101 /**
102  * @brief Wait for an expected rebalance event, or fail.
103  */
expect_rebalance(const char * what,rd_kafka_t * c,rd_kafka_resp_err_t exp_event,int timeout_s)104 static void expect_rebalance (const char *what, rd_kafka_t *c,
105                               rd_kafka_resp_err_t exp_event,
106                               int timeout_s) {
107         int64_t tmout = test_clock() + (timeout_s * 1000000);
108         int start_cnt = rebalance_cnt;
109 
110         TEST_SAY("Waiting for %s (%s) for %ds\n",
111                  what, rd_kafka_err2name(exp_event), timeout_s);
112 
113         rebalance_exp_event = exp_event;
114 
115         while (tmout > test_clock() && rebalance_cnt == start_cnt) {
116                 if (test_consumer_poll_once(c, NULL, 1000))
117                         rd_sleep(1);
118         }
119 
120         if (rebalance_cnt == start_cnt + 1) {
121                 rebalance_exp_event = RD_KAFKA_RESP_ERR_NO_ERROR;
122                 return;
123         }
124 
125         TEST_FAIL("Timed out waiting for %s (%s)\n",
126                   what, rd_kafka_err2name(exp_event));
127 }
128 
129 
130 /**
131  * @brief Verify that session timeouts are handled by the consumer itself.
132  *
133  * @param use_commit_type "auto", "sync" (manual), "async" (manual)
134  */
do_test_session_timeout(const char * use_commit_type)135 static void do_test_session_timeout (const char *use_commit_type) {
136         const char *bootstraps;
137         rd_kafka_mock_cluster_t *mcluster;
138         rd_kafka_conf_t *conf;
139         rd_kafka_t *c;
140         const char *groupid = "mygroup";
141         const char *topic = "test";
142 
143         rebalance_cnt = 0;
144         commit_type = use_commit_type;
145 
146         TEST_SAY(_C_MAG "[ Test session timeout with %s commit ]\n",
147                  commit_type);
148 
149         mcluster = test_mock_cluster_new(3, &bootstraps);
150 
151         rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
152 
153         /* Seed the topic with messages */
154         test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10,
155                                  "bootstrap.servers", bootstraps,
156                                  "batch.num.messages", "10",
157                                  NULL);
158 
159         test_conf_init(&conf, NULL, 30);
160         test_conf_set(conf, "bootstrap.servers", bootstraps);
161         test_conf_set(conf, "security.protocol", "PLAINTEXT");
162         test_conf_set(conf, "group.id", groupid);
163         test_conf_set(conf, "session.timeout.ms", "5000");
164         test_conf_set(conf, "heartbeat.interval.ms", "1000");
165         test_conf_set(conf, "auto.offset.reset", "earliest");
166         test_conf_set(conf, "enable.auto.commit",
167                       !strcmp(commit_type, "auto") ? "true" : "false");
168 
169         c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
170 
171         test_consumer_subscribe(c, topic);
172 
173         /* Let Heartbeats fail after a couple of successful ones */
174         rd_kafka_mock_push_request_errors(
175                 mcluster, RD_KAFKAP_Heartbeat,
176                 9,
177                 RD_KAFKA_RESP_ERR_NO_ERROR,
178                 RD_KAFKA_RESP_ERR_NO_ERROR,
179                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
180                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
181                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
182                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
183                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
184                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
185                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
186 
187         expect_rebalance("initial assignment", c,
188                          RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, 5+2);
189 
190         /* Consume a couple of messages so that we have something to commit */
191         test_consumer_poll("consume", c, 0, -1, 0, 10, NULL);
192 
193         /* The commit in the rebalance callback should fail when the
194          * member has timed out from the group. */
195         commit_exp_err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID;
196 
197         expect_rebalance("session timeout revoke", c,
198                          RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, 2+5+2);
199 
200         expect_rebalance("second assignment", c,
201                          RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, 5+2);
202 
203         /* Final rebalance in close().
204          * It's commit will work. */
205         rebalance_exp_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
206         commit_exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
207 
208         test_consumer_close(c);
209 
210         rd_kafka_destroy(c);
211 
212         test_mock_cluster_destroy(mcluster);
213 
214         TEST_SAY(_C_GRN "[ Test session timeout with %s commit PASSED ]\n",
215                  commit_type);
216 }
217 
218 
main_0106_cgrp_sess_timeout(int argc,char ** argv)219 int main_0106_cgrp_sess_timeout (int argc, char **argv) {
220 
221         do_test_session_timeout("sync");
222 
223         if (!test_quick) {
224                 do_test_session_timeout("async");
225                 do_test_session_timeout("auto");
226         }
227 
228         return 0;
229 }
230