1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2020, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 #include "rdkafka.h"
32
33 #include "../src/rdkafka_proto.h"
34 #include "../src/rdunittest.h"
35
36 #include <stdarg.h>
37
38
39 /**
40 * @name Misc mock-injected errors.
41 *
42 */
43
44 /**
45 * @brief Test producer handling (retry) of ERR_KAFKA_STORAGE_ERROR.
46 */
do_test_producer_storage_error(rd_bool_t too_few_retries)47 static void do_test_producer_storage_error (rd_bool_t too_few_retries) {
48 rd_kafka_conf_t *conf;
49 rd_kafka_t *rk;
50 rd_kafka_mock_cluster_t *mcluster;
51 rd_kafka_resp_err_t err;
52
53 SUB_TEST_QUICK("%s",
54 too_few_retries ? "with too few retries" : "");
55
56 test_conf_init(&conf, NULL, 10);
57
58 test_conf_set(conf, "test.mock.num.brokers", "3");
59 test_conf_set(conf, "retries", too_few_retries ? "1" : "10");
60 test_conf_set(conf, "retry.backoff.ms", "500");
61 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
62
63 test_curr->ignore_dr_err = rd_false;
64 if (too_few_retries) {
65 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR;
66 test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
67 } else {
68 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
69 test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_PERSISTED;
70 }
71
72 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
73
74 mcluster = rd_kafka_handle_mock_cluster(rk);
75 TEST_ASSERT(mcluster, "missing mock cluster");
76
77 rd_kafka_mock_push_request_errors(
78 mcluster,
79 RD_KAFKAP_Produce,
80 3,
81 RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
82 RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
83 RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR);
84
85 err = rd_kafka_producev(rk,
86 RD_KAFKA_V_TOPIC("mytopic"),
87 RD_KAFKA_V_VALUE("hi", 2),
88 RD_KAFKA_V_END);
89 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
90
91 /* Wait for delivery report. */
92 test_flush(rk, 5000);
93
94 rd_kafka_destroy(rk);
95
96 SUB_TEST_PASS();
97 }
98
99
100 /**
101 * @brief Issue #2933. Offset commit being retried when failing due to
102 * RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS and then causing fetchers
103 * to not start.
104 */
do_test_offset_commit_error_during_rebalance(void)105 static void do_test_offset_commit_error_during_rebalance (void) {
106 rd_kafka_conf_t *conf;
107 rd_kafka_t *c1, *c2;
108 rd_kafka_mock_cluster_t *mcluster;
109 const char *bootstraps;
110 const char *topic = "test";
111 const int msgcnt = 100;
112 rd_kafka_resp_err_t err;
113
114 SUB_TEST();
115
116 test_conf_init(&conf, NULL, 60);
117
118 mcluster = test_mock_cluster_new(3, &bootstraps);
119
120 rd_kafka_mock_topic_create(mcluster, topic, 4, 3);
121
122 /* Seed the topic with messages */
123 test_produce_msgs_easy_v(topic, 0, RD_KAFKA_PARTITION_UA, 0, msgcnt, 10,
124 "bootstrap.servers", bootstraps,
125 "batch.num.messages", "1",
126 NULL);
127
128 test_conf_set(conf, "bootstrap.servers", bootstraps);
129 test_conf_set(conf, "auto.offset.reset", "earliest");
130 test_conf_set(conf, "enable.auto.commit", "false");
131
132 /* Make sure we don't consume the entire partition in one Fetch */
133 test_conf_set(conf, "fetch.message.max.bytes", "100");
134
135 c1 = test_create_consumer("mygroup", test_rebalance_cb,
136 rd_kafka_conf_dup(conf), NULL);
137
138 c2 = test_create_consumer("mygroup", test_rebalance_cb,
139 conf, NULL);
140
141 test_consumer_subscribe(c1, topic);
142 test_consumer_subscribe(c2, topic);
143
144
145 /* Wait for assignment and one message */
146 test_consumer_poll("C1.PRE", c1, 0, -1, -1, 1, NULL);
147 test_consumer_poll("C2.PRE", c2, 0, -1, -1, 1, NULL);
148
149 /* Trigger rebalance */
150 test_consumer_close(c2);
151 rd_kafka_destroy(c2);
152
153 rd_kafka_mock_push_request_errors(
154 mcluster,
155 RD_KAFKAP_OffsetCommit,
156 6,
157 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
158 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
159 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
160 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
161 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
162 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS);
163
164 /* This commit should fail (async) */
165 TEST_SAY("Committing (should fail)\n");
166 err = rd_kafka_commit(c1, NULL, 0/*sync*/);
167 TEST_SAY("Commit returned %s\n", rd_kafka_err2name(err));
168 TEST_ASSERT(err == RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
169 "Expected commit to fail with ERR_REBALANCE_IN_PROGRESS, "
170 "not %s", rd_kafka_err2name(err));
171
172 /* Wait for new assignment and able to read all messages */
173 test_consumer_poll("C1.PRE", c1, 0, -1, -1, msgcnt, NULL);
174
175 rd_kafka_destroy(c1);
176
177 test_mock_cluster_destroy(mcluster);
178
179 SUB_TEST_PASS();
180 }
181
182
183
184 /**
185 * @brief Issue #2933. Offset commit being retried when failing due to
186 * RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS and then causing fetchers
187 * to not start.
188 */
do_test_offset_commit_request_timed_out(rd_bool_t auto_commit)189 static void do_test_offset_commit_request_timed_out (rd_bool_t auto_commit) {
190 rd_kafka_conf_t *conf;
191 rd_kafka_t *c1, *c2;
192 rd_kafka_mock_cluster_t *mcluster;
193 const char *bootstraps;
194 const char *topic = "test";
195 const int msgcnt = 1;
196 rd_kafka_topic_partition_list_t *partitions;
197
198 SUB_TEST_QUICK("enable.auto.commit=%s", auto_commit ? "true": "false");
199
200 test_conf_init(&conf, NULL, 60);
201
202 mcluster = test_mock_cluster_new(1, &bootstraps);
203
204 rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
205
206 /* Seed the topic with messages */
207 test_produce_msgs_easy_v(topic, 0, RD_KAFKA_PARTITION_UA, 0, msgcnt, 10,
208 "bootstrap.servers", bootstraps,
209 "batch.num.messages", "1",
210 NULL);
211
212 test_conf_set(conf, "bootstrap.servers", bootstraps);
213 test_conf_set(conf, "auto.offset.reset", "earliest");
214 test_conf_set(conf, "enable.auto.commit", auto_commit ? "true":"false");
215 /* Too high to be done by interval in this test */
216 test_conf_set(conf, "auto.commit.interval.ms", "90000");
217
218 /* Make sure we don't consume the entire partition in one Fetch */
219 test_conf_set(conf, "fetch.message.max.bytes", "100");
220
221 c1 = test_create_consumer("mygroup", NULL,
222 rd_kafka_conf_dup(conf), NULL);
223
224
225 test_consumer_subscribe(c1, topic);
226
227 /* Wait for assignment and one message */
228 test_consumer_poll("C1.PRE", c1, 0, -1, -1, 1, NULL);
229
230 rd_kafka_mock_push_request_errors(
231 mcluster,
232 RD_KAFKAP_OffsetCommit,
233 2,
234 RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
235 RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT);
236
237 if (!auto_commit)
238 TEST_CALL_ERR__(rd_kafka_commit(c1, NULL, 0/*sync*/));
239
240 /* Rely on consumer_close() doing final commit
241 * when auto commit is enabled */
242
243 test_consumer_close(c1);
244
245 rd_kafka_destroy(c1);
246
247 /* Create a new consumer and retrieve the committed offsets to verify
248 * they were properly committed */
249 c2 = test_create_consumer("mygroup", NULL, conf, NULL);
250
251 partitions = rd_kafka_topic_partition_list_new(1);
252 rd_kafka_topic_partition_list_add(partitions, topic, 0)->offset =
253 RD_KAFKA_OFFSET_INVALID;
254
255 TEST_CALL_ERR__(rd_kafka_committed(c2, partitions, 10*1000));
256 TEST_ASSERT(partitions->elems[0].offset == 1,
257 "Expected committed offset to be 1, not %"PRId64,
258 partitions->elems[0].offset);
259
260 rd_kafka_topic_partition_list_destroy(partitions);
261
262 rd_kafka_destroy(c2);
263
264 test_mock_cluster_destroy(mcluster);
265
266 SUB_TEST_PASS();
267 }
268
main_0117_mock_errors(int argc,char ** argv)269 int main_0117_mock_errors(int argc, char **argv) {
270
271 if (test_needs_auth()) {
272 TEST_SKIP("Mock cluster does not support SSL/SASL\n");
273 return 0;
274 }
275
276 do_test_producer_storage_error(rd_false);
277 do_test_producer_storage_error(rd_true);
278
279 do_test_offset_commit_error_during_rebalance();
280
281 do_test_offset_commit_request_timed_out(rd_true);
282 do_test_offset_commit_request_timed_out(rd_false);
283
284 return 0;
285 }
286