1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 /**
32  * @name Test rd_kafka_purge()
33  *
34  * Local test:
35  *  - produce 20 messages (that will be held up in queues),
36  *    for specific partitions and UA.
37  *  - purge(INFLIGHT) => no change in len()
38  *  - purge(QUEUE) => len() should drop to 0, dr errs should be ERR__PURGE_QUEUE
39  *
40  * Remote test (WITH_SOCKEM):
 *  - Limit in-flight to a single request of at most 10 messages
42  *  - Produce 20 messages to the same partition, in batches of 10.
43  *  - Make sure only first batch is sent.
44  *  - purge(QUEUE) => len should drop to 10, dr err ERR__PURGE_QUEUE
45  *  - purge(INFLIGHT|QUEUE) => len should drop to 0, ERR__PURGE_INFLIGHT
46  */
47 
48 
49 static const int msgcnt = 20;
50 struct waitmsgs {
51         rd_kafka_resp_err_t exp_err[20];
52         int cnt;
53 };
54 
55 static mtx_t produce_req_lock;
56 static cnd_t produce_req_cnd;
57 static int produce_req_cnt = 0;
58 
59 
60 #if WITH_SOCKEM
61 /**
62  * @brief Sockem connect, called from **internal librdkafka thread** through
63  *        librdkafka's connect_cb
64  */
connect_cb(struct test * test,sockem_t * skm,const char * id)65 static int connect_cb (struct test *test, sockem_t *skm, const char *id) {
66         sockem_set(skm, "delay", 500, NULL);
67         return 0;
68 }
69 
on_request_sent(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,void * ic_opaque)70 static rd_kafka_resp_err_t on_request_sent (rd_kafka_t *rk,
71                                             int sockfd,
72                                             const char *brokername,
73                                             int32_t brokerid,
74                                             int16_t ApiKey,
75                                             int16_t ApiVersion,
76                                             int32_t CorrId,
77                                             size_t  size,
78                                             void *ic_opaque) {
79 
80         /* Ignore if not a ProduceRequest */
81         if (ApiKey != 0)
82                 return RD_KAFKA_RESP_ERR_NO_ERROR;
83 
84         TEST_SAY("ProduceRequest sent to %s (%"PRId32")\n",
85                  brokername, brokerid);
86 
87         mtx_lock(&produce_req_lock);
88         produce_req_cnt++;
89         cnd_broadcast(&produce_req_cnd);
90         mtx_unlock(&produce_req_lock);
91 
92         /* Stall the connection */
93         test_socket_sockem_set(sockfd, "delay", 5000);
94 
95         return RD_KAFKA_RESP_ERR_NO_ERROR;
96 }
97 
on_new_producer(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)98 static rd_kafka_resp_err_t on_new_producer (rd_kafka_t *rk,
99                                             const rd_kafka_conf_t *conf,
100                                             void *ic_opaque,
101                                             char *errstr, size_t errstr_size) {
102         return rd_kafka_interceptor_add_on_request_sent(
103                 rk, "catch_producer_req",
104                 on_request_sent, NULL);
105 }
106 #endif
107 
108 
109 
/**
 * @brief Delivery report callback: verifies that each message is reported
 *        exactly once and with the error recorded for it in the
 *        struct waitmsgs attached to the message.
 *
 * The message payload is the int message id. A mismatching error is recorded
 * with TEST_FAIL_LATER(); a malformed or duplicate report fails the
 * TEST_ASSERT()s.
 */
