1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2018, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 #include "rdkafka.h"
32
33 #include <stdarg.h>
34
35 /**
36 * @name Idempotent Producer tests
37 *
38 */
39
40 static struct {
41 int batch_cnt;
42 int initial_fail_batch_cnt;
43 rd_atomic32_t produce_cnt;
44 } state;
45
46
47
48 /**
49 * @brief This is called prior to parsing the ProduceResponse,
50 * we use it to inject errors.
51 *
52 * @locality an internal rdkafka thread
53 */
handle_ProduceResponse(rd_kafka_t * rk,int32_t brokerid,uint64_t msgseq,rd_kafka_resp_err_t err)54 static rd_kafka_resp_err_t handle_ProduceResponse (rd_kafka_t *rk,
55 int32_t brokerid,
56 uint64_t msgseq,
57 rd_kafka_resp_err_t err) {
58 rd_kafka_resp_err_t new_err = err;
59 int n;
60
61 if (err == RD_KAFKA_RESP_ERR__RETRY)
62 return err; /* Skip internal retries, such as triggered by
63 * rd_kafka_broker_bufq_purge_by_toppar() */
64
65 n = rd_atomic32_add(&state.produce_cnt, 1);
66
67 /* Let the first N ProduceRequests fail with request timeout.
68 * Do allow the first request through. */
69 if (n > 1 && n <= state.initial_fail_batch_cnt) {
70 if (err)
71 TEST_WARN("First %d ProduceRequests should not "
72 "have failed, this is #%d with error %s for "
73 "brokerid %"PRId32" and msgseq %"PRIu64"\n",
74 state.initial_fail_batch_cnt, n,
75 rd_kafka_err2name(err), brokerid, msgseq);
76 assert(!err &&
77 *"First N ProduceRequests should not have failed");
78 new_err = RD_KAFKA_RESP_ERR__TIMED_OUT;
79 }
80
81 TEST_SAY("handle_ProduceResponse(broker %"PRId32
82 ", MsgSeq %"PRId64", Error %s) -> new Error %s\n",
83 brokerid, msgseq,
84 rd_kafka_err2name(err),
85 rd_kafka_err2name(new_err));
86
87 return new_err;
88 }
89
90
91 /**
92 * @brief Test handling of implicit acks.
93 *
94 * @param batch_cnt Total number of batches, ProduceRequests, sent.
95 * @param initial_fail_batch_cnt How many of the initial batches should
96 * fail with an emulated network timeout.
97 */
do_test_implicit_ack(const char * what,int batch_cnt,int initial_fail_batch_cnt)98 static void do_test_implicit_ack (const char *what,
99 int batch_cnt, int initial_fail_batch_cnt) {
100 rd_kafka_t *rk;
101 const char *topic = test_mk_topic_name("0090_idempotence_impl_ack", 1);
102 const int32_t partition = 0;
103 uint64_t testid;
104 int msgcnt = 10*batch_cnt;
105 rd_kafka_conf_t *conf;
106 rd_kafka_topic_t *rkt;
107 test_msgver_t mv;
108
109 TEST_SAY(_C_MAG "[ Test implicit ack: %s ]\n", what);
110
111 rd_atomic32_init(&state.produce_cnt, 0);
112 state.batch_cnt = batch_cnt;
113 state.initial_fail_batch_cnt = initial_fail_batch_cnt;
114
115 testid = test_id_generate();
116
117 test_conf_init(&conf, NULL, 60);
118 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
119 test_conf_set(conf, "enable.idempotence", "true");
120 test_conf_set(conf, "batch.num.messages", "10");
121 test_conf_set(conf, "linger.ms", "500");
122 test_conf_set(conf, "retry.backoff.ms", "10");
123
124 /* The ProduceResponse handler will inject timed-out-in-flight
125 * errors for the first N ProduceRequests, which will trigger retries
126 * that in turn will result in OutOfSequence errors. */
127 test_conf_set(conf, "ut_handle_ProduceResponse",
128 (char *)handle_ProduceResponse);
129
130 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
131
132 test_create_topic(rk, topic, 1, 1);
133
134 rkt = test_create_producer_topic(rk, topic, NULL);
135
136
137 TEST_SAY("Producing %d messages\n", msgcnt);
138 test_produce_msgs(rk, rkt, testid, -1, 0, msgcnt, NULL, 0);
139
140 TEST_SAY("Flushing..\n");
141 rd_kafka_flush(rk, 10000);
142
143 rd_kafka_topic_destroy(rkt);
144 rd_kafka_destroy(rk);
145
146 TEST_SAY("Verifying messages with consumer\n");
147 test_msgver_init(&mv, testid);
148 test_consume_msgs_easy_mv(NULL, topic, partition,
149 testid, 1, msgcnt, NULL, &mv);
150 test_msgver_verify("verify", &mv, TEST_MSGVER_ALL, 0, msgcnt);
151 test_msgver_clear(&mv);
152
153 TEST_SAY(_C_GRN "[ Test implicit ack: %s : PASS ]\n", what);
154 }
155
156
main_0090_idempotence(int argc,char ** argv)157 int main_0090_idempotence (int argc, char **argv) {
158 /* The broker maintains a window of the N last ProduceRequests
159 * per partition and producer to allow ProduceRequest retries
160 * for previously successful requests to return a non-error response.
161 * This limit is currently (AK 2.0) hard coded at 5. */
162 const int broker_req_window = 5;
163
164 do_test_implicit_ack("within broker request window",
165 broker_req_window * 2,
166 broker_req_window);
167
168 do_test_implicit_ack("outside broker request window",
169 broker_req_window + 3,
170 broker_req_window + 3);
171
172 return 0;
173 }
174