1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2020, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 #include "../src/rdkafka_proto.h"
32
33
/**
 * @name Verify that the high-level consumer times itself out of the group
 *       when heartbeats are not successful (issue #2631).
 */
38
39 static const char *commit_type;
40 static int rebalance_cnt;
41 static rd_kafka_resp_err_t rebalance_exp_event;
42 static rd_kafka_resp_err_t commit_exp_err;
43
/**
 * @brief Rebalance callback: counts the event, verifies it is the one the
 *        test currently expects, and then either assigns the partitions
 *        or (optionally after a manual commit) unassigns them.
 *
 * @param rk     Consumer instance the rebalance applies to.
 * @param err    Rebalance event; asserted to equal \c rebalance_exp_event.
 * @param parts  Partitions being assigned or revoked.
 * @param opaque Unused.
 *
 * @remark The assertion expressions are intentionally left in their
 *         original form in case the assert macro includes the expression
 *         text in its failure output.
 */
static void rebalance_cb (rd_kafka_t *rk,
                          rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *parts,
                          void *opaque) {

        rebalance_cnt++;
        TEST_SAY("Rebalance #%d: %s: %d partition(s)\n",
                 rebalance_cnt, rd_kafka_err2name(err), parts->cnt);

        TEST_ASSERT(err == rebalance_exp_event,
                    "Expected rebalance event %s, not %s",
                    rd_kafka_err2name(rebalance_exp_event),
                    rd_kafka_err2name(err));

        /* Assignment needs no commit handling: assign and be done. */
        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                test_consumer_assign("assign", rk, parts);
                return;
        }

        /* Any other event: perform a manual commit first, unless the test
         * runs in "auto" commit mode. */
        if (strcmp(commit_type, "auto") != 0) {
                const int is_async = !strcmp(commit_type, "async");
                rd_kafka_resp_err_t perr;
                rd_kafka_resp_err_t commit_err;

                TEST_SAY("Performing %s commit\n", commit_type);

                /* Fill in the current positions so there are offsets
                 * to commit. */
                perr = rd_kafka_position(rk, parts);
                TEST_ASSERT(!perr, "Failed to acquire position: %s",
                            rd_kafka_err2str(perr));

                /* Sleep a short while so the broker times out the
                 * member too. */
                rd_sleep(1);

                commit_err = rd_kafka_commit(rk, parts, is_async);

                if (is_async) {
                        /* Only the enqueuing of an async commit is
                         * checked here, not its eventual outcome. */
                        TEST_ASSERT(!commit_err,
                                    "Async commit should not fail, "
                                    "but it returned %s",
                                    rd_kafka_err2name(commit_err));
                } else {
                        /* When success is expected, _NO_OFFSET is
                         * accepted as well. */
                        TEST_ASSERT(commit_err == commit_exp_err ||
                                    (!commit_exp_err &&
                                     commit_err ==
                                     RD_KAFKA_RESP_ERR__NO_OFFSET),
                                    "Expected %s commit to return %s, "
                                    "not %s",
                                    commit_type,
                                    rd_kafka_err2name(commit_exp_err),
                                    rd_kafka_err2name(commit_err));
                }
        }

        test_consumer_unassign("unassign", rk);
}
99
100
101 /**
102 * @brief Wait for an expected rebalance event, or fail.
103 */
expect_rebalance(const char * what,rd_kafka_t * c,rd_kafka_resp_err_t exp_event,int timeout_s)104 static void expect_rebalance (const char *what, rd_kafka_t *c,
105 rd_kafka_resp_err_t exp_event,
106 int timeout_s) {
107 int64_t tmout = test_clock() + (timeout_s * 1000000);
108 int start_cnt = rebalance_cnt;
109
110 TEST_SAY("Waiting for %s (%s) for %ds\n",
111 what, rd_kafka_err2name(exp_event), timeout_s);
112
113 rebalance_exp_event = exp_event;
114
115 while (tmout > test_clock() && rebalance_cnt == start_cnt) {
116 if (test_consumer_poll_once(c, NULL, 1000))
117 rd_sleep(1);
118 }
119
120 if (rebalance_cnt == start_cnt + 1) {
121 rebalance_exp_event = RD_KAFKA_RESP_ERR_NO_ERROR;
122 return;
123 }
124
125 TEST_FAIL("Timed out waiting for %s (%s)\n",
126 what, rd_kafka_err2name(exp_event));
127 }
128
129
130 /**
131 * @brief Verify that session timeouts are handled by the consumer itself.
132 *
133 * @param use_commit_type "auto", "sync" (manual), "async" (manual)
134 */
do_test_session_timeout(const char * use_commit_type)135 static void do_test_session_timeout (const char *use_commit_type) {
136 const char *bootstraps;
137 rd_kafka_mock_cluster_t *mcluster;
138 rd_kafka_conf_t *conf;
139 rd_kafka_t *c;
140 const char *groupid = "mygroup";
141 const char *topic = "test";
142
143 rebalance_cnt = 0;
144 commit_type = use_commit_type;
145
146 TEST_SAY(_C_MAG "[ Test session timeout with %s commit ]\n",
147 commit_type);
148
149 mcluster = test_mock_cluster_new(3, &bootstraps);
150
151 rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
152
153 /* Seed the topic with messages */
154 test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10,
155 "bootstrap.servers", bootstraps,
156 "batch.num.messages", "10",
157 NULL);
158
159 test_conf_init(&conf, NULL, 30);
160 test_conf_set(conf, "bootstrap.servers", bootstraps);
161 test_conf_set(conf, "security.protocol", "PLAINTEXT");
162 test_conf_set(conf, "group.id", groupid);
163 test_conf_set(conf, "session.timeout.ms", "5000");
164 test_conf_set(conf, "heartbeat.interval.ms", "1000");
165 test_conf_set(conf, "auto.offset.reset", "earliest");
166 test_conf_set(conf, "enable.auto.commit",
167 !strcmp(commit_type, "auto") ? "true" : "false");
168
169 c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
170
171 test_consumer_subscribe(c, topic);
172
173 /* Let Heartbeats fail after a couple of successful ones */
174 rd_kafka_mock_push_request_errors(
175 mcluster, RD_KAFKAP_Heartbeat,
176 9,
177 RD_KAFKA_RESP_ERR_NO_ERROR,
178 RD_KAFKA_RESP_ERR_NO_ERROR,
179 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
180 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
181 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
182 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
183 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
184 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
185 RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
186
187 expect_rebalance("initial assignment", c,
188 RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, 5+2);
189
190 /* Consume a couple of messages so that we have something to commit */
191 test_consumer_poll("consume", c, 0, -1, 0, 10, NULL);
192
193 /* The commit in the rebalance callback should fail when the
194 * member has timed out from the group. */
195 commit_exp_err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID;
196
197 expect_rebalance("session timeout revoke", c,
198 RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, 2+5+2);
199
200 expect_rebalance("second assignment", c,
201 RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, 5+2);
202
203 /* Final rebalance in close().
204 * It's commit will work. */
205 rebalance_exp_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
206 commit_exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
207
208 test_consumer_close(c);
209
210 rd_kafka_destroy(c);
211
212 test_mock_cluster_destroy(mcluster);
213
214 TEST_SAY(_C_GRN "[ Test session timeout with %s commit PASSED ]\n",
215 commit_type);
216 }
217
218
main_0106_cgrp_sess_timeout(int argc,char ** argv)219 int main_0106_cgrp_sess_timeout (int argc, char **argv) {
220
221 do_test_session_timeout("sync");
222
223 if (!test_quick) {
224 do_test_session_timeout("async");
225 do_test_session_timeout("auto");
226 }
227
228 return 0;
229 }
230