/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2020, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"

#include "rdkafka.h"

#include "../src/rdkafka_proto.h"
#include "../src/rdunittest.h"

#include <stdarg.h>


/**
 * @name Misc mock-injected errors.
 *
 */

/**
 * @brief Test producer handling (retry) of ERR_KAFKA_STORAGE_ERROR.
 */
static void do_test_producer_storage_error (rd_bool_t too_few_retries) {
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_resp_err_t err;

        SUB_TEST_QUICK("%s",
                       too_few_retries ? "with too few retries" : "");

        test_conf_init(&conf, NULL, 10);

        test_conf_set(conf, "test.mock.num.brokers", "3");
        test_conf_set(conf, "retries", too_few_retries ? "1" : "10");
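        /* A short retry backoff keeps the retried deliveries well within
         * the flush timeout used below. */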
        test_conf_set(conf, "retry.backoff.ms", "500");
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

        test_curr->ignore_dr_err = rd_false;
        if (too_few_retries) {
                test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR;
                test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
        } else {
                test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
                test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_PERSISTED;
        }

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        mcluster = rd_kafka_handle_mock_cluster(rk);
        TEST_ASSERT(mcluster, "missing mock cluster");

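        /* Fail the next three Produce requests with KAFKA_STORAGE_ERROR:
         * with retries=1 the message is expected to fail delivery,
         * with retries=10 it is expected to eventually be persisted. */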
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                3,
                RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
                RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
                RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR);

        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Wait for delivery report. */
        test_flush(rk, 5000);

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Issue #2933. Offset commit being retried when failing due to
 *        RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS and then causing fetchers
 *        to not start.
 */
static void do_test_offset_commit_error_during_rebalance (void) {
        rd_kafka_conf_t *conf;
        rd_kafka_t *c1, *c2;
        rd_kafka_mock_cluster_t *mcluster;
        const char *bootstraps;
        const char *topic = "test";
        const int msgcnt = 100;
        rd_kafka_resp_err_t err;

        SUB_TEST();

        test_conf_init(&conf, NULL, 60);

        mcluster = test_mock_cluster_new(3, &bootstraps);

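        /* Create the topic with multiple partitions so that both consumers
         * get an assignment. */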
        rd_kafka_mock_topic_create(mcluster, topic, 4, 3);

        /* Seed the topic with messages */
        test_produce_msgs_easy_v(topic, 0, RD_KAFKA_PARTITION_UA, 0, msgcnt, 10,
                                 "bootstrap.servers", bootstraps,
                                 "batch.num.messages", "1",
                                 NULL);

        test_conf_set(conf, "bootstrap.servers", bootstraps);
        test_conf_set(conf, "auto.offset.reset", "earliest");
        test_conf_set(conf, "enable.auto.commit", "false");

        /* Make sure we don't consume the entire partition in one Fetch */
        test_conf_set(conf, "fetch.message.max.bytes", "100");

        c1 = test_create_consumer("mygroup", test_rebalance_cb,
                                  rd_kafka_conf_dup(conf), NULL);

        c2 = test_create_consumer("mygroup", test_rebalance_cb,
                                  conf, NULL);

        test_consumer_subscribe(c1, topic);
        test_consumer_subscribe(c2, topic);


        /* Wait for assignment and one message */
        test_consumer_poll("C1.PRE", c1, 0, -1, -1, 1, NULL);
        test_consumer_poll("C2.PRE", c2, 0, -1, -1, 1, NULL);

        /* Trigger a rebalance by closing c2 so that it leaves the group */
        test_consumer_close(c2);
        rd_kafka_destroy(c2);

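        /* Fail the next OffsetCommit requests with REBALANCE_IN_PROGRESS
         * so that the synchronous commit below surfaces the error to the
         * caller instead of being retried to completion (issue #2933). */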
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_OffsetCommit,
                6,
                RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
                RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
                RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
                RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
                RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
                RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS);

        /* This synchronous commit should fail */
        TEST_SAY("Committing (should fail)\n");
        err = rd_kafka_commit(c1, NULL, 0/*sync*/);
        TEST_SAY("Commit returned %s\n", rd_kafka_err2name(err));
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
                    "Expected commit to fail with ERR_REBALANCE_IN_PROGRESS, "
                    "not %s", rd_kafka_err2name(err));

        /* Wait for the new assignment and verify that all messages
         * can be read */
        test_consumer_poll("C1.POST", c1, 0, -1, -1, msgcnt, NULL);

        rd_kafka_destroy(c1);

        test_mock_cluster_destroy(mcluster);

        SUB_TEST_PASS();
}



/**
 * @brief Verify that an offset commit failing with REQUEST_TIMED_OUT is
 *        retried and eventually succeeds, both for a manual (sync) commit
 *        and for the final commit performed on close when auto commit is
 *        enabled.
 */
static void do_test_offset_commit_request_timed_out (rd_bool_t auto_commit) {
        rd_kafka_conf_t *conf;
        rd_kafka_t *c1, *c2;
        rd_kafka_mock_cluster_t *mcluster;
        const char *bootstraps;
        const char *topic = "test";
        const int msgcnt = 1;
        rd_kafka_topic_partition_list_t *partitions;

        SUB_TEST_QUICK("enable.auto.commit=%s", auto_commit ? "true" : "false");

        test_conf_init(&conf, NULL, 60);

        mcluster = test_mock_cluster_new(1, &bootstraps);

        rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

        /* Seed the topic with messages */
        test_produce_msgs_easy_v(topic, 0, RD_KAFKA_PARTITION_UA, 0, msgcnt, 10,
                                 "bootstrap.servers", bootstraps,
                                 "batch.num.messages", "1",
                                 NULL);

        test_conf_set(conf, "bootstrap.servers", bootstraps);
        test_conf_set(conf, "auto.offset.reset", "earliest");
        test_conf_set(conf, "enable.auto.commit", auto_commit ? "true" : "false");
        /* Set the auto commit interval too high for an interval-based commit
         * to happen within this test; only the final commit on close applies */
        test_conf_set(conf, "auto.commit.interval.ms", "90000");

        /* Make sure we don't consume the entire partition in one Fetch */
        test_conf_set(conf, "fetch.message.max.bytes", "100");

        c1 = test_create_consumer("mygroup", NULL,
                                  rd_kafka_conf_dup(conf), NULL);


        test_consumer_subscribe(c1, topic);

        /* Wait for assignment and one message */
        test_consumer_poll("C1.PRE", c1, 0, -1, -1, 1, NULL);

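        /* Time out the next two OffsetCommit requests: REQUEST_TIMED_OUT is
         * retriable, so the commit is expected to be retried and eventually
         * succeed. */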
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_OffsetCommit,
                2,
                RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
                RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT);

        if (!auto_commit)
                TEST_CALL_ERR__(rd_kafka_commit(c1, NULL, 0/*sync*/));

        /* Rely on consumer_close() doing final commit
         * when auto commit is enabled */

        test_consumer_close(c1);

        rd_kafka_destroy(c1);

        /* Create a new consumer and retrieve the committed offsets to verify
         * they were properly committed */
        c2 = test_create_consumer("mygroup", NULL, conf, NULL);

        partitions = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(partitions, topic, 0)->offset =
                RD_KAFKA_OFFSET_INVALID;

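        /* One message (at offset 0) was consumed and committed, so the
         * committed offset should be 1: the next offset to consume. */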
        TEST_CALL_ERR__(rd_kafka_committed(c2, partitions, 10*1000));
        TEST_ASSERT(partitions->elems[0].offset == 1,
                    "Expected committed offset to be 1, not %"PRId64,
                    partitions->elems[0].offset);

        rd_kafka_topic_partition_list_destroy(partitions);

        rd_kafka_destroy(c2);

        test_mock_cluster_destroy(mcluster);

        SUB_TEST_PASS();
}

int main_0117_mock_errors(int argc, char **argv) {

        if (test_needs_auth()) {
                TEST_SKIP("Mock cluster does not support SSL/SASL\n");
                return 0;
        }

        do_test_producer_storage_error(rd_false);
        do_test_producer_storage_error(rd_true);

        do_test_offset_commit_error_during_rebalance();

        do_test_offset_commit_request_timed_out(rd_true);
        do_test_offset_commit_request_timed_out(rd_false);

        return 0;
}