1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2020, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 #include "rdkafka.h"
32 
33 #include "../src/rdkafka_proto.h"
34 #include "../src/rdunittest.h"
35 
36 #include <stdarg.h>
37 
38 
39 /**
40  * @name Misc mock-injected errors.
41  *
42  */
43 
44 /**
45  * @brief Test producer handling (retry) of ERR_KAFKA_STORAGE_ERROR.
46  */
47 static void do_test_producer_storage_error (rd_bool_t too_few_retries) {
48         rd_kafka_conf_t *conf;
49         rd_kafka_t *rk;
50         rd_kafka_mock_cluster_t *mcluster;
51         rd_kafka_resp_err_t err;
52 
53         TEST_SAY(_C_MAG "[ %s%s ]\n", __FUNCTION__,
54                  too_few_retries ? ": with too few retries" : "");
55 
56         test_conf_init(&conf, NULL, 10);
57 
58         test_conf_set(conf, "test.mock.num.brokers", "3");
59         test_conf_set(conf, "retries", too_few_retries ? "1" : "10");
60         test_conf_set(conf, "retry.backoff.ms", "500");
61         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
62 
63         test_curr->ignore_dr_err = rd_false;
64         if (too_few_retries) {
65                 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR;
66                 test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
67         } else {
68                 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
69                 test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_PERSISTED;
70         }
71 
72         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
73 
74         mcluster = rd_kafka_handle_mock_cluster(rk);
75         TEST_ASSERT(mcluster, "missing mock cluster");
76 
77         rd_kafka_mock_push_request_errors(
78                 mcluster,
79                 RD_KAFKAP_Produce,
80                 3,
81                 RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
82                 RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
83                 RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR);
84 
85         err = rd_kafka_producev(rk,
86                                 RD_KAFKA_V_TOPIC("mytopic"),
87                                 RD_KAFKA_V_VALUE("hi", 2),
88                                 RD_KAFKA_V_END);
89         TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
90 
91         /* Wait for delivery report. */
92         test_flush(rk, 5000);
93 
94         rd_kafka_destroy(rk);
95 
96         TEST_SAY(_C_GRN "[ %s%s PASS ]\n", __FUNCTION__,
97                  too_few_retries ? ": with too few retries" : "");
98 
99         test_curr->ignore_dr_err = rd_false;
100         test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
101         test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_PERSISTED;
102 }
103 
104 
105 /**
106  * @brief Issue #2933. Offset commit being retried when failing due to
107  *        RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS and then causing fetchers
108  *        to not start.
109  */
110 static void do_test_offset_commit_error_during_rebalance (void) {
111         rd_kafka_conf_t *conf;
112         rd_kafka_t *c1, *c2;
113         rd_kafka_mock_cluster_t *mcluster;
114         const char *bootstraps;
115         const char *topic = "test";
116         const int msgcnt = 100;
117         rd_kafka_resp_err_t err;
118 
119         TEST_SAY(_C_MAG "[ %s ]\n", __FUNCTION__);
120 
121         test_conf_init(&conf, NULL, 60);
122 
123         mcluster = test_mock_cluster_new(3, &bootstraps);
124 
125         rd_kafka_mock_topic_create(mcluster, topic, 4, 3);
126 
127         /* Seed the topic with messages */
128         test_produce_msgs_easy_v(topic, 0, RD_KAFKA_PARTITION_UA, 0, msgcnt, 10,
lookup_descriptor(char * name,char * connection)129                                  "bootstrap.servers", bootstraps,
130                                  "batch.num.messages", "1",
131                                  NULL);
132 
133         test_conf_set(conf, "bootstrap.servers", bootstraps);
134         test_conf_set(conf, "auto.offset.reset", "earliest");
135         test_conf_set(conf, "enable.auto.commit", "false");
136 
137         /* Make sure we don't consume the entire partition in one Fetch */
138         test_conf_set(conf, "fetch.message.max.bytes", "100");
139 
140         c1 = test_create_consumer("mygroup", test_rebalance_cb,
141                                   rd_kafka_conf_dup(conf), NULL);
142 
143         //test_conf_set(conf, "debug", ",");
144         c2 = test_create_consumer("mygroup", test_rebalance_cb,
145                                   conf, NULL);
146 
147         test_consumer_subscribe(c1, topic);
148         test_consumer_subscribe(c2, topic);
149 
150 
output_get_descr_header(char * desc_name)151         /* Wait for assignment and one message */
152         test_consumer_poll("C1.PRE", c1, 0, -1, -1, 1, NULL);
153         test_consumer_poll("C2.PRE", c2, 0, -1, -1, 1, NULL);
154 
155         /* Trigger rebalance */
156         test_consumer_close(c2);
157         rd_kafka_destroy(c2);
158 
159         rd_kafka_mock_push_request_errors(
160                 mcluster,
161                 RD_KAFKAP_OffsetCommit,
162                 6,
163                 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
164                 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
165                 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
166                 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
167                 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
168                 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS);
169 
output_get_descr(char * desc_name,char * index)170         /* This commit should fail (async) */
171         TEST_SAY("Committing (should fail)\n");
172         err = rd_kafka_commit(c1, NULL, 0/*sync*/);
173         TEST_SAY("Commit returned %s\n", rd_kafka_err2name(err));
174         TEST_ASSERT(err == RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
175                     "Expected commit to fail with ERR_REBALANCE_IN_PROGRESS, "
176                     "not %s", rd_kafka_err2name(err));
177 
178         /* Wait for new assignment and able to read all messages */
179         test_consumer_poll("C1.PRE", c1, 0, -1, -1, msgcnt, NULL);
180 
181         rd_kafka_destroy(c1);
182 
183         test_mock_cluster_destroy(mcluster);
184 
185         TEST_SAY(_C_GRN "[ %s PASS ]\n", __FUNCTION__);
186 }
187 
188 
189 
190 int main_0117_mock_errors (int argc, char **argv) {
191 
192         if (test_needs_auth()) {
193                 TEST_SKIP("Mock cluster does not support SSL/SASL\n");
194                 return 0;
195         }
196 
197         do_test_producer_storage_error(rd_false);
198         do_test_producer_storage_error(rd_true);
199 
200         do_test_offset_commit_error_during_rebalance();
201 
202         return 0;
output_set_descr_header(char * desc_name)203 }
204