1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 /**
32 * @name Test rd_kafka_purge()
33 *
34 * Local test:
35 * - produce 20 messages (that will be held up in queues),
36 * for specific partitions and UA.
37 * - purge(INFLIGHT) => no change in len()
38 * - purge(QUEUE) => len() should drop to 0, dr errs should be ERR__PURGE_QUEUE
39 *
40 * Remote test (WITH_SOCKEM):
 *  - Limit in-flight to a single ProduceRequest of at most 10 messages
 *    (max.in.flight=1, batch.num.messages=10)
42 * - Produce 20 messages to the same partition, in batches of 10.
43 * - Make sure only first batch is sent.
44 * - purge(QUEUE) => len should drop to 10, dr err ERR__PURGE_QUEUE
45 * - purge(INFLIGHT|QUEUE) => len should drop to 0, ERR__PURGE_INFLIGHT
46 */
47
48
49 static const int msgcnt = 20;
50 struct waitmsgs {
51 rd_kafka_resp_err_t exp_err[20];
52 int cnt;
53 };
54
/* Number of ProduceRequests seen by the on_request_sent interceptor.
 * Updated and read with produce_req_lock held. */
static int produce_req_cnt;

/* Lock and condition used to hand produce_req_cnt updates over from the
 * interceptor to the test thread. */
static cnd_t produce_req_cnd;
static mtx_t produce_req_lock;
58
59
60 #if WITH_SOCKEM
61 /**
62 * @brief Sockem connect, called from **internal librdkafka thread** through
63 * librdkafka's connect_cb
64 */
connect_cb(struct test * test,sockem_t * skm,const char * id)65 static int connect_cb (struct test *test, sockem_t *skm, const char *id) {
66 sockem_set(skm, "delay", 500, NULL);
67 return 0;
68 }
69
on_request_sent(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,void * ic_opaque)70 static rd_kafka_resp_err_t on_request_sent (rd_kafka_t *rk,
71 int sockfd,
72 const char *brokername,
73 int32_t brokerid,
74 int16_t ApiKey,
75 int16_t ApiVersion,
76 int32_t CorrId,
77 size_t size,
78 void *ic_opaque) {
79
80 /* Ignore if not a ProduceRequest */
81 if (ApiKey != 0)
82 return RD_KAFKA_RESP_ERR_NO_ERROR;
83
84 TEST_SAY("ProduceRequest sent to %s (%"PRId32")\n",
85 brokername, brokerid);
86
87 mtx_lock(&produce_req_lock);
88 produce_req_cnt++;
89 cnd_broadcast(&produce_req_cnd);
90 mtx_unlock(&produce_req_lock);
91
92 /* Stall the connection */
93 test_socket_sockem_set(sockfd, "delay", 5000);
94
95 return RD_KAFKA_RESP_ERR_NO_ERROR;
96 }
97
on_new_producer(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)98 static rd_kafka_resp_err_t on_new_producer (rd_kafka_t *rk,
99 const rd_kafka_conf_t *conf,
100 void *ic_opaque,
101 char *errstr, size_t errstr_size) {
102 return rd_kafka_interceptor_add_on_request_sent(
103 rk, "catch_producer_req",
104 on_request_sent, NULL);
105 }
106 #endif
107
108
109
/**
 * @brief Delivery report callback.
 *
 * The message's private pointer is the struct waitmsgs of the current test
 * run and the payload is the message id (an int). Each report decrements
 * the outstanding count, checks that the id is in range and has not been
 * reported before, and compares the delivery error with the expected one;
 * a mismatch is recorded as a deferred failure.
 */
