1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 #include "rdkafka.h"
31 
32 #include <stdarg.h>
33 #include <errno.h>
34 
static int is_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                        const char *reason) {
        TEST_SAY("is_fatal?: %s: %s\n", rd_kafka_err2str(err), reason);

        /* These tests bring connectivity down on purpose, so
         * connectivity related errors are not treated as fatal.
         * The SASL authenticator will think a connection-down event in
         * the auth state means the broker doesn't support SASL PLAIN,
         * hence authentication errors are tolerated too. */
        switch (err) {
        case RD_KAFKA_RESP_ERR__TRANSPORT:
        case RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN:
        case RD_KAFKA_RESP_ERR__AUTHENTICATION:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
                return 0;

        default:
                /* Any other error is considered fatal. */
                return 1;
        }
}
49 
50 
51 #if WITH_SOCKEM
52 /**
53  * Producer message retry testing
54  */
55 
56 /* Hang on to the first broker socket we see in connect_cb,
57  * reject all the rest (connection refused) to make sure we're only
58  * playing with one single broker for this test. */
59 
60 #include "sockem_ctrl.h"
61 
62 
63 /**
64  * @brief Test produce retries.
65  *
66  * @param should_fail If true, do negative testing which should fail.
67  */
do_test_produce_retries(const char * topic,int idempotence,int try_fail,int should_fail)68 static void do_test_produce_retries (const char *topic,
69                                      int idempotence,
70                                      int try_fail,
71                                      int should_fail) {
72         rd_kafka_t *rk;
73         rd_kafka_conf_t *conf;
74         rd_kafka_topic_t *rkt;
75         uint64_t testid;
76         rd_kafka_resp_err_t err;
77         int msgcnt = 1;
78         sockem_ctrl_t ctrl;
79 
80         TEST_SAY(_C_BLU "Test produce retries "
81                  "(idempotence=%d,try_fail=%d,should_fail=%d)\n",
82                  idempotence, try_fail, should_fail);
83 
84         testid = test_id_generate();
85 
86         test_conf_init(&conf, NULL, 60);
87 
88         if (should_fail &&
89             !strcmp(test_conf_get(conf, "enable.sparse.connections"),
90                     "true")) {
91                 rd_kafka_conf_destroy(conf);
92                 TEST_SAY(_C_YEL "Sparse connections enabled: "
93                          "skipping connection-timing related test\n");
94                 return;
95         }
96 
97         sockem_ctrl_init(&ctrl);
98 
99         test_conf_set(conf, "socket.timeout.ms", "1000");
100         /* Avoid disconnects on request timeouts */
101         test_conf_set(conf, "socket.max.fails", "100");
102         test_conf_set(conf, "enable.idempotence", idempotence?"true":"false");
103         test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
104         test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_PERSISTED;
105         if (!try_fail) {
106                 test_conf_set(conf, "retries", "5");
107         } else {
108                 /* enable.idempotence=true request retries >= 1 which
109                  * makes the test pass. Adjust expected error accordingly. */
110                 if (idempotence)
111                         test_conf_set(conf, "retries", "5");
112                 else
113                         test_conf_set(conf, "retries", "0");
114                 if (should_fail) {
115                         test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
116                         test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
117                 }
118         }
119         test_conf_set(conf, "retry.backoff.ms", "5000");
120         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
121         test_socket_enable(conf);
122         test_curr->is_fatal_cb = is_fatal_cb;
123 
124         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
125         rkt = test_create_producer_topic(rk, topic, NULL);
126 
127         /* Create the topic to make sure connections are up and ready. */
128         err = test_auto_create_topic_rkt(rk, rkt, tmout_multip(5000));
129         TEST_ASSERT(!err, "topic creation failed: %s", rd_kafka_err2str(err));
130 
131         /* Set initial delay to 3s */
132         sockem_ctrl_set_delay(&ctrl, 0, 3000); /* Takes effect immediately */
133 
134         /* After two retries, remove the delay, the third retry
135          * should kick in and work. */
136         sockem_ctrl_set_delay(&ctrl,
137                               ((1000 /*socket.timeout.ms*/ +
138                                 5000 /*retry.backoff.ms*/) * 2) - 2000, 0);
139 
140         test_produce_msgs(rk, rkt, testid, RD_KAFKA_PARTITION_UA,
141                           0, msgcnt, NULL, 0);
142 
143 
144         rd_kafka_topic_destroy(rkt);
145         rd_kafka_destroy(rk);
146 
147         if (!should_fail) {
148                 TEST_SAY("Verifying messages with consumer\n");
149                 test_consume_msgs_easy(NULL, topic, testid, -1, msgcnt, NULL);
150         }
151 
152         sockem_ctrl_term(&ctrl);
153 
154         TEST_SAY(_C_GRN "Test produce retries "
155                  "(idempotence=%d,try_fail=%d,should_fail=%d): PASS\n",
156                  idempotence, try_fail, should_fail);
157 }
158 #endif
159 
160 
161 
162 
163 /**
164  * @brief Simple on_request_sent interceptor that simply disconnects
165  *        the socket when first ProduceRequest is seen.
166  *        Sub-sequent ProduceRequests will not trigger a disconnect, to allow
167  *        for retries.
168  */
169 static mtx_t produce_disconnect_lock;
170 static int produce_disconnects = 0;
on_request_sent(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,void * ic_opaque)171 static rd_kafka_resp_err_t on_request_sent (rd_kafka_t *rk,
172                                             int sockfd,
173                                             const char *brokername,
174                                             int32_t brokerid,
175                                             int16_t ApiKey,
176                                             int16_t ApiVersion,
177                                             int32_t CorrId,
178                                             size_t  size,
179                                             void *ic_opaque) {
180 
181         /* Ignore if not a ProduceRequest */
182         if (ApiKey != 0)
183                 return RD_KAFKA_RESP_ERR_NO_ERROR;
184 
185         mtx_lock(&produce_disconnect_lock);
186         if (produce_disconnects == 0) {
187                 char buf[512];
188                 ssize_t r;
189                 printf(_C_CYA "%s:%d: shutting down socket %d (%s)\n" _C_CLR,
190                        __FILE__, __LINE__, sockfd, brokername);
191 #ifdef _WIN32
192                 closesocket(sockfd);
193 #else
194                 close(sockfd);
195 #endif
196                 /* There is a chance the broker responded in the
197                  * time it took us to get here, so purge the
198                  * socket recv buffer to make sure librdkafka does not see
199                  * the response. */
200                 while ((r = recv(sockfd, buf, sizeof(buf), 0)) > 0)
201                         printf(_C_CYA "%s:%d: "
202                                "purged %"PRIdsz" bytes from socket\n",
203                                __FILE__, __LINE__, r);
204                 produce_disconnects = 1;
205         }
206         mtx_unlock(&produce_disconnect_lock);
207 
208         return RD_KAFKA_RESP_ERR_NO_ERROR;
209 }
210 
211 
on_new_producer(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)212 static rd_kafka_resp_err_t on_new_producer (rd_kafka_t *rk,
213                                             const rd_kafka_conf_t *conf,
214                                             void *ic_opaque,
215                                             char *errstr, size_t errstr_size) {
216         return rd_kafka_interceptor_add_on_request_sent(
217                 rk, "disconnect_on_send",
218                 on_request_sent, NULL);
219 }
220 
221 /**
222  * @brief Test produce retries by disconnecting right after ProduceRequest
223  *        has been sent.
224  *
225  * @param should_fail If true, do negative testing which should fail.
226  */
do_test_produce_retries_disconnect(const char * topic,int idempotence,int try_fail,int should_fail)227 static void do_test_produce_retries_disconnect (const char *topic,
228                                                 int idempotence,
229                                                 int try_fail,
230                                                 int should_fail) {
231         rd_kafka_t *rk;
232         rd_kafka_conf_t *conf;
233         rd_kafka_topic_t *rkt;
234         uint64_t testid;
235         rd_kafka_resp_err_t err;
236         int msgcnt = 1;
237         int partition_cnt;
238 
239         TEST_SAY(_C_BLU "Test produce retries by disconnect "
240                  "(idempotence=%d,try_fail=%d,should_fail=%d)\n",
241                  idempotence, try_fail, should_fail);
242 
243         test_curr->is_fatal_cb = is_fatal_cb;
244 
245         testid = test_id_generate();
246 
247         test_conf_init(&conf, NULL, 60);
248         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
249         test_conf_set(conf, "socket.timeout.ms", test_quick ? "3000":"10000");
250         test_conf_set(conf, "message.timeout.ms", test_quick ? "9000":"30000");
251         test_conf_set(conf, "enable.idempotence", idempotence?"true":"false");
252         if (!try_fail) {
253                 test_conf_set(conf, "retries", "1");
254         } else {
255                 /* enable.idempotence=true request retries >= 1 which
256                  * makes the test pass. */
257                 if (!idempotence)
258                         test_conf_set(conf, "retries", "0");
259         }
260 
261         mtx_init(&produce_disconnect_lock, mtx_plain);
262         produce_disconnects = 0;
263 
264         rd_kafka_conf_interceptor_add_on_new(conf, "on_new_producer",
265                                              on_new_producer, NULL);
266 
267         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
268         rkt = test_create_producer_topic(rk, topic, NULL);
269 
270         err = test_produce_sync(rk, rkt, testid, 0);
271 
272         if (should_fail) {
273                 if (!err)
274                         TEST_FAIL("Expected produce to fail\n");
275                 else
276                         TEST_SAY("Produced message failed as expected: %s\n",
277                                  rd_kafka_err2str(err));
278         } else {
279                 if (err)
280                         TEST_FAIL("Produced message failed: %s\n",
281                                   rd_kafka_err2str(err));
282                 else
283                         TEST_SAY("Produced message delivered\n");
284         }
285 
286         mtx_lock(&produce_disconnect_lock);
287         TEST_ASSERT(produce_disconnects == 1,
288                     "expected %d disconnects, not %d", 1, produce_disconnects);
289         mtx_unlock(&produce_disconnect_lock);
290 
291 
292         partition_cnt = test_get_partition_count(rk, topic, tmout_multip(5000));
293 
294         rd_kafka_topic_destroy(rkt);
295         rd_kafka_destroy(rk);
296 
297         TEST_SAY("Verifying messages with consumer\n");
298         test_consume_msgs_easy(NULL, topic, testid, partition_cnt,
299                                /* Since we don't know the number of
300                                 * messages that got thru on the socket
301                                 * before disconnect we can't let the
302                                 * expected message count be 0 in case of
303                                 * should_fail, so instead ignore the message
304                                 * count (-1). */
305                                should_fail ? -1 : msgcnt, NULL);
306 
307         TEST_SAY(_C_GRN "Test produce retries by disconnect "
308                  "(idempotence=%d,try_fail=%d,should_fail=%d): PASS\n",
309                  idempotence, try_fail, should_fail);
310 }
311 
312 
main_0076_produce_retry(int argc,char ** argv)313 int main_0076_produce_retry (int argc, char **argv) {
314         const char *topic = test_mk_topic_name("0076_produce_retry", 1);
315         const rd_bool_t has_idempotence =
316                 test_broker_version >= TEST_BRKVER(0,11,0,0);
317 
318 #if WITH_SOCKEM
319         if (has_idempotence) {
320                 /* Idempotence, no try fail, should succeed. */
321                 do_test_produce_retries(topic, 1, 0, 0);
322                 /* Idempotence, try fail, should succeed. */
323                 do_test_produce_retries(topic, 1, 1, 0);
324         }
325         /* No idempotence, try fail, should fail. */
326         do_test_produce_retries(topic, 0, 1, 1);
327 #endif
328 
329         if (has_idempotence) {
330                 /* Idempotence, no try fail, should succeed. */
331                 do_test_produce_retries_disconnect(topic, 1, 0, 0);
332                 /* Idempotence, try fail, should succeed. */
333                 do_test_produce_retries_disconnect(topic, 1, 1, 0);
334         }
335         /* No idempotence, try fail, should fail. */
336         do_test_produce_retries_disconnect(topic, 0, 1, 1);
337 
338         return 0;
339 }
340 
341 
342