1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2018, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 #include "rdkafka.h"
32 
33 #include <stdarg.h>
34 
35 /**
36  * @name Idempotent Producer tests
37  *
38  */
39 
40 static struct {
41         int batch_cnt;
42         int initial_fail_batch_cnt;
43         rd_atomic32_t produce_cnt;
44 } state;
45 
46 
47 
48 /**
49  * @brief This is called prior to parsing the ProduceResponse,
50  *        we use it to inject errors.
51  *
52  * @locality an internal rdkafka thread
53  */
handle_ProduceResponse(rd_kafka_t * rk,int32_t brokerid,uint64_t msgseq,rd_kafka_resp_err_t err)54 static rd_kafka_resp_err_t handle_ProduceResponse (rd_kafka_t *rk,
55                                                    int32_t brokerid,
56                                                    uint64_t msgseq,
57                                                    rd_kafka_resp_err_t err) {
58         rd_kafka_resp_err_t new_err = err;
59         int n;
60 
61         if (err == RD_KAFKA_RESP_ERR__RETRY)
62                 return err; /* Skip internal retries, such as triggered by
63                              * rd_kafka_broker_bufq_purge_by_toppar() */
64 
65         n = rd_atomic32_add(&state.produce_cnt, 1);
66 
67         /* Let the first N ProduceRequests fail with request timeout.
68          * Do allow the first request through. */
69         if (n > 1 && n <= state.initial_fail_batch_cnt) {
70                 if (err)
71                         TEST_WARN("First %d ProduceRequests should not "
72                                   "have failed, this is #%d with error %s for "
73                                   "brokerid %"PRId32" and msgseq %"PRIu64"\n",
74                                   state.initial_fail_batch_cnt, n,
75                                   rd_kafka_err2name(err), brokerid, msgseq);
76                 assert(!err &&
77                        *"First N ProduceRequests should not have failed");
78                 new_err = RD_KAFKA_RESP_ERR__TIMED_OUT;
79         }
80 
81         TEST_SAY("handle_ProduceResponse(broker %"PRId32
82                  ", MsgSeq %"PRId64", Error %s) -> new Error %s\n",
83                  brokerid, msgseq,
84                  rd_kafka_err2name(err),
85                  rd_kafka_err2name(new_err));
86 
87         return new_err;
88 }
89 
90 
91 /**
92  * @brief Test handling of implicit acks.
93  *
94  * @param batch_cnt Total number of batches, ProduceRequests, sent.
95  * @param initial_fail_batch_cnt How many of the initial batches should
96  *                               fail with an emulated network timeout.
97  */
do_test_implicit_ack(const char * what,int batch_cnt,int initial_fail_batch_cnt)98 static void do_test_implicit_ack (const char *what,
99                                   int batch_cnt, int initial_fail_batch_cnt) {
100         rd_kafka_t *rk;
101         const char *topic = test_mk_topic_name("0090_idempotence_impl_ack", 1);
102         const int32_t partition = 0;
103         uint64_t testid;
104         int msgcnt = 10*batch_cnt;
105         rd_kafka_conf_t *conf;
106         rd_kafka_topic_t *rkt;
107         test_msgver_t mv;
108 
109         TEST_SAY(_C_MAG "[ Test implicit ack: %s ]\n", what);
110 
111         rd_atomic32_init(&state.produce_cnt, 0);
112         state.batch_cnt = batch_cnt;
113         state.initial_fail_batch_cnt = initial_fail_batch_cnt;
114 
115         testid = test_id_generate();
116 
117         test_conf_init(&conf, NULL, 60);
118         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
119         test_conf_set(conf, "enable.idempotence", "true");
120         test_conf_set(conf, "batch.num.messages", "10");
121         test_conf_set(conf, "linger.ms", "500");
122         test_conf_set(conf, "retry.backoff.ms", "10");
123 
124         /* The ProduceResponse handler will inject timed-out-in-flight
125          * errors for the first N ProduceRequests, which will trigger retries
126          * that in turn will result in OutOfSequence errors. */
127         test_conf_set(conf, "ut_handle_ProduceResponse",
128                       (char *)handle_ProduceResponse);
129 
130         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
131 
132         test_create_topic(rk, topic, 1, 1);
133 
134         rkt = test_create_producer_topic(rk, topic, NULL);
135 
136 
137         TEST_SAY("Producing %d messages\n", msgcnt);
138         test_produce_msgs(rk, rkt, testid, -1, 0, msgcnt, NULL, 0);
139 
140         TEST_SAY("Flushing..\n");
141         rd_kafka_flush(rk, 10000);
142 
143         rd_kafka_topic_destroy(rkt);
144         rd_kafka_destroy(rk);
145 
146         TEST_SAY("Verifying messages with consumer\n");
147         test_msgver_init(&mv, testid);
148         test_consume_msgs_easy_mv(NULL, topic, partition,
149                                   testid, 1, msgcnt, NULL, &mv);
150         test_msgver_verify("verify", &mv, TEST_MSGVER_ALL, 0, msgcnt);
151         test_msgver_clear(&mv);
152 
153         TEST_SAY(_C_GRN "[ Test implicit ack: %s : PASS ]\n", what);
154 }
155 
156 
/**
 * @brief Test entry point: runs the implicit ack test once with the
 *        injected failures staying within the broker's request window
 *        and once with them exceeding it.
 *
 * @param argc Unused.
 * @param argv Unused.
 *
 * @returns 0 always.
 */
int main_0090_idempotence (int argc, char **argv) {
        /* For each partition and producer the broker remembers the N most
         * recent ProduceRequests, so that a retry of an already successful
         * request gets a non-error response.
         * N is currently (AK 2.0) hard coded at 5. */
        const int broker_req_window = 5;

        /* Scenario 1: twice the window in batches, one window's worth of
         * initial failures. */
        const int within_batch_cnt = broker_req_window * 2;
        const int within_fail_cnt  = broker_req_window;

        /* Scenario 2: every batch is an initial failure candidate and
         * their number exceeds the window by 3. */
        const int outside_batch_cnt = broker_req_window + 3;
        const int outside_fail_cnt  = broker_req_window + 3;

        do_test_implicit_ack("within broker request window",
                             within_batch_cnt, within_fail_cnt);

        do_test_implicit_ack("outside broker request window",
                             outside_batch_cnt, outside_fail_cnt);

        return 0;
}
174