static void dr_msg_cb(rd_kafka_t *rk,
                      const rd_kafka_message_t *rkmessage,
                      void *opaque) {
        /* Value stored in exp_err[] once a message's report has been seen */
        const int seen_marker = 12345;
        struct waitmsgs *wm   = rkmessage->_private;
        rd_kafka_resp_err_t expected;
        int msgid;

        TEST_ASSERT(wm->cnt > 0, "wait_msg_cnt is zero on DR");
        wm->cnt -= 1;

        TEST_ASSERT(rkmessage->len == sizeof(msgid),
                    "invalid message size %" PRIusz ", expected sizeof(int)",
                    rkmessage->len);
        memcpy(&msgid, rkmessage->payload, rkmessage->len);

        TEST_ASSERT(msgid >= 0 && msgid < msgcnt,
                    "msgid %d out of range 0..%d", msgid, msgcnt - 1);

        expected = wm->exp_err[msgid];
        TEST_ASSERT((int)expected != seen_marker,
                    "msgid %d delivered twice", msgid);

        TEST_SAY("DeliveryReport for msg #%d: %s\n",
                 msgid, rd_kafka_err2name(rkmessage->err));

        if (rkmessage->err != expected) {
                TEST_FAIL_LATER("Expected message #%d to fail with %s, not %s",
                                msgid,
                                rd_kafka_err2str(expected),
                                rd_kafka_err2str(rkmessage->err));
        }

        /* Remember that this message id has now been reported */
        wm->exp_err[msgid] = (rd_kafka_resp_err_t)seen_marker;
}
144
145
146
147
148
149
150
/**
 * @brief Call rd_kafka_purge() and verify how many delivery reports are
 *        still outstanding afterwards.
 *
 * @param what        Test variation name, used in log and failure output.
 * @param line        Caller's source line, used in log and failure output.
 * @param rk          Producer instance to purge.
 * @param purge_flags Flags passed on to rd_kafka_purge().
 * @param waitmsgs    Delivery report bookkeeping for the current run.
 * @param exp_remain  Expected value of waitmsgs->cnt after the purge.
 * @param reason      Human readable explanation of the expectation,
 *                    appended to the failure message.
 */
