1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2020, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
push_assignment(char * var,enum ECPGdtype value)21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 #include "rdkafka.h"
32
33 #include "../src/rdkafka_proto.h"
34 #include "../src/rdunittest.h"
35
36 #include <stdarg.h>
37
38
39 /**
40 * @name Misc mock-injected errors.
41 *
42 */
43
44 /**
45 * @brief Test producer handling (retry) of ERR_KAFKA_STORAGE_ERROR.
46 */
47 static void do_test_producer_storage_error (rd_bool_t too_few_retries) {
48 rd_kafka_conf_t *conf;
49 rd_kafka_t *rk;
50 rd_kafka_mock_cluster_t *mcluster;
51 rd_kafka_resp_err_t err;
52
53 TEST_SAY(_C_MAG "[ %s%s ]\n", __FUNCTION__,
54 too_few_retries ? ": with too few retries" : "");
55
56 test_conf_init(&conf, NULL, 10);
57
58 test_conf_set(conf, "test.mock.num.brokers", "3");
59 test_conf_set(conf, "retries", too_few_retries ? "1" : "10");
60 test_conf_set(conf, "retry.backoff.ms", "500");
61 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
62
63 test_curr->ignore_dr_err = rd_false;
64 if (too_few_retries) {
65 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR;
66 test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
67 } else {
68 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
69 test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_PERSISTED;
70 }
71
72 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
73
74 mcluster = rd_kafka_handle_mock_cluster(rk);
75 TEST_ASSERT(mcluster, "missing mock cluster");
76
77 rd_kafka_mock_push_request_errors(
78 mcluster,
79 RD_KAFKAP_Produce,
80 3,
81 RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
82 RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
83 RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR);
84
85 err = rd_kafka_producev(rk,
86 RD_KAFKA_V_TOPIC("mytopic"),
87 RD_KAFKA_V_VALUE("hi", 2),
88 RD_KAFKA_V_END);
89 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
90
91 /* Wait for delivery report. */
92 test_flush(rk, 5000);
93
94 rd_kafka_destroy(rk);
95
96 TEST_SAY(_C_GRN "[ %s%s PASS ]\n", __FUNCTION__,
97 too_few_retries ? ": with too few retries" : "");
98
99 test_curr->ignore_dr_err = rd_false;
100 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
101 test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_PERSISTED;
102 }
103
104
105 /**
106 * @brief Issue #2933. Offset commit being retried when failing due to
107 * RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS and then causing fetchers
108 * to not start.
109 */
110 static void do_test_offset_commit_error_during_rebalance (void) {
111 rd_kafka_conf_t *conf;
112 rd_kafka_t *c1, *c2;
113 rd_kafka_mock_cluster_t *mcluster;
114 const char *bootstraps;
115 const char *topic = "test";
116 const int msgcnt = 100;
117 rd_kafka_resp_err_t err;
118
119 TEST_SAY(_C_MAG "[ %s ]\n", __FUNCTION__);
120
121 test_conf_init(&conf, NULL, 60);
122
123 mcluster = test_mock_cluster_new(3, &bootstraps);
124
125 rd_kafka_mock_topic_create(mcluster, topic, 4, 3);
126
127 /* Seed the topic with messages */
128 test_produce_msgs_easy_v(topic, 0, RD_KAFKA_PARTITION_UA, 0, msgcnt, 10,
lookup_descriptor(char * name,char * connection)129 "bootstrap.servers", bootstraps,
130 "batch.num.messages", "1",
131 NULL);
132
133 test_conf_set(conf, "bootstrap.servers", bootstraps);
134 test_conf_set(conf, "auto.offset.reset", "earliest");
135 test_conf_set(conf, "enable.auto.commit", "false");
136
137 /* Make sure we don't consume the entire partition in one Fetch */
138 test_conf_set(conf, "fetch.message.max.bytes", "100");
139
140 c1 = test_create_consumer("mygroup", test_rebalance_cb,
141 rd_kafka_conf_dup(conf), NULL);
142
143 //test_conf_set(conf, "debug", ",");
144 c2 = test_create_consumer("mygroup", test_rebalance_cb,
145 conf, NULL);
146
147 test_consumer_subscribe(c1, topic);
148 test_consumer_subscribe(c2, topic);
149
150
output_get_descr_header(char * desc_name)151 /* Wait for assignment and one message */
152 test_consumer_poll("C1.PRE", c1, 0, -1, -1, 1, NULL);
153 test_consumer_poll("C2.PRE", c2, 0, -1, -1, 1, NULL);
154
155 /* Trigger rebalance */
156 test_consumer_close(c2);
157 rd_kafka_destroy(c2);
158
159 rd_kafka_mock_push_request_errors(
160 mcluster,
161 RD_KAFKAP_OffsetCommit,
162 6,
163 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
164 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
165 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
166 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
167 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
168 RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS);
169
output_get_descr(char * desc_name,char * index)170 /* This commit should fail (async) */
171 TEST_SAY("Committing (should fail)\n");
172 err = rd_kafka_commit(c1, NULL, 0/*sync*/);
173 TEST_SAY("Commit returned %s\n", rd_kafka_err2name(err));
174 TEST_ASSERT(err == RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
175 "Expected commit to fail with ERR_REBALANCE_IN_PROGRESS, "
176 "not %s", rd_kafka_err2name(err));
177
178 /* Wait for new assignment and able to read all messages */
179 test_consumer_poll("C1.PRE", c1, 0, -1, -1, msgcnt, NULL);
180
181 rd_kafka_destroy(c1);
182
183 test_mock_cluster_destroy(mcluster);
184
185 TEST_SAY(_C_GRN "[ %s PASS ]\n", __FUNCTION__);
186 }
187
188
189
190 int main_0117_mock_errors (int argc, char **argv) {
191
192 if (test_needs_auth()) {
193 TEST_SKIP("Mock cluster does not support SSL/SASL\n");
194 return 0;
195 }
196
197 do_test_producer_storage_error(rd_false);
198 do_test_producer_storage_error(rd_true);
199
200 do_test_offset_commit_error_during_rebalance();
201
202 return 0;
output_set_descr_header(char * desc_name)203 }
204