1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2016, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
32 * is built from within the librdkafka source tree and thus differs. */
33 #include "rdkafka.h" /* for Kafka driver */
34
35
36 /**
37 * KafkaConsumer: regex topic subscriptions
38 */
39
40
41
/**
 * @brief One subscription sub-test: the subscription to make, the
 *        assignment expected in return, and the state tracked while the
 *        sub-test runs.
 *
 * \c sub and \c exp are NULL-terminated lists held in fixed-size arrays.
 */
struct expect {
        /* Sub-test name, used as a prefix in log output. */
        char *name;

        /* Topic names / patterns handed to subscribe(). */
        const char *sub[4];

        /* Topics expected to show up in the assignment. */
        const char *exp[4];

        /* Error code subscribe() is expected to return. */
        int exp_err;

        /* For each entry in exp[]: number of assigned partition list
         * elements that carried that topic. */
        int stat[4];

        /* Failures accumulated by this sub-test. */
        int fails;

        /* What the sub-test is currently waiting for, or what it last saw. */
        enum {
                /* Not expecting a rebalance. */
                _EXP_NONE,
                /* Last assignment did not match exp[]. */
                _EXP_FAIL,
                /* Last assignment matched exp[]. */
                _EXP_OK,
                /* Waiting for an assign rebalance. */
                _EXP_ASSIGN,
                /* Waiting for a revoke rebalance. */
                _EXP_REVOKE,
                /* An assign rebalance has been handled. */
                _EXP_ASSIGNED,
                /* A revoke rebalance has been handled. */
                _EXP_REVOKED,
        } result;
};
59
/* Sub-test currently running: set by test_subscribe() and read by
 * rebalance_cb(). */
static struct expect *exp_curr = NULL;

/* Test id generated in do_test() and passed along when producing the
 * seed messages. */
static uint64_t testid = 0;
63
expect_match(struct expect * exp,const rd_kafka_topic_partition_list_t * parts)64 static void expect_match (struct expect *exp,
65 const rd_kafka_topic_partition_list_t *parts) {
66 int i;
67 int e = 0;
68 int fails = 0;
69
70 memset(exp->stat, 0, sizeof(exp->stat));
71
72 for (i = 0 ; i < parts->cnt ; i++) {
73 int found = 0;
74 e = 0;
75 while (exp->exp[e]) {
76 if (!strcmp(parts->elems[i].topic, exp->exp[e])) {
77 exp->stat[e]++;
78 found++;
79 }
80 e++;
81 }
82
83 if (!found) {
84 TEST_WARN("%s: got unexpected topic match: %s\n",
85 exp->name, parts->elems[i].topic);
86 fails++;
87 }
88 }
89
90
91 e = 0;
92 while (exp->exp[e]) {
93 if (!exp->stat[e]) {
94 TEST_WARN("%s: expected topic not "
95 "found in assignment: %s\n",
96 exp->name, exp->exp[e]);
97 fails++;
98 } else {
99 TEST_SAY("%s: expected topic %s seen in assignment\n",
100 exp->name, exp->exp[e]);
101 }
102 e++;
103 }
104
105 exp->fails += fails;
106 if (fails) {
107 TEST_WARN("%s: see %d previous failures\n", exp->name, fails);
108 exp->result = _EXP_FAIL;
109 } else {
110 TEST_SAY(_C_MAG "[ %s: assignment matched ]\n", exp->name);
111 exp->result = _EXP_OK;
112 }
113
114 }
115
/**
 * @brief Consumer rebalance callback: validates and applies the rebalance
 *        on behalf of the sub-test currently pointed to by \c exp_curr.
 *
 * On assign the partition list is checked with expect_match() and then
 * assigned; on revoke the assignment is dropped. In both cases
 * \c exp->result is moved to the matching "done" state (_EXP_ASSIGNED /
 * _EXP_REVOKED) that test_subscribe() polls for. A rebalance arriving
 * while the sub-test is waiting for something else only produces a warning.
 *
 * @param rk     Consumer instance.
 * @param err    Rebalance type; anything other than assign/revoke fails
 *               the test.
 * @param parts  Partitions being assigned or revoked.
 * @param opaque Unused.
 */