static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
                       void *opaque) {
        /* Value stored in exp_err[] once a message has been reported. */
        const rd_kafka_resp_err_t seen_marker = (rd_kafka_resp_err_t)12345;
        struct waitmsgs *waitmsgs = rkmessage->_private;
        rd_kafka_resp_err_t expected;
        int msgid;

        TEST_ASSERT(waitmsgs->cnt > 0, "wait_msg_cnt is zero on DR");
        waitmsgs->cnt -= 1;

        /* The payload must be exactly one int: the message id. */
        TEST_ASSERT(rkmessage->len == sizeof(msgid),
                    "invalid message size %"PRIusz", expected sizeof(int)",
                    rkmessage->len);
        memcpy(&msgid, rkmessage->payload, rkmessage->len);

        TEST_ASSERT(msgid >= 0 && msgid < msgcnt,
                    "msgid %d out of range 0..%d", msgid, msgcnt - 1);

        TEST_ASSERT((int)waitmsgs->exp_err[msgid] != 12345,
                    "msgid %d delivered twice", msgid);
        expected = waitmsgs->exp_err[msgid];

        TEST_SAY("DeliveryReport for msg #%d: %s\n",
                 msgid, rd_kafka_err2name(rkmessage->err));

        if (rkmessage->err != expected) {
                TEST_FAIL_LATER("Expected message #%d to fail with %s, not %s",
                                msgid,
                                rd_kafka_err2str(expected),
                                rd_kafka_err2str(rkmessage->err));
        }

        /* Remember that this message id has now been reported. */
        waitmsgs->exp_err[msgid] = seen_marker;
}
144 
145 
146 
147 
148 
149 
150 
/**
 * @brief Call rd_kafka_purge() with \p purge_flags, serve delivery reports
 *        once, and verify the number of messages still awaiting a report.
 *
 * @param what        Test variation name, used in log and failure output.
 * @param line        Caller's source line, used in log and failure output.
 * @param rk          Producer instance to purge.
 * @param purge_flags RD_KAFKA_PURGE_F_.. flags passed to rd_kafka_purge().
 * @param waitmsgs    Bookkeeping updated by the delivery report callback.
 * @param exp_remain  Expected value of waitmsgs->cnt after the purge.
 * @param reason      Human readable explanation of \p exp_remain, included
 *                    in the failure message (may be NULL).
 */
