1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2018, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 /**
30  * Tests the queue callback IO event signalling.
31  */
32 
33 
34 #include "test.h"
35 
36 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
37  * is built from within the librdkafka source tree and thus differs. */
38 #include "rdkafka.h"  /* for Kafka driver */
39 
40 
/**
 * @brief Shared tally of event callback invocations.
 *
 * \c count is bumped by event_cb() and read-and-reset by wait_event_cb();
 * both do so while holding \c lock.
 */
static struct {
        int count;  /**< Callbacks seen since the last reset. */
        mtx_t lock; /**< Guards \c count. */
} event_receiver;
47 
48 /**
49  * @brief Event callback function. Check the opaque pointer and
50  *        increase the count of received event. */
event_cb(rd_kafka_t * rk_p,void * opaque)51 static void event_cb(rd_kafka_t *rk_p, void *opaque) {
52         TEST_ASSERT(opaque == (void*)0x1234,
53                     "Opaque pointer is not as expected (got: %p)", opaque);
54         mtx_lock(&event_receiver.lock);
55         event_receiver.count += 1;
56         mtx_unlock(&event_receiver.lock);
57 }
58 
59 /**
60  * @brief Wait for one or more events to be received.
61  *        Return 0 if no event was received within the timeout. */
wait_event_cb(int timeout_secs)62 static int wait_event_cb(int timeout_secs) {
63         int event_count = 0;
64         for (; timeout_secs >= 0; timeout_secs--) {
65                 mtx_lock(&event_receiver.lock);
66                 event_count = event_receiver.count;
67                 event_receiver.count = 0;
68                 mtx_unlock(&event_receiver.lock);
69                 if (event_count > 0 || timeout_secs == 0)
70                         return event_count;
71                 rd_sleep(1);
72         }
73         return 0;
74 }
75 
76 
main_0083_cb_event(int argc,char ** argv)77 int main_0083_cb_event (int argc, char **argv) {
78         rd_kafka_conf_t *conf;
79         rd_kafka_topic_conf_t *tconf;
80         rd_kafka_t *rk_p, *rk_c;
81         const char *topic;
82         rd_kafka_topic_t *rkt_p;
83         rd_kafka_queue_t *queue;
84         uint64_t testid;
85         int msgcnt = 100;
86         int recvd = 0;
87         int wait_multiplier = 1;
88         rd_kafka_resp_err_t err;
89         enum {
90                 _NOPE,
91                 _YEP,
92                 _REBALANCE
93         } expecting_io = _REBALANCE;
94         int callback_event_count;
95         rd_kafka_event_t *rkev;
96         int eventcnt = 0;
97 
98         mtx_init(&event_receiver.lock, mtx_plain);
99 
100         testid = test_id_generate();
101         topic = test_mk_topic_name(__FUNCTION__, 1);
102 
103         rk_p = test_create_producer();
104         rkt_p = test_create_producer_topic(rk_p, topic, NULL);
105         err = test_auto_create_topic_rkt(rk_p, rkt_p, tmout_multip(5000));
106         TEST_ASSERT(!err, "Topic auto creation failed: %s",
107                     rd_kafka_err2str(err));
108 
109         test_conf_init(&conf, &tconf, 0);
110         rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE);
111         test_conf_set(conf, "session.timeout.ms", "6000");
112         test_conf_set(conf, "enable.partition.eof", "false");
113         /* Speed up propagation of new topics */
114         test_conf_set(conf, "metadata.max.age.ms", "5000");
115         test_topic_conf_set(tconf, "auto.offset.reset", "earliest");
116         rk_c = test_create_consumer(topic, NULL, conf, tconf);
117 
118         queue = rd_kafka_queue_get_consumer(rk_c);
119 
120         test_consumer_subscribe(rk_c, topic);
121 
122         rd_kafka_queue_cb_event_enable(queue, event_cb, (void *)0x1234);
123 
124         /**
125          * 1) Wait for rebalance event
126          * 2) Wait 1 interval (1s) expecting no IO (nothing produced).
127          * 3) Produce half the messages
128          * 4) Expect CB
129          * 5) Consume the available messages
130          * 6) Wait 1 interval expecting no CB.
131          * 7) Produce remaing half
132          * 8) Expect CB
133          * 9) Done.
134          */
135         while (recvd < msgcnt) {
136                 TEST_SAY("Waiting for event\n");
137                 callback_event_count = wait_event_cb(1 * wait_multiplier);
138                 TEST_ASSERT(callback_event_count <= 1, "Event cb called %d times", callback_event_count);
139 
140                 if (callback_event_count == 1) {
141                         TEST_SAY("Events received: %d\n", callback_event_count);
142 
143                         while ((rkev = rd_kafka_queue_poll(queue, 0))) {
144                                 eventcnt++;
145                                 switch (rd_kafka_event_type(rkev))
146                                 {
147                                 case RD_KAFKA_EVENT_REBALANCE:
148                                         TEST_SAY("Got %s: %s\n", rd_kafka_event_name(rkev),
149                                                  rd_kafka_err2str(rd_kafka_event_error(rkev)));
150                                         if (expecting_io != _REBALANCE)
151                                                 TEST_FAIL("Got Rebalance when expecting message\n");
152                                         if (rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
153                                                 rd_kafka_assign(rk_c, rd_kafka_event_topic_partition_list(rkev));
154                                                 expecting_io = _NOPE;
155                                         } else
156                                                 rd_kafka_assign(rk_c, NULL);
157                                         break;
158 
159                                 case RD_KAFKA_EVENT_FETCH:
160                                         if (expecting_io != _YEP)
161                                                 TEST_FAIL("Did not expect more messages at %d/%d\n",
162                                                           recvd, msgcnt);
163                                         recvd++;
164                                         if (recvd == (msgcnt / 2) || recvd == msgcnt)
165                                                 expecting_io = _NOPE;
166                                         break;
167 
168                                 case RD_KAFKA_EVENT_ERROR:
169                                         TEST_FAIL("Error: %s\n", rd_kafka_event_error_string(rkev));
170                                         break;
171 
172                                 default:
173                                         TEST_SAY("Ignoring event %s\n", rd_kafka_event_name(rkev));
174                                 }
175 
176                                 rd_kafka_event_destroy(rkev);
177                         }
178                         TEST_SAY("%d events, Consumed %d/%d messages\n", eventcnt, recvd, msgcnt);
179 
180                         wait_multiplier = 1;
181 
182                 } else {
183                         if (expecting_io == _REBALANCE) {
184                                 continue;
185                         } else if (expecting_io == _YEP) {
186                                 TEST_FAIL("Did not see expected IO after %d/%d msgs\n",
187                                           recvd, msgcnt);
188                         }
189 
190                         TEST_SAY("Event wait timeout (good)\n");
191                         TEST_SAY("Got idle period, producing\n");
192                         test_produce_msgs(rk_p, rkt_p, testid, 0, recvd, msgcnt/2,
193                                           NULL, 10);
194 
195                         expecting_io = _YEP;
196                         /* When running slowly (e.g., valgrind) it might take
197                          * some time before the first message is received
198                          * after producing. */
199                         wait_multiplier = 3;
200                 }
201         }
202         TEST_SAY("Done\n");
203 
204         rd_kafka_topic_destroy(rkt_p);
205         rd_kafka_destroy(rk_p);
206 
207         rd_kafka_queue_destroy(queue);
208         rd_kafka_consumer_close(rk_c);
209         rd_kafka_destroy(rk_c);
210 
211         mtx_destroy(&event_receiver.lock);
212 
213         return 0;
214 }
215