1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
32  * is built from within the librdkafka source tree and thus differs. */
33 #include "rdkafka.h" /* for Kafka driver */
34 
35 /**
36  * KafkaConsumer balanced group with multithreading tests
37  *
38  * Runs a consumer subscribing to a topic with multiple partitions and farms
39  * consuming of each partition to a separate thread.
40  */
41 
/* Capacity of the consumer thread handle table below. */
#define MAX_THRD_CNT 4

/* Guards consumers_running and consumed_msg_cnt, which are touched by both
 * the rebalance callback and the per-partition consumer threads. */
static mtx_t lock;

/* Non-zero between a partition assignment and the matching revocation. */
static int consumers_running = 0;

/* Number of messages consumed so far, summed over all consumer threads. */
static int consumed_msg_cnt = 0;

/* Number of messages the test expects to consume in total. */
static int exp_msg_cnt;

/* Incremented on each assignment, decremented on each revocation. */
static int assign_cnt = 0;

/* Handles of the spawned consumer threads; unused slots stay zero. */
static thrd_t tids[MAX_THRD_CNT];
51 
52 typedef struct part_consume_info_s {
53         rd_kafka_queue_t * rkqu;
54         int partition;
55 } part_consume_info_t;
56 
is_consuming()57 static int is_consuming () {
58         int result;
59         mtx_lock(&lock);
60         result = consumers_running;
61         mtx_unlock(&lock);
62         return result;
63 }
64 
partition_consume(void * args)65 static int partition_consume (void *args) {
66         part_consume_info_t *info = (part_consume_info_t *)args;
67         rd_kafka_queue_t *rkqu = info->rkqu;
68         int partition = info->partition;
69         int64_t ts_start = test_clock();
70         int max_time = (test_session_timeout_ms + 3000) * 1000;
71         int running = 1;
72 
73         free(args); /* Free the parameter struct dynamically allocated for us */
74 
75         while (ts_start + max_time > test_clock() && running &&
76                is_consuming()) {
77                 rd_kafka_message_t *rkmsg;
78 
79                 rkmsg = rd_kafka_consume_queue(rkqu, 500);
80 
81                 if (!rkmsg)
82                         continue;
83                 else if (rkmsg->err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
84                         running = 0;
85                 else if (rkmsg->err) {
86                         mtx_lock(&lock);
87                         TEST_FAIL("Message error "
88                                   "(at offset %" PRId64 " after "
89                                   "%d/%d messages and %dms): %s",
90                                   rkmsg->offset, consumed_msg_cnt, exp_msg_cnt,
91                                   (int)(test_clock() - ts_start) / 1000,
92                                   rd_kafka_message_errstr(rkmsg));
93                         mtx_unlock(&lock);
94                 } else {
95                         if (rkmsg->partition != partition) {
96                                 mtx_lock(&lock);
97                                 TEST_FAIL("Message consumed has partition %d "
98                                           "but we expected partition %d.",
99                                           rkmsg->partition, partition);
100                                 mtx_unlock(&lock);
101                         }
102                 }
103                 rd_kafka_message_destroy(rkmsg);
104 
105                 mtx_lock(&lock);
106                 if (running && ++consumed_msg_cnt >= exp_msg_cnt) {
107                         TEST_SAY("All messages consumed\n");
108                         running = 0;
109                 }
110                 mtx_unlock(&lock);
111         }
112 
113         rd_kafka_queue_destroy(rkqu);
114 
115         return thrd_success;
116 }
117 
/**
 * @brief Start a consumer thread running partition_consume().
 *
 * @param rkqu      Queue the new thread consumes from; the thread destroys
 *                  it when it finishes.
 * @param partition Partition number the thread expects its messages to
 *                  carry.
 *
 * @returns the handle of the created thread. Allocation or thread creation
 *          failure is reported through TEST_FAIL().
 */
static thrd_t spawn_thread (rd_kafka_queue_t *rkqu, int partition) {
        thrd_t thr;
        part_consume_info_t *info = malloc(sizeof(*info));

        if (!info)
                TEST_FAIL("Failed to allocate consumer thread arguments.");

        info->rkqu = rkqu;
        info->partition = partition;

        if (thrd_create(&thr, &partition_consume, info) != thrd_success) {
                /* The thread never started, so nobody else will free info. */
                free(info);
                TEST_FAIL("Failed to create consumer thread.");
        }
        return thr;
}
130 
131 static int rebalanced = 0;
132 
rebalance_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * partitions,void * opaque)133 static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
134                          rd_kafka_topic_partition_list_t *partitions,
135                          void *opaque) {
136         int i;
137         char *memberid = rd_kafka_memberid(rk);
138 
139         TEST_SAY("%s: MemberId \"%s\": Consumer group rebalanced: %s\n",
140                  rd_kafka_name(rk), memberid, rd_kafka_err2str(err));
141 
142         if (memberid)
143                 free(memberid);
144 
145         test_print_partition_list(partitions);
146 
147         switch (err) {
148         case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
149                 assign_cnt++;
150 
151                 rd_kafka_assign(rk, partitions);
152                 mtx_lock(&lock);
153                 consumers_running = 1;
154                 mtx_unlock(&lock);
155 
156                 for (i = 0; i < partitions->cnt && i < MAX_THRD_CNT; ++i) {
157                         rd_kafka_topic_partition_t part = partitions->elems[i];
158                         rd_kafka_queue_t *rkqu;
159                         /* This queue is loosed in partition-consume. */
160                         rkqu = rd_kafka_queue_get_partition(rk, part.topic,
161                                                             part.partition);
162 
163                         rd_kafka_queue_forward(rkqu, NULL);
164                         tids[part.partition] = spawn_thread(rkqu,
165                                                             part.partition);
166                 }
167 
168                 rebalanced = 1;
169 
170                 break;
171 
172         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
173                 if (assign_cnt == 0)
174                         TEST_FAIL("asymetric rebalance_cb");
175                 assign_cnt--;
176                 rd_kafka_assign(rk, NULL);
177                 mtx_lock(&lock);
178                 consumers_running = 0;
179                 mtx_unlock(&lock);
180 
181                 break;
182 
183         default:
184                 TEST_FAIL("rebalance failed: %s", rd_kafka_err2str(err));
185                 break;
186         }
187 }
188 
/**
 * @brief Poll the consumer until the `rebalanced` flag is set.
 *
 * The flag is set by rebalance_cb() on assignment. Any message returned by
 * the poll in the meantime is destroyed without being inspected.
 *
 * @param rk_c Consumer instance to poll.
 */
