1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2016, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
32  * is built from within the librdkafka source tree and thus differs. */
33 #include "rdkafka.h"  /* for Kafka driver */
34 
35 
36 /**
37  * KafkaConsumer: regex topic subscriptions
38  */
39 
40 
41 
42 struct expect {
43 	char *name;           /* sub-test name */
44 	const char *sub[4];  /* subscriptions */
45 	const char *exp[4];  /* expected topics */
46         int         exp_err; /* expected error from subscribe() */
47 	int         stat[4]; /* per exp status */
48 	int         fails;
49 	enum {
50 		_EXP_NONE,
51 		_EXP_FAIL,
52 		_EXP_OK,
53 		_EXP_ASSIGN,
54 		_EXP_REVOKE,
55 		_EXP_ASSIGNED,
56 		_EXP_REVOKED,
57 	} result;
58 };
59 
60 static struct expect *exp_curr;
61 
62 static uint64_t testid;
63 
expect_match(struct expect * exp,const rd_kafka_topic_partition_list_t * parts)64 static void expect_match (struct expect *exp,
65 			  const rd_kafka_topic_partition_list_t *parts) {
66 	int i;
67 	int e = 0;
68 	int fails = 0;
69 
70 	memset(exp->stat, 0, sizeof(exp->stat));
71 
72 	for (i = 0 ; i < parts->cnt ; i++) {
73 		int found = 0;
74 		e = 0;
75 		while (exp->exp[e]) {
76 			if (!strcmp(parts->elems[i].topic, exp->exp[e])) {
77 				exp->stat[e]++;
78 				found++;
79 			}
80 			e++;
81 		}
82 
83 		if (!found) {
84 			TEST_WARN("%s: got unexpected topic match: %s\n",
85 				  exp->name, parts->elems[i].topic);
86 			fails++;
87 		}
88 	}
89 
90 
91 	e = 0;
92 	while (exp->exp[e]) {
93 		if (!exp->stat[e]) {
94 			TEST_WARN("%s: expected topic not "
95 				  "found in assignment: %s\n",
96 				  exp->name, exp->exp[e]);
97 			fails++;
98 		} else {
99 			TEST_SAY("%s: expected topic %s seen in assignment\n",
100 				 exp->name, exp->exp[e]);
101 		}
102 		e++;
103 	}
104 
105 	exp->fails += fails;
106 	if (fails) {
107 		TEST_WARN("%s: see %d previous failures\n", exp->name, fails);
108 		exp->result = _EXP_FAIL;
109 	} else {
110 		TEST_SAY(_C_MAG "[ %s: assignment matched ]\n", exp->name);
111 		exp->result = _EXP_OK;
112 	}
113 
114 }
115 
rebalance_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * parts,void * opaque)116 static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
117 			  rd_kafka_topic_partition_list_t *parts, void *opaque){
118 	struct expect *exp = exp_curr;
119 
120 	TEST_ASSERT(exp_curr, "exp_curr not set");
121 
122 	TEST_SAY("rebalance_cb: %s with %d partition(s)\n",
123 		 rd_kafka_err2str(err), parts->cnt);
124 	test_print_partition_list(parts);
125 
126 	switch (err)
127 	{
128 	case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
129 		/* Check that provided partitions match our expectations */
130 		if (exp->result != _EXP_ASSIGN) {
131 			TEST_WARN("%s: rebalance called while expecting %d: "
132 				  "too many or undesired assignment(s?\n",
133 				  exp->name, exp->result);
134 		}
135 		expect_match(exp, parts);
136 		test_consumer_assign("rebalance", rk, parts);
137 		exp->result = _EXP_ASSIGNED;
138 		break;
139 
140 	case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
141 		if (exp->result != _EXP_REVOKE) {
142 			TEST_WARN("%s: rebalance called while expecting %d: "
143 				  "too many or undesired assignment(s?\n",
144 				  exp->name, exp->result);
145 		}
146 
147 		test_consumer_unassign("rebalance", rk);
148 		exp->result = _EXP_REVOKED;
149 		break;
150 
151 	default:
152 		TEST_FAIL("rebalance_cb: error: %s", rd_kafka_err2str(err));
153 	}
154 }
155 
156 
157 /**
158  * @brief Poll the consumer once.
159  */
consumer_poll_once(rd_kafka_t * rk)160 static void consumer_poll_once (rd_kafka_t *rk) {
161 	rd_kafka_message_t *rkmessage;
162 
163 	rkmessage = rd_kafka_consumer_poll(rk, 1000);
164 	if (!rkmessage)
165                 return;
166 
167 	if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
168 		TEST_SAY("%s [%"PRId32"] reached EOF at "
169 			 "offset %"PRId64"\n",
170 			 rd_kafka_topic_name(rkmessage->rkt),
171 			 rkmessage->partition,
172 			 rkmessage->offset);
173 
174         } else if (rkmessage->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) {
175                 if (strstr(rd_kafka_topic_name(rkmessage->rkt), "NONEXIST"))
176                         TEST_SAY("%s: %s: error is expected for this topic\n",
177                                  rd_kafka_topic_name(rkmessage->rkt),
178                                  rd_kafka_message_errstr(rkmessage));
179                 else
180                         TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64"): %s",
181                                   rkmessage->rkt ?
182                                   rd_kafka_topic_name(rkmessage->rkt) :
183                                   "(no-topic)",
184                                   rkmessage->partition,
185                                   rkmessage->offset,
186                                   rd_kafka_message_errstr(rkmessage));
187 	}
188 
189 	rd_kafka_message_destroy(rkmessage);
190 }
191 
192 
193 
test_subscribe(rd_kafka_t * rk,struct expect * exp)194 static int test_subscribe (rd_kafka_t *rk, struct expect *exp) {
195 	rd_kafka_resp_err_t err;
196 	rd_kafka_topic_partition_list_t *tlist;
197 	int i;
198 	test_timing_t t_sub, t_assign, t_unsub;
199 
200 	exp_curr = exp;
201 
202 	test_timeout_set((test_session_timeout_ms/1000) * 3);
203 
204 	tlist = rd_kafka_topic_partition_list_new(4);
205 	TEST_SAY(_C_MAG "[ %s: begin ]\n", exp->name);
206 	i = 0;
207 	TEST_SAY("Topic subscription:\n");
208 	while (exp->sub[i]) {
209 		TEST_SAY("%s:  %s\n", exp->name, exp->sub[i]);
210 		rd_kafka_topic_partition_list_add(tlist, exp->sub[i],
211 						  RD_KAFKA_PARTITION_UA);
212 		i++;
213 	}
214 
215 	/* Subscribe */
216 	TIMING_START(&t_sub, "subscribe");
217 	err = rd_kafka_subscribe(rk, tlist);
218 	TIMING_STOP(&t_sub);
219 	TEST_ASSERT(err == exp->exp_err,
220                     "subscribe() failed: %s (expected %s)",
221                     rd_kafka_err2str(err), rd_kafka_err2str(exp->exp_err));
222 
223 	if (exp->exp[0]) {
224 		/* Wait for assignment, actual messages are ignored. */
225 		exp->result = _EXP_ASSIGN;
226 		TEST_SAY("%s: waiting for assignment\n", exp->name);
227 		TIMING_START(&t_assign, "assignment");
228 		while (exp->result == _EXP_ASSIGN)
229 			consumer_poll_once(rk);
230 		TIMING_STOP(&t_assign);
231 		TEST_ASSERT(exp->result == _EXP_ASSIGNED,
232 			    "got %d instead of assignment", exp->result);
233 
234 	} else {
235 		/* Not expecting any assignment */
236 		int64_t ts_end = test_clock() + 5000;
237 		exp->result = _EXP_NONE; /* Not expecting a rebalance */
238 		while (exp->result == _EXP_NONE && test_clock() < ts_end)
239 			consumer_poll_once(rk);
240 		TEST_ASSERT(exp->result == _EXP_NONE);
241 	}
242 
243 	/* Unsubscribe */
244 	TIMING_START(&t_unsub, "unsubscribe");
245 	err = rd_kafka_unsubscribe(rk);
246 	TIMING_STOP(&t_unsub);
247 	TEST_ASSERT(!err, "unsubscribe() failed: %s", rd_kafka_err2str(err));
248 
249 	rd_kafka_topic_partition_list_destroy(tlist);
250 
251 	if (exp->exp[0]) {
252 		/* Wait for revoke, actual messages are ignored. */
253 		TEST_SAY("%s: waiting for revoke\n", exp->name);
254 		exp->result = _EXP_REVOKE;
255 		TIMING_START(&t_assign, "revoke");
256 		while (exp->result != _EXP_REVOKED)
257 			consumer_poll_once(rk);
258 		TIMING_STOP(&t_assign);
259 		TEST_ASSERT(exp->result == _EXP_REVOKED,
260 			    "got %d instead of revoke", exp->result);
261 	} else {
262 		/* Not expecting any revoke */
263 		int64_t ts_end = test_clock() + 5000;
264 		exp->result = _EXP_NONE; /* Not expecting a rebalance */
265 		while (exp->result == _EXP_NONE && test_clock() < ts_end)
266 			consumer_poll_once(rk);
267 		TEST_ASSERT(exp->result == _EXP_NONE);
268 	}
269 
270 	TEST_SAY(_C_MAG "[ %s: done with %d failures ]\n", exp->name, exp->fails);
271 
272 	return exp->fails;
273 }
274 
275 
do_test(const char * assignor)276 static int do_test (const char *assignor) {
277 	static char topics[3][128];
278 	static char nonexist_topic[128];
279 	const int topic_cnt = 3;
280 	rd_kafka_t *rk;
281 	const int msgcnt = 10;
282 	int i;
283 	char groupid[64];
284 	int fails = 0;
285 	rd_kafka_conf_t *conf;
286 
287 	if (!test_check_builtin("regex")) {
288 		TEST_SKIP("regex support not built in\n");
289 		return 0;
290 	}
291 
292 	testid = test_id_generate();
293 	test_str_id_generate(groupid, sizeof(groupid));
294 
295 	rd_snprintf(topics[0], sizeof(topics[0]),
296 		    "%s_%s",
297 		    test_mk_topic_name("regex_subscribe_TOPIC_0001_UNO", 0),
298 		    groupid);
299 	rd_snprintf(topics[1], sizeof(topics[1]),
300 		    "%s_%s",
301 		    test_mk_topic_name("regex_subscribe_topic_0002_dup", 0),
302 		    groupid);
303 	rd_snprintf(topics[2], sizeof(topics[2]),
304 		    "%s_%s",
305 		    test_mk_topic_name("regex_subscribe_TOOTHPIC_0003_3", 0),
306 		    groupid);
307 
308         /* To avoid auto topic creation to kick in we use
309          * an invalid topic name. */
310 	rd_snprintf(nonexist_topic, sizeof(nonexist_topic),
311 		    "%s_%s",
312 		    test_mk_topic_name("regex_subscribe_NONEXISTENT_0004_IV#!",
313                                        0),
314 		    groupid);
315 
316 	/* Produce messages to topics to ensure creation. */
317 	for (i = 0 ; i < topic_cnt ; i++)
318 		test_produce_msgs_easy(topics[i], testid,
319 				       RD_KAFKA_PARTITION_UA, msgcnt);
320 
321 	test_conf_init(&conf, NULL, 20);
322 	test_conf_set(conf, "partition.assignment.strategy", assignor);
323 	/* Speed up propagation of new topics */
324 	test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");
325         test_conf_set(conf, "allow.auto.create.topics", "true");
326 
327 	/* Create a single consumer to handle all subscriptions.
328 	 * Has the nice side affect of testing multiple subscriptions. */
329 	rk = test_create_consumer(groupid, rebalance_cb, conf, NULL);
330 
331 	/*
332 	 * Test cases
333 	 */
334 	{
335 		struct expect expect = {
336 			.name = rd_strdup(tsprintf("%s: no regexps (0&1)",
337 						   assignor)),
338 			.sub = { topics[0], topics[1], NULL },
339 			.exp = { topics[0], topics[1], NULL }
340 		};
341 
342 		fails += test_subscribe(rk, &expect);
343 		rd_free(expect.name);
344 	}
345 
346 	{
347 		struct expect expect = {
348 			.name = rd_strdup(tsprintf("%s: no regexps "
349 						   "(no matches)",
350 						   assignor)),
351 			.sub = { nonexist_topic, NULL },
352 			.exp = { NULL }
353 		};
354 
355 		fails += test_subscribe(rk, &expect);
356 		rd_free(expect.name);
357 	}
358 
359 	{
360 		struct expect expect = {
361 			.name = rd_strdup(tsprintf("%s: regex all", assignor)),
362 			.sub = { rd_strdup(tsprintf("^.*_%s", groupid)), NULL },
363 			.exp = { topics[0], topics[1], topics[2], NULL }
364 		};
365 
366 		fails += test_subscribe(rk, &expect);
367 		rd_free(expect.name);
368 		rd_free((void*)expect.sub[0]);
369 	}
370 
371 	{
372 		struct expect expect = {
373 			.name = rd_strdup(tsprintf("%s: regex 0&1", assignor)),
374 			.sub = { rd_strdup(tsprintf("^.*[tToOpPiIcC]_0+[12]_[^_]+_%s",
375 						    groupid)), NULL },
376 			.exp = { topics[0], topics[1], NULL }
377 		};
378 
379 		fails += test_subscribe(rk, &expect);
380 		rd_free(expect.name);
381 		rd_free((void*)expect.sub[0]);
382 	}
383 
384 	{
385 		struct expect expect = {
386 			.name = rd_strdup(tsprintf("%s: regex 2", assignor)),
387 			.sub = { rd_strdup(tsprintf("^.*TOOTHPIC_000._._%s",
388 						    groupid)), NULL },
389 			.exp = { topics[2], NULL }
390 		};
391 
392 		fails += test_subscribe(rk, &expect);
393 		rd_free(expect.name);
394 		rd_free((void *)expect.sub[0]);
395 	}
396 
397 	{
398 		struct expect expect = {
399 			.name = rd_strdup(tsprintf("%s: regex 2 and "
400 						   "nonexistent(not seen)",
401 						   assignor)),
402 			.sub = { rd_strdup(tsprintf("^.*_000[34]_..?_%s",
403 						    groupid)), NULL },
404 			.exp = { topics[2], NULL }
405 		};
406 
407 		fails += test_subscribe(rk, &expect);
408 		rd_free(expect.name);
409 		rd_free((void *)expect.sub[0]);
410 	}
411 
412 	{
413 		struct expect expect = {
414 			.name = rd_strdup(tsprintf("%s: broken regex (no matches)",
415 						   assignor)),
416 			.sub = { "^.*[0", NULL },
417 			.exp = { NULL },
418                         .exp_err = RD_KAFKA_RESP_ERR__INVALID_ARG
419 		};
420 
421 		fails += test_subscribe(rk, &expect);
422 		rd_free(expect.name);
423 	}
424 
425 
426 	test_consumer_close(rk);
427 
428 	rd_kafka_destroy(rk);
429 
430 	if (fails)
431 		TEST_FAIL("See %d previous failures", fails);
432 
433         return 0;
434 }
435 
436 
main_0033_regex_subscribe(int argc,char ** argv)437 int main_0033_regex_subscribe (int argc, char **argv) {
438 	do_test("range");
439 	do_test("roundrobin");
440 	return 0;
441 }
442 
443 
444 /**
445  * @brief Subscription API tests that dont require a broker
446  */
main_0033_regex_subscribe_local(int argc,char ** argv)447 int main_0033_regex_subscribe_local (int argc, char **argv) {
448         rd_kafka_topic_partition_list_t *valids, *invalids, *none,
449                 *empty, *alot;
450         rd_kafka_t *rk;
451         rd_kafka_conf_t *conf;
452         rd_kafka_resp_err_t err;
453         char errstr[256];
454         int i;
455 
456         valids = rd_kafka_topic_partition_list_new(0);
457         invalids = rd_kafka_topic_partition_list_new(100);
458         none = rd_kafka_topic_partition_list_new(1000);
459         empty = rd_kafka_topic_partition_list_new(5);
460         alot = rd_kafka_topic_partition_list_new(1);
461 
462         rd_kafka_topic_partition_list_add(valids, "not_a_regex", 0);
463         rd_kafka_topic_partition_list_add(valids, "^My[vV]alid..regex+", 0);
464         rd_kafka_topic_partition_list_add(valids, "^another_one$", 55);
465 
466         rd_kafka_topic_partition_list_add(invalids, "not_a_regex", 0);
467         rd_kafka_topic_partition_list_add(invalids, "^My[vV]alid..regex+", 0);
468         rd_kafka_topic_partition_list_add(invalids, "^a[b", 99);
469 
470         rd_kafka_topic_partition_list_add(empty, "not_a_regex", 0);
471         rd_kafka_topic_partition_list_add(empty, "", 0);
472         rd_kafka_topic_partition_list_add(empty, "^ok", 0);
473 
474         for (i = 0 ; i < 10000 ; i++) {
475                 char topic[32];
476                 rd_snprintf(topic, sizeof(topic), "^Va[lLid]_regex_%d$", i);
477                 rd_kafka_topic_partition_list_add(alot, topic, i);
478         }
479 
480         conf = rd_kafka_conf_new();
481         test_conf_set(conf, "group.id", "group");
482         test_conf_set(conf, "client.id", test_curr->name);
483 
484         rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
485         if (!rk)
486                 TEST_FAIL("Failed to create consumer: %s", errstr);
487 
488         err = rd_kafka_subscribe(rk, valids);
489         TEST_ASSERT(!err, "valids failed: %s", rd_kafka_err2str(err));
490 
491         err = rd_kafka_subscribe(rk, invalids);
492         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
493                     "invalids failed with wrong return: %s",
494                     rd_kafka_err2str(err));
495 
496         err = rd_kafka_subscribe(rk, none);
497         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
498                     "none failed with wrong return: %s", rd_kafka_err2str(err));
499 
500         err = rd_kafka_subscribe(rk, empty);
501         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
502                     "empty failed with wrong return: %s",
503                     rd_kafka_err2str(err));
504 
505         err = rd_kafka_subscribe(rk, alot);
506         TEST_ASSERT(!err, "alot failed: %s", rd_kafka_err2str(err));
507 
508         rd_kafka_consumer_close(rk);
509         rd_kafka_destroy(rk);
510 
511         rd_kafka_topic_partition_list_destroy(valids);
512         rd_kafka_topic_partition_list_destroy(invalids);
513         rd_kafka_topic_partition_list_destroy(none);
514         rd_kafka_topic_partition_list_destroy(empty);
515         rd_kafka_topic_partition_list_destroy(alot);
516 
517         return 0;
518 }
519