static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *parts, void *opaque){
        struct expect *exp = exp_curr;

        TEST_ASSERT(exp_curr, "exp_curr not set");

        TEST_SAY("rebalance_cb: %s with %d partition(s)\n",
                 rd_kafka_err2str(err), parts->cnt);
        test_print_partition_list(parts);

        switch (err)
        {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                /* Check that provided partitions match our expectations */
                if (exp->result != _EXP_ASSIGN) {
                        TEST_WARN("%s: rebalance called while expecting %d: "
                                  "too many or undesired assignment(s)?\n",
                                  exp->name, (int)exp->result);
                }
                expect_match(exp, parts);
                test_consumer_assign("rebalance", rk, parts);
                /* expect_match() left _EXP_OK/_EXP_FAIL in result; its
                 * verdict is kept in exp->fails, so result can be
                 * overwritten to tell the waiting loop that the
                 * assignment has arrived. */
                exp->result = _EXP_ASSIGNED;
                break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                if (exp->result != _EXP_REVOKE) {
                        TEST_WARN("%s: rebalance called while expecting %d: "
                                  "too many or undesired revoke(s)?\n",
                                  exp->name, (int)exp->result);
                }

                test_consumer_unassign("rebalance", rk);
                exp->result = _EXP_REVOKED;
                break;

        default:
                TEST_FAIL("rebalance_cb: error: %s", rd_kafka_err2str(err));
                break;
        }
}
155
156
157 /**
158 * @brief Poll the consumer once.
159 */
consumer_poll_once(rd_kafka_t * rk)160 static void consumer_poll_once (rd_kafka_t *rk) {
161 rd_kafka_message_t *rkmessage;
162
163 rkmessage = rd_kafka_consumer_poll(rk, 1000);
164 if (!rkmessage)
165 return;
166
167 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
168 TEST_SAY("%s [%"PRId32"] reached EOF at "
169 "offset %"PRId64"\n",
170 rd_kafka_topic_name(rkmessage->rkt),
171 rkmessage->partition,
172 rkmessage->offset);
173
174 } else if (rkmessage->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) {
175 if (strstr(rd_kafka_topic_name(rkmessage->rkt), "NONEXIST"))
176 TEST_SAY("%s: %s: error is expected for this topic\n",
177 rd_kafka_topic_name(rkmessage->rkt),
178 rd_kafka_message_errstr(rkmessage));
179 else
180 TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64"): %s",
181 rkmessage->rkt ?
182 rd_kafka_topic_name(rkmessage->rkt) :
183 "(no-topic)",
184 rkmessage->partition,
185 rkmessage->offset,
186 rd_kafka_message_errstr(rkmessage));
187 }
188
189 rd_kafka_message_destroy(rkmessage);
190 }
191
192
193
/**
 * @brief Poll until a rebalance changes \c exp->result or the deadline
 *        passes, then assert that no rebalance was seen.
 */
static void expect_no_rebalance (rd_kafka_t *rk, struct expect *exp) {
        /* NOTE(review): the unit of test_clock() is not visible from this
         * file - confirm that +5000 gives the intended quiet period. */
        int64_t ts_end = test_clock() + 5000;

        exp->result = _EXP_NONE; /* Not expecting a rebalance */
        while (exp->result == _EXP_NONE && test_clock() < ts_end)
                consumer_poll_once(rk);
        TEST_ASSERT(exp->result == _EXP_NONE);
}


/**
 * @brief Run one subscription sub-test: subscribe, wait for the expected
 *        assignment (or for none), unsubscribe, wait for the expected
 *        revoke (or for none).
 *
 * Makes \p exp the current sub-test (\c exp_curr) so that rebalance_cb()
 * can report into it. The return value of subscribe() must equal
 * \c exp->exp_err. An assignment and a revoke are only waited for when
 * \c exp->exp lists at least one topic; otherwise any rebalance seen
 * during the quiet period fails the test.
 *
 * @param rk  Consumer to subscribe with.
 * @param exp Sub-test description and result holder.
 *
 * @returns the failure count accumulated in \c exp->fails.
 */