static void purge_and_expect (const char *what, int line,
                              rd_kafka_t *rk, int purge_flags,
                              struct waitmsgs *waitmsgs,
                              int exp_remain, const char *reason) {
        test_timing_t t_purge;
        rd_kafka_resp_err_t err;

        TEST_SAY("%s:%d: purge(0x%x): "
                 "expecting %d messages to remain when done\n",
                 what, line, purge_flags, exp_remain);
        TIMING_START(&t_purge, "%s:%d: purge(0x%x)", what, line, purge_flags);
        err = rd_kafka_purge(rk, purge_flags);
        TIMING_STOP(&t_purge);

        TEST_ASSERT(!err, "purge(0x%x) at %d failed: %s",
                    purge_flags, line, rd_kafka_err2str(err));

        /* Non-blocking poll to serve the delivery reports of whatever
         * the purge removed; dr_msg_cb decrements waitmsgs->cnt. */
        rd_kafka_poll(rk, 0);

        /* Previously `reason` was silently ignored: report it so that a
         * failure explains why exp_remain was expected. */
        TEST_ASSERT(waitmsgs->cnt == exp_remain,
                    "%s:%d: expected %d messages remaining, not %d: %s",
                    what, line, exp_remain, waitmsgs->cnt,
                    reason ? reason : "");
}
173 
174 
175 /**
176  * @brief Don't treat ERR__GAPLESS_GUARANTEE as a fatal error
177  */
gapless_is_not_fatal_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)178 static int gapless_is_not_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
179                                     const char *reason) {
180         return err != RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE;
181 }
182 
do_test_purge(const char * what,int remote,int idempotence,int gapless)183 static void do_test_purge (const char *what, int remote,
184                            int idempotence, int gapless) {
185         const char *topic = test_mk_topic_name("0086_purge", 0);
186         rd_kafka_conf_t *conf;
187         rd_kafka_t *rk;
188         int i;
189         rd_kafka_resp_err_t err;
190         struct waitmsgs waitmsgs = RD_ZERO_INIT;
191 
192 #if !WITH_SOCKEM
193         if (remote) {
194                 TEST_SKIP("No sockem support\n");
195                 return;
196         }
197 #endif
198 
199         TEST_SAY(_C_MAG "Test rd_kafka_purge(): %s\n" _C_CLR, what);
200 
201         test_conf_init(&conf, NULL, 20);
202 
203         test_conf_set(conf, "batch.num.messages", "10");
204         test_conf_set(conf, "max.in.flight", "1");
205         test_conf_set(conf, "linger.ms", "500");
206         test_conf_set(conf, "enable.idempotence", idempotence?"true":"false");
207         test_conf_set(conf, "enable.gapless.guarantee", gapless?"true":"false");
208         rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
209 
210         if (remote) {
211 #if WITH_SOCKEM
212                 test_socket_enable(conf);
213                 test_curr->connect_cb = connect_cb;
214                 rd_kafka_conf_interceptor_add_on_new(conf, "on_new_producer",
215                                                      on_new_producer, NULL);
216 #endif
217 
218                 if (idempotence && !gapless)
219                         test_curr->is_fatal_cb = gapless_is_not_fatal_cb;
220 
221                 mtx_init(&produce_req_lock, mtx_plain);
222                 cnd_init(&produce_req_cnd);
223         } else {
224                 test_conf_set(conf, "bootstrap.servers", NULL);
225         }
226 
227         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
228 
229         TEST_SAY("Producing %d messages to topic %s\n", msgcnt, topic);
230 
231         for (i = 0 ; i < msgcnt ; i++) {
232                 int32_t partition;
233 
234                 if (remote) {
235                         /* We need all messages in the same partition
236                          * so that remaining messages are queued
237                          * up behind the first messageset */
238                         partition = 0;
239                 } else {
240                         partition = (i < 10 ? i % 3 : RD_KAFKA_PARTITION_UA);
241                 }
242 
243                 err = rd_kafka_producev(rk,
244                                         RD_KAFKA_V_TOPIC(topic),
245                                         RD_KAFKA_V_PARTITION(partition),
246                                         RD_KAFKA_V_VALUE((void *)&i, sizeof(i)),
247                                         RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
248                                         RD_KAFKA_V_OPAQUE(&waitmsgs),
249                                         RD_KAFKA_V_END);
250                 TEST_ASSERT(!err, "producev(#%d) failed: %s",
251                             i, rd_kafka_err2str(err));
252 
253                 waitmsgs.exp_err[i] = (remote && i < 10 ?
254                                        RD_KAFKA_RESP_ERR__PURGE_INFLIGHT :
255                                        RD_KAFKA_RESP_ERR__PURGE_QUEUE);
256 
257                 waitmsgs.cnt++;
258         }
259 
260 
261         if (remote) {
262                 /* Wait for ProduceRequest to be sent */
263                 mtx_lock(&produce_req_lock);
264                 cnd_timedwait_ms(&produce_req_cnd, &produce_req_lock, 15*1000);
265                 TEST_ASSERT(produce_req_cnt > 0,
266                             "First Produce request should've been sent by now");
267                 mtx_unlock(&produce_req_lock);
268 
269                 purge_and_expect(what, __LINE__, rk, RD_KAFKA_PURGE_F_QUEUE,
270                                  &waitmsgs, 10,
271                                  "in-flight messages should not be purged");
272 
273                 purge_and_expect(what, __LINE__, rk,
274                                  RD_KAFKA_PURGE_F_INFLIGHT|
275                                  RD_KAFKA_PURGE_F_QUEUE,
276                                  &waitmsgs, 0,
277                                  "all messages should have been purged");
278         } else {
279                 purge_and_expect(what, __LINE__, rk, RD_KAFKA_PURGE_F_INFLIGHT,
280                                  &waitmsgs, msgcnt,
281                                  "no messagess should have been purged");
282 
283                 purge_and_expect(what, __LINE__, rk, RD_KAFKA_PURGE_F_QUEUE,
284                                  &waitmsgs, 0,
285                                  "no messagess should have been purged");
286         }
287 
288 
289         rd_kafka_destroy(rk);
290 
291         TEST_LATER_CHECK();
292 }
293 
294 
main_0086_purge_remote(int argc,char ** argv)295 int main_0086_purge_remote (int argc, char **argv) {
296         const rd_bool_t has_idempotence =
297                 test_broker_version >= TEST_BRKVER(0,11,0,0);
298 
299         do_test_purge("remote", 1/*remote*/, 0/*idempotence*/, 0/*!gapless*/);
300 
301         if (has_idempotence) {
302                 do_test_purge("remote,idempotence",
303                               1/*remote*/, 1/*idempotence*/, 0/*!gapless*/);
304                 do_test_purge("remote,idempotence,gapless",
305                               1/*remote*/, 1/*idempotence*/, 1/*!gapless*/);
306         }
307         return 0;
308 }
309 
310 
/**
 * @brief Local purge test entry point: runs the single local variation,
 *        with idempotence and gapless guarantee disabled.
 *
 * @returns 0 always.
 */
int main_0086_purge_local (int argc, char **argv) {
        const int remote = 0;
        const int idempotence = 0;
        const int gapless = 0;

        do_test_purge("local", remote, idempotence, gapless);

        return 0;
}
315