static void get_assignment (rd_kafka_t *rk_c) {
        while (rebalanced == 0) {
                rd_kafka_message_t *polled = rd_kafka_consumer_poll(rk_c, 500);

                if (polled != NULL)
                        rd_kafka_message_destroy(polled);
        }
}
197 
main_0056_balanced_group_mt(int argc,char ** argv)198 int main_0056_balanced_group_mt (int argc, char **argv) {
199         const char *topic = test_mk_topic_name(__FUNCTION__, 1);
200         rd_kafka_t *rk_p, *rk_c;
201         rd_kafka_topic_t *rkt_p;
202         int msg_cnt = test_quick ? 100 : 1000;
203         int msg_base = 0;
204         int partition_cnt = 2;
205         int partition;
206         uint64_t testid;
207         rd_kafka_conf_t *conf;
208         rd_kafka_topic_conf_t *default_topic_conf;
209         rd_kafka_topic_partition_list_t *sub, *topics;
210         rd_kafka_resp_err_t err;
211         test_timing_t t_assign, t_close, t_consume;
212         int i;
213 
214         exp_msg_cnt = msg_cnt * partition_cnt;
215 
216         testid = test_id_generate();
217 
218         /* Produce messages */
219         rk_p = test_create_producer();
220         rkt_p = test_create_producer_topic(rk_p, topic, NULL);
221 
222         for (partition = 0; partition < partition_cnt; partition++) {
223                 test_produce_msgs(rk_p, rkt_p, testid, partition,
224                                   msg_base + (partition * msg_cnt), msg_cnt,
225                                   NULL, 0);
226         }
227 
228         rd_kafka_topic_destroy(rkt_p);
229         rd_kafka_destroy(rk_p);
230 
231         if (mtx_init(&lock, mtx_plain) != thrd_success)
232                 TEST_FAIL("Cannot create mutex.");
233 
234         test_conf_init(&conf, &default_topic_conf,
235                        (test_session_timeout_ms * 3) / 1000);
236 
237         test_conf_set(conf, "enable.partition.eof", "true");
238 
239         test_topic_conf_set(default_topic_conf, "auto.offset.reset",
240                             "smallest");
241 
242         /* Fill in topic subscription set */
243         topics = rd_kafka_topic_partition_list_new(1);
244         rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);
245 
246         /* Create consumers and start subscription */
247         rk_c = test_create_consumer(
248                 topic /*group_id*/, rebalance_cb,
249                 conf, default_topic_conf);
250 
251         test_consumer_subscribe(rk_c, topic);
252 
253         rd_kafka_topic_partition_list_destroy(topics);
254 
255         /* Wait for both consumers to get an assignment */
256         TIMING_START(&t_assign, "WAIT.ASSIGN");
257         get_assignment(rk_c);
258         TIMING_STOP(&t_assign);
259 
260         TIMING_START(&t_consume, "CONSUME.WAIT");
261         for (i = 0; i < MAX_THRD_CNT; ++i) {
262                 int res;
263                 if (tids[i] != 0)
264                         thrd_join(tids[i], &res);
265         }
266         TIMING_STOP(&t_consume);
267 
268         TEST_SAY("Closing remaining consumers\n");
269         /* Query subscription */
270         err = rd_kafka_subscription(rk_c, &sub);
271         TEST_ASSERT(!err, "%s: subscription () failed: %s", rd_kafka_name(rk_c),
272                     rd_kafka_err2str(err));
273         TEST_SAY("%s: subscription (%d):\n", rd_kafka_name(rk_c), sub->cnt);
274         for (i = 0; i < sub->cnt; ++i)
275                 TEST_SAY(" %s\n", sub->elems[i].topic);
276         rd_kafka_topic_partition_list_destroy(sub);
277 
278         /* Run an explicit unsubscribe () (async) prior to close ()
279          * to trigger race condition issues on termination. */
280         TEST_SAY("Unsubscribing instance %s\n", rd_kafka_name(rk_c));
281         err = rd_kafka_unsubscribe(rk_c);
282         TEST_ASSERT(!err, "%s: unsubscribe failed: %s", rd_kafka_name(rk_c),
283                     rd_kafka_err2str(err));
284 
285         TEST_SAY("Closing %s\n", rd_kafka_name(rk_c));
286         TIMING_START(&t_close, "CONSUMER.CLOSE");
287         err = rd_kafka_consumer_close(rk_c);
288         TIMING_STOP(&t_close);
289         TEST_ASSERT(!err, "consumer_close failed: %s", rd_kafka_err2str(err));
290 
291         rd_kafka_destroy(rk_c);
292         rk_c = NULL;
293 
294         TEST_SAY("%d/%d messages consumed\n", consumed_msg_cnt, exp_msg_cnt);
295         TEST_ASSERT(consumed_msg_cnt >= exp_msg_cnt,
296                     "Only %d/%d messages were consumed", consumed_msg_cnt,
297                     exp_msg_cnt);
298 
299         if (consumed_msg_cnt > exp_msg_cnt)
300                 TEST_SAY("At least %d/%d messages were consumed "
301                          "multiple times\n",
302                          consumed_msg_cnt - exp_msg_cnt, exp_msg_cnt);
303 
304         mtx_destroy(&lock);
305 
306         return 0;
307 }
308