static int test_subscribe (rd_kafka_t *rk, struct expect *exp) {
        /* exp->sub[] only has a NULL terminator while it has a free slot:
         * bound the scan by the array size so a fully populated array is
         * never read past its end. */
        const int sub_max = (int)(sizeof(exp->sub) / sizeof(exp->sub[0]));
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *tlist;
        int i;
        test_timing_t t_sub, t_assign, t_revoke, t_unsub;

        exp_curr = exp;

        test_timeout_set((test_session_timeout_ms/1000) * 3);

        tlist = rd_kafka_topic_partition_list_new(sub_max);
        TEST_SAY(_C_MAG "[ %s: begin ]\n", exp->name);
        TEST_SAY("Topic subscription:\n");
        for (i = 0 ; i < sub_max && exp->sub[i] ; i++) {
                TEST_SAY("%s: %s\n", exp->name, exp->sub[i]);
                rd_kafka_topic_partition_list_add(tlist, exp->sub[i],
                                                  RD_KAFKA_PARTITION_UA);
        }

        /* Subscribe */
        TIMING_START(&t_sub, "subscribe");
        err = rd_kafka_subscribe(rk, tlist);
        TIMING_STOP(&t_sub);
        TEST_ASSERT(err == exp->exp_err,
                    "subscribe() failed: %s (expected %s)",
                    rd_kafka_err2str(err), rd_kafka_err2str(exp->exp_err));

        if (exp->exp[0]) {
                /* Wait for assignment, actual messages are ignored. */
                exp->result = _EXP_ASSIGN;
                TEST_SAY("%s: waiting for assignment\n", exp->name);
                TIMING_START(&t_assign, "assignment");
                while (exp->result == _EXP_ASSIGN)
                        consumer_poll_once(rk);
                TIMING_STOP(&t_assign);
                TEST_ASSERT(exp->result == _EXP_ASSIGNED,
                            "got %d instead of assignment", exp->result);

        } else {
                /* Not expecting any assignment */
                expect_no_rebalance(rk, exp);
        }

        /* Unsubscribe */
        TIMING_START(&t_unsub, "unsubscribe");
        err = rd_kafka_unsubscribe(rk);
        TIMING_STOP(&t_unsub);
        TEST_ASSERT(!err, "unsubscribe() failed: %s", rd_kafka_err2str(err));

        rd_kafka_topic_partition_list_destroy(tlist);

        if (exp->exp[0]) {
                /* Wait for revoke, actual messages are ignored.
                 * Unlike the assignment wait this loop keeps polling
                 * through any other state until the revoke is seen. */
                TEST_SAY("%s: waiting for revoke\n", exp->name);
                exp->result = _EXP_REVOKE;
                TIMING_START(&t_revoke, "revoke");
                while (exp->result != _EXP_REVOKED)
                        consumer_poll_once(rk);
                TIMING_STOP(&t_revoke);
                TEST_ASSERT(exp->result == _EXP_REVOKED,
                            "got %d instead of revoke", exp->result);
        } else {
                /* Not expecting any revoke */
                expect_no_rebalance(rk, exp);
        }

        TEST_SAY(_C_MAG "[ %s: done with %d failures ]\n",
                 exp->name, exp->fails);

        return exp->fails;
}
274
275
do_test(const char * assignor)276 static int do_test (const char *assignor) {
277 static char topics[3][128];
278 static char nonexist_topic[128];
279 const int topic_cnt = 3;
280 rd_kafka_t *rk;
281 const int msgcnt = 10;
282 int i;
283 char groupid[64];
284 int fails = 0;
285 rd_kafka_conf_t *conf;
286
287 if (!test_check_builtin("regex")) {
288 TEST_SKIP("regex support not built in\n");
289 return 0;
290 }
291
292 testid = test_id_generate();
293 test_str_id_generate(groupid, sizeof(groupid));
294
295 rd_snprintf(topics[0], sizeof(topics[0]),
296 "%s_%s",
297 test_mk_topic_name("regex_subscribe_TOPIC_0001_UNO", 0),
298 groupid);
299 rd_snprintf(topics[1], sizeof(topics[1]),
300 "%s_%s",
301 test_mk_topic_name("regex_subscribe_topic_0002_dup", 0),
302 groupid);
303 rd_snprintf(topics[2], sizeof(topics[2]),
304 "%s_%s",
305 test_mk_topic_name("regex_subscribe_TOOTHPIC_0003_3", 0),
306 groupid);
307
308 /* To avoid auto topic creation to kick in we use
309 * an invalid topic name. */
310 rd_snprintf(nonexist_topic, sizeof(nonexist_topic),
311 "%s_%s",
312 test_mk_topic_name("regex_subscribe_NONEXISTENT_0004_IV#!",
313 0),
314 groupid);
315
316 /* Produce messages to topics to ensure creation. */
317 for (i = 0 ; i < topic_cnt ; i++)
318 test_produce_msgs_easy(topics[i], testid,
319 RD_KAFKA_PARTITION_UA, msgcnt);
320
321 test_conf_init(&conf, NULL, 20);
322 test_conf_set(conf, "partition.assignment.strategy", assignor);
323 /* Speed up propagation of new topics */
324 test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");
325 test_conf_set(conf, "allow.auto.create.topics", "true");
326
327 /* Create a single consumer to handle all subscriptions.
328 * Has the nice side affect of testing multiple subscriptions. */
329 rk = test_create_consumer(groupid, rebalance_cb, conf, NULL);
330
331 /*
332 * Test cases
333 */
334 {
335 struct expect expect = {
336 .name = rd_strdup(tsprintf("%s: no regexps (0&1)",
337 assignor)),
338 .sub = { topics[0], topics[1], NULL },
339 .exp = { topics[0], topics[1], NULL }
340 };
341
342 fails += test_subscribe(rk, &expect);
343 rd_free(expect.name);
344 }
345
346 {
347 struct expect expect = {
348 .name = rd_strdup(tsprintf("%s: no regexps "
349 "(no matches)",
350 assignor)),
351 .sub = { nonexist_topic, NULL },
352 .exp = { NULL }
353 };
354
355 fails += test_subscribe(rk, &expect);
356 rd_free(expect.name);
357 }
358
359 {
360 struct expect expect = {
361 .name = rd_strdup(tsprintf("%s: regex all", assignor)),
362 .sub = { rd_strdup(tsprintf("^.*_%s", groupid)), NULL },
363 .exp = { topics[0], topics[1], topics[2], NULL }
364 };
365
366 fails += test_subscribe(rk, &expect);
367 rd_free(expect.name);
368 rd_free((void*)expect.sub[0]);
369 }
370
371 {
372 struct expect expect = {
373 .name = rd_strdup(tsprintf("%s: regex 0&1", assignor)),
374 .sub = { rd_strdup(tsprintf("^.*[tToOpPiIcC]_0+[12]_[^_]+_%s",
375 groupid)), NULL },
376 .exp = { topics[0], topics[1], NULL }
377 };
378
379 fails += test_subscribe(rk, &expect);
380 rd_free(expect.name);
381 rd_free((void*)expect.sub[0]);
382 }
383
384 {
385 struct expect expect = {
386 .name = rd_strdup(tsprintf("%s: regex 2", assignor)),
387 .sub = { rd_strdup(tsprintf("^.*TOOTHPIC_000._._%s",
388 groupid)), NULL },
389 .exp = { topics[2], NULL }
390 };
391
392 fails += test_subscribe(rk, &expect);
393 rd_free(expect.name);
394 rd_free((void *)expect.sub[0]);
395 }
396
397 {
398 struct expect expect = {
399 .name = rd_strdup(tsprintf("%s: regex 2 and "
400 "nonexistent(not seen)",
401 assignor)),
402 .sub = { rd_strdup(tsprintf("^.*_000[34]_..?_%s",
403 groupid)), NULL },
404 .exp = { topics[2], NULL }
405 };
406
407 fails += test_subscribe(rk, &expect);
408 rd_free(expect.name);
409 rd_free((void *)expect.sub[0]);
410 }
411
412 {
413 struct expect expect = {
414 .name = rd_strdup(tsprintf("%s: broken regex (no matches)",
415 assignor)),
416 .sub = { "^.*[0", NULL },
417 .exp = { NULL },
418 .exp_err = RD_KAFKA_RESP_ERR__INVALID_ARG
419 };
420
421 fails += test_subscribe(rk, &expect);
422 rd_free(expect.name);
423 }
424
425
426 test_consumer_close(rk);
427
428 rd_kafka_destroy(rk);
429
430 if (fails)
431 TEST_FAIL("See %d previous failures", fails);
432
433 return 0;
434 }
435
436
/**
 * @brief Test entry point: runs do_test() once per partition assignment
 *        strategy, "range" first and then "roundrobin".
 *
 * @returns 0 (the return value of do_test() is not used).
 */