static void purge_and_expect(const char *what,
                             int line,
                             rd_kafka_t *rk,
                             int purge_flags,
                             struct waitmsgs *waitmsgs,
                             int exp_remain,
                             const char *reason) {
        test_timing_t t_purge;
        rd_kafka_resp_err_t err;

        TEST_SAY("%s:%d: purge(0x%x): "
                 "expecting %d messages to remain when done\n",
                 what, line, purge_flags, exp_remain);
        TIMING_START(&t_purge, "%s:%d: purge(0x%x)", what, line, purge_flags);
        err = rd_kafka_purge(rk, purge_flags);
        TIMING_STOP(&t_purge);

        TEST_ASSERT(!err, "purge(0x%x) at %d failed: %s",
                    purge_flags, line, rd_kafka_err2str(err));

        /* Non-blocking poll: serves the delivery reports that are ready,
         * each of which decrements waitmsgs->cnt in dr_msg_cb(). */
        rd_kafka_poll(rk, 0);
        TEST_ASSERT(waitmsgs->cnt == exp_remain,
                    "%s:%d: expected %d messages remaining, not %d: %s",
                    what, line, exp_remain, waitmsgs->cnt, reason);
}
173
174
175 /**
176 * @brief Don't treat ERR__GAPLESS_GUARANTEE as a fatal error
177 */
gapless_is_not_fatal_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)178 static int gapless_is_not_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
179 const char *reason) {
180 return err != RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE;
181 }
182
do_test_purge(const char * what,int remote,int idempotence,int gapless)183 static void do_test_purge (const char *what, int remote,
184 int idempotence, int gapless) {
185 const char *topic = test_mk_topic_name("0086_purge", 0);
186 rd_kafka_conf_t *conf;
187 rd_kafka_t *rk;
188 int i;
189 rd_kafka_resp_err_t err;
190 struct waitmsgs waitmsgs = RD_ZERO_INIT;
191
192 #if !WITH_SOCKEM
193 if (remote) {
194 TEST_SKIP("No sockem support\n");
195 return;
196 }
197 #endif
198
199 TEST_SAY(_C_MAG "Test rd_kafka_purge(): %s\n" _C_CLR, what);
200
201 test_conf_init(&conf, NULL, 20);
202
203 test_conf_set(conf, "batch.num.messages", "10");
204 test_conf_set(conf, "max.in.flight", "1");
205 test_conf_set(conf, "linger.ms", "500");
206 test_conf_set(conf, "enable.idempotence", idempotence?"true":"false");
207 test_conf_set(conf, "enable.gapless.guarantee", gapless?"true":"false");
208 rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
209
210 if (remote) {
211 #if WITH_SOCKEM
212 test_socket_enable(conf);
213 test_curr->connect_cb = connect_cb;
214 rd_kafka_conf_interceptor_add_on_new(conf, "on_new_producer",
215 on_new_producer, NULL);
216 #endif
217
218 if (idempotence && !gapless)
219 test_curr->is_fatal_cb = gapless_is_not_fatal_cb;
220
221 mtx_init(&produce_req_lock, mtx_plain);
222 cnd_init(&produce_req_cnd);
223 } else {
224 test_conf_set(conf, "bootstrap.servers", NULL);
225 }
226
227 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
228
229 TEST_SAY("Producing %d messages to topic %s\n", msgcnt, topic);
230
231 for (i = 0 ; i < msgcnt ; i++) {
232 int32_t partition;
233
234 if (remote) {
235 /* We need all messages in the same partition
236 * so that remaining messages are queued
237 * up behind the first messageset */
238 partition = 0;
239 } else {
240 partition = (i < 10 ? i % 3 : RD_KAFKA_PARTITION_UA);
241 }
242
243 err = rd_kafka_producev(rk,
244 RD_KAFKA_V_TOPIC(topic),
245 RD_KAFKA_V_PARTITION(partition),
246 RD_KAFKA_V_VALUE((void *)&i, sizeof(i)),
247 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
248 RD_KAFKA_V_OPAQUE(&waitmsgs),
249 RD_KAFKA_V_END);
250 TEST_ASSERT(!err, "producev(#%d) failed: %s",
251 i, rd_kafka_err2str(err));
252
253 waitmsgs.exp_err[i] = (remote && i < 10 ?
254 RD_KAFKA_RESP_ERR__PURGE_INFLIGHT :
255 RD_KAFKA_RESP_ERR__PURGE_QUEUE);
256
257 waitmsgs.cnt++;
258 }
259
260
261 if (remote) {
262 /* Wait for ProduceRequest to be sent */
263 mtx_lock(&produce_req_lock);
264 cnd_timedwait_ms(&produce_req_cnd, &produce_req_lock, 15*1000);
265 TEST_ASSERT(produce_req_cnt > 0,
266 "First Produce request should've been sent by now");
267 mtx_unlock(&produce_req_lock);
268
269 purge_and_expect(what, __LINE__, rk, RD_KAFKA_PURGE_F_QUEUE,
270 &waitmsgs, 10,
271 "in-flight messages should not be purged");
272
273 purge_and_expect(what, __LINE__, rk,
274 RD_KAFKA_PURGE_F_INFLIGHT|
275 RD_KAFKA_PURGE_F_QUEUE,
276 &waitmsgs, 0,
277 "all messages should have been purged");
278 } else {
279 purge_and_expect(what, __LINE__, rk, RD_KAFKA_PURGE_F_INFLIGHT,
280 &waitmsgs, msgcnt,
281 "no messagess should have been purged");
282
283 purge_and_expect(what, __LINE__, rk, RD_KAFKA_PURGE_F_QUEUE,
284 &waitmsgs, 0,
285 "no messagess should have been purged");
286 }
287
288
289 rd_kafka_destroy(rk);
290
291 TEST_LATER_CHECK();
292 }
293
294
main_0086_purge_remote(int argc,char ** argv)295 int main_0086_purge_remote (int argc, char **argv) {
296 const rd_bool_t has_idempotence =
297 test_broker_version >= TEST_BRKVER(0,11,0,0);
298
299 do_test_purge("remote", 1/*remote*/, 0/*idempotence*/, 0/*!gapless*/);
300
301 if (has_idempotence) {
302 do_test_purge("remote,idempotence",
303 1/*remote*/, 1/*idempotence*/, 0/*!gapless*/);
304 do_test_purge("remote,idempotence,gapless",
305 1/*remote*/, 1/*idempotence*/, 1/*!gapless*/);
306 }
307 return 0;
308 }
309
310
/**
 * @brief Local purge test entry point: runs the single local variation
 *        with idempotence and gapless guarantee both disabled.
 *
 * @returns 0 always.
 */
int main_0086_purge_local(int argc, char **argv) {
        const int remote      = 0;
        const int idempotence = 0;
        const int gapless     = 0;

        do_test_purge("local", remote, idempotence, gapless);

        return 0;
}
315