1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30 #include "rdkafka.h"
31
32 #include <stdarg.h>
33 #include <errno.h>
34
/**
 * @brief Test-framework hook deciding whether an error reported by the
 *        client should fail the test.
 *
 * Connectivity-related errors are expected, since these tests deliberately
 * bring connectivity down. The SASL authenticator will also interpret a
 * connection going down during the auth state as the broker not supporting
 * SASL PLAIN, hence __AUTHENTICATION is tolerated as well.
 *
 * @returns 0 for tolerated (non-fatal) errors, 1 for anything else.
 */
static int is_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                        const char *reason) {
        TEST_SAY("is_fatal?: %s: %s\n", rd_kafka_err2str(err), reason);

        switch (err) {
        case RD_KAFKA_RESP_ERR__TRANSPORT:
        case RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN:
        case RD_KAFKA_RESP_ERR__AUTHENTICATION:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
                return 0;
        default:
                return 1;
        }
}
49
50
51 #if WITH_SOCKEM
52 /**
53 * Producer message retry testing
54 */
55
56 /* Hang on to the first broker socket we see in connect_cb,
57 * reject all the rest (connection refused) to make sure we're only
58 * playing with one single broker for this test. */
59
60 #include "sockem_ctrl.h"
61
62
63 /**
64 * @brief Test produce retries.
65 *
66 * @param should_fail If true, do negative testing which should fail.
67 */
do_test_produce_retries(const char * topic,int idempotence,int try_fail,int should_fail)68 static void do_test_produce_retries (const char *topic,
69 int idempotence,
70 int try_fail,
71 int should_fail) {
72 rd_kafka_t *rk;
73 rd_kafka_conf_t *conf;
74 rd_kafka_topic_t *rkt;
75 uint64_t testid;
76 rd_kafka_resp_err_t err;
77 int msgcnt = 1;
78 sockem_ctrl_t ctrl;
79
80 TEST_SAY(_C_BLU "Test produce retries "
81 "(idempotence=%d,try_fail=%d,should_fail=%d)\n",
82 idempotence, try_fail, should_fail);
83
84 testid = test_id_generate();
85
86 test_conf_init(&conf, NULL, 60);
87
88 if (should_fail &&
89 !strcmp(test_conf_get(conf, "enable.sparse.connections"),
90 "true")) {
91 rd_kafka_conf_destroy(conf);
92 TEST_SAY(_C_YEL "Sparse connections enabled: "
93 "skipping connection-timing related test\n");
94 return;
95 }
96
97 sockem_ctrl_init(&ctrl);
98
99 test_conf_set(conf, "socket.timeout.ms", "1000");
100 /* Avoid disconnects on request timeouts */
101 test_conf_set(conf, "socket.max.fails", "100");
102 test_conf_set(conf, "enable.idempotence", idempotence?"true":"false");
103 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
104 test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_PERSISTED;
105 if (!try_fail) {
106 test_conf_set(conf, "retries", "5");
107 } else {
108 /* enable.idempotence=true request retries >= 1 which
109 * makes the test pass. Adjust expected error accordingly. */
110 if (idempotence)
111 test_conf_set(conf, "retries", "5");
112 else
113 test_conf_set(conf, "retries", "0");
114 if (should_fail) {
115 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
116 test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
117 }
118 }
119 test_conf_set(conf, "retry.backoff.ms", "5000");
120 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
121 test_socket_enable(conf);
122 test_curr->is_fatal_cb = is_fatal_cb;
123
124 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
125 rkt = test_create_producer_topic(rk, topic, NULL);
126
127 /* Create the topic to make sure connections are up and ready. */
128 err = test_auto_create_topic_rkt(rk, rkt, tmout_multip(5000));
129 TEST_ASSERT(!err, "topic creation failed: %s", rd_kafka_err2str(err));
130
131 /* Set initial delay to 3s */
132 sockem_ctrl_set_delay(&ctrl, 0, 3000); /* Takes effect immediately */
133
134 /* After two retries, remove the delay, the third retry
135 * should kick in and work. */
136 sockem_ctrl_set_delay(&ctrl,
137 ((1000 /*socket.timeout.ms*/ +
138 5000 /*retry.backoff.ms*/) * 2) - 2000, 0);
139
140 test_produce_msgs(rk, rkt, testid, RD_KAFKA_PARTITION_UA,
141 0, msgcnt, NULL, 0);
142
143
144 rd_kafka_topic_destroy(rkt);
145 rd_kafka_destroy(rk);
146
147 if (!should_fail) {
148 TEST_SAY("Verifying messages with consumer\n");
149 test_consume_msgs_easy(NULL, topic, testid, -1, msgcnt, NULL);
150 }
151
152 sockem_ctrl_term(&ctrl);
153
154 TEST_SAY(_C_GRN "Test produce retries "
155 "(idempotence=%d,try_fail=%d,should_fail=%d): PASS\n",
156 idempotence, try_fail, should_fail);
157 }
158 #endif
159
160
161
162
163 /**
164 * @brief Simple on_request_sent interceptor that simply disconnects
165 * the socket when first ProduceRequest is seen.
166 * Sub-sequent ProduceRequests will not trigger a disconnect, to allow
167 * for retries.
168 */
169 static mtx_t produce_disconnect_lock;
170 static int produce_disconnects = 0;
on_request_sent(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,void * ic_opaque)171 static rd_kafka_resp_err_t on_request_sent (rd_kafka_t *rk,
172 int sockfd,
173 const char *brokername,
174 int32_t brokerid,
175 int16_t ApiKey,
176 int16_t ApiVersion,
177 int32_t CorrId,
178 size_t size,
179 void *ic_opaque) {
180
181 /* Ignore if not a ProduceRequest */
182 if (ApiKey != 0)
183 return RD_KAFKA_RESP_ERR_NO_ERROR;
184
185 mtx_lock(&produce_disconnect_lock);
186 if (produce_disconnects == 0) {
187 char buf[512];
188 ssize_t r;
189 printf(_C_CYA "%s:%d: shutting down socket %d (%s)\n" _C_CLR,
190 __FILE__, __LINE__, sockfd, brokername);
191 #ifdef _WIN32
192 closesocket(sockfd);
193 #else
194 close(sockfd);
195 #endif
196 /* There is a chance the broker responded in the
197 * time it took us to get here, so purge the
198 * socket recv buffer to make sure librdkafka does not see
199 * the response. */
200 while ((r = recv(sockfd, buf, sizeof(buf), 0)) > 0)
201 printf(_C_CYA "%s:%d: "
202 "purged %"PRIdsz" bytes from socket\n",
203 __FILE__, __LINE__, r);
204 produce_disconnects = 1;
205 }
206 mtx_unlock(&produce_disconnect_lock);
207
208 return RD_KAFKA_RESP_ERR_NO_ERROR;
209 }
210
211
on_new_producer(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)212 static rd_kafka_resp_err_t on_new_producer (rd_kafka_t *rk,
213 const rd_kafka_conf_t *conf,
214 void *ic_opaque,
215 char *errstr, size_t errstr_size) {
216 return rd_kafka_interceptor_add_on_request_sent(
217 rk, "disconnect_on_send",
218 on_request_sent, NULL);
219 }
220
221 /**
222 * @brief Test produce retries by disconnecting right after ProduceRequest
223 * has been sent.
224 *
225 * @param should_fail If true, do negative testing which should fail.
226 */
do_test_produce_retries_disconnect(const char * topic,int idempotence,int try_fail,int should_fail)227 static void do_test_produce_retries_disconnect (const char *topic,
228 int idempotence,
229 int try_fail,
230 int should_fail) {
231 rd_kafka_t *rk;
232 rd_kafka_conf_t *conf;
233 rd_kafka_topic_t *rkt;
234 uint64_t testid;
235 rd_kafka_resp_err_t err;
236 int msgcnt = 1;
237 int partition_cnt;
238
239 TEST_SAY(_C_BLU "Test produce retries by disconnect "
240 "(idempotence=%d,try_fail=%d,should_fail=%d)\n",
241 idempotence, try_fail, should_fail);
242
243 test_curr->is_fatal_cb = is_fatal_cb;
244
245 testid = test_id_generate();
246
247 test_conf_init(&conf, NULL, 60);
248 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
249 test_conf_set(conf, "socket.timeout.ms", test_quick ? "3000":"10000");
250 test_conf_set(conf, "message.timeout.ms", test_quick ? "9000":"30000");
251 test_conf_set(conf, "enable.idempotence", idempotence?"true":"false");
252 if (!try_fail) {
253 test_conf_set(conf, "retries", "1");
254 } else {
255 /* enable.idempotence=true request retries >= 1 which
256 * makes the test pass. */
257 if (!idempotence)
258 test_conf_set(conf, "retries", "0");
259 }
260
261 mtx_init(&produce_disconnect_lock, mtx_plain);
262 produce_disconnects = 0;
263
264 rd_kafka_conf_interceptor_add_on_new(conf, "on_new_producer",
265 on_new_producer, NULL);
266
267 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
268 rkt = test_create_producer_topic(rk, topic, NULL);
269
270 err = test_produce_sync(rk, rkt, testid, 0);
271
272 if (should_fail) {
273 if (!err)
274 TEST_FAIL("Expected produce to fail\n");
275 else
276 TEST_SAY("Produced message failed as expected: %s\n",
277 rd_kafka_err2str(err));
278 } else {
279 if (err)
280 TEST_FAIL("Produced message failed: %s\n",
281 rd_kafka_err2str(err));
282 else
283 TEST_SAY("Produced message delivered\n");
284 }
285
286 mtx_lock(&produce_disconnect_lock);
287 TEST_ASSERT(produce_disconnects == 1,
288 "expected %d disconnects, not %d", 1, produce_disconnects);
289 mtx_unlock(&produce_disconnect_lock);
290
291
292 partition_cnt = test_get_partition_count(rk, topic, tmout_multip(5000));
293
294 rd_kafka_topic_destroy(rkt);
295 rd_kafka_destroy(rk);
296
297 TEST_SAY("Verifying messages with consumer\n");
298 test_consume_msgs_easy(NULL, topic, testid, partition_cnt,
299 /* Since we don't know the number of
300 * messages that got thru on the socket
301 * before disconnect we can't let the
302 * expected message count be 0 in case of
303 * should_fail, so instead ignore the message
304 * count (-1). */
305 should_fail ? -1 : msgcnt, NULL);
306
307 TEST_SAY(_C_GRN "Test produce retries by disconnect "
308 "(idempotence=%d,try_fail=%d,should_fail=%d): PASS\n",
309 idempotence, try_fail, should_fail);
310 }
311
312
main_0076_produce_retry(int argc,char ** argv)313 int main_0076_produce_retry (int argc, char **argv) {
314 const char *topic = test_mk_topic_name("0076_produce_retry", 1);
315 const rd_bool_t has_idempotence =
316 test_broker_version >= TEST_BRKVER(0,11,0,0);
317
318 #if WITH_SOCKEM
319 if (has_idempotence) {
320 /* Idempotence, no try fail, should succeed. */
321 do_test_produce_retries(topic, 1, 0, 0);
322 /* Idempotence, try fail, should succeed. */
323 do_test_produce_retries(topic, 1, 1, 0);
324 }
325 /* No idempotence, try fail, should fail. */
326 do_test_produce_retries(topic, 0, 1, 1);
327 #endif
328
329 if (has_idempotence) {
330 /* Idempotence, no try fail, should succeed. */
331 do_test_produce_retries_disconnect(topic, 1, 0, 0);
332 /* Idempotence, try fail, should succeed. */
333 do_test_produce_retries_disconnect(topic, 1, 1, 0);
334 }
335 /* No idempotence, try fail, should fail. */
336 do_test_produce_retries_disconnect(topic, 0, 1, 1);
337
338 return 0;
339 }
340
341
342