int main_0033_regex_subscribe (int argc, char **argv) {
        static const char *const assignors[] = { "range", "roundrobin" };
        const int assignor_cnt =
                (int)(sizeof(assignors) / sizeof(assignors[0]));
        int i;

        for (i = 0 ; i < assignor_cnt ; i++)
                do_test(assignors[i]);

        return 0;
}
442
443
444 /**
445 * @brief Subscription API tests that dont require a broker
446 */
main_0033_regex_subscribe_local(int argc,char ** argv)447 int main_0033_regex_subscribe_local (int argc, char **argv) {
448 rd_kafka_topic_partition_list_t *valids, *invalids, *none,
449 *empty, *alot;
450 rd_kafka_t *rk;
451 rd_kafka_conf_t *conf;
452 rd_kafka_resp_err_t err;
453 char errstr[256];
454 int i;
455
456 valids = rd_kafka_topic_partition_list_new(0);
457 invalids = rd_kafka_topic_partition_list_new(100);
458 none = rd_kafka_topic_partition_list_new(1000);
459 empty = rd_kafka_topic_partition_list_new(5);
460 alot = rd_kafka_topic_partition_list_new(1);
461
462 rd_kafka_topic_partition_list_add(valids, "not_a_regex", 0);
463 rd_kafka_topic_partition_list_add(valids, "^My[vV]alid..regex+", 0);
464 rd_kafka_topic_partition_list_add(valids, "^another_one$", 55);
465
466 rd_kafka_topic_partition_list_add(invalids, "not_a_regex", 0);
467 rd_kafka_topic_partition_list_add(invalids, "^My[vV]alid..regex+", 0);
468 rd_kafka_topic_partition_list_add(invalids, "^a[b", 99);
469
470 rd_kafka_topic_partition_list_add(empty, "not_a_regex", 0);
471 rd_kafka_topic_partition_list_add(empty, "", 0);
472 rd_kafka_topic_partition_list_add(empty, "^ok", 0);
473
474 for (i = 0 ; i < 10000 ; i++) {
475 char topic[32];
476 rd_snprintf(topic, sizeof(topic), "^Va[lLid]_regex_%d$", i);
477 rd_kafka_topic_partition_list_add(alot, topic, i);
478 }
479
480 conf = rd_kafka_conf_new();
481 test_conf_set(conf, "group.id", "group");
482 test_conf_set(conf, "client.id", test_curr->name);
483
484 rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
485 if (!rk)
486 TEST_FAIL("Failed to create consumer: %s", errstr);
487
488 err = rd_kafka_subscribe(rk, valids);
489 TEST_ASSERT(!err, "valids failed: %s", rd_kafka_err2str(err));
490
491 err = rd_kafka_subscribe(rk, invalids);
492 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
493 "invalids failed with wrong return: %s",
494 rd_kafka_err2str(err));
495
496 err = rd_kafka_subscribe(rk, none);
497 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
498 "none failed with wrong return: %s", rd_kafka_err2str(err));
499
500 err = rd_kafka_subscribe(rk, empty);
501 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
502 "empty failed with wrong return: %s",
503 rd_kafka_err2str(err));
504
505 err = rd_kafka_subscribe(rk, alot);
506 TEST_ASSERT(!err, "alot failed: %s", rd_kafka_err2str(err));
507
508 rd_kafka_consumer_close(rk);
509 rd_kafka_destroy(rk);
510
511 rd_kafka_topic_partition_list_destroy(valids);
512 rd_kafka_topic_partition_list_destroy(invalids);
513 rd_kafka_topic_partition_list_destroy(none);
514 rd_kafka_topic_partition_list_destroy(empty);
515 rd_kafka_topic_partition_list_destroy(alot);
516
517 return 0;
518 }
519