1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
32  * is built from within the librdkafka source tree and thus differs. */
33 #include "rdkafka.h"  /* for Kafka driver */
34 
35 
36 /**
37  * Test long topic names (>=255 characters), issue #529.
38  * This broker-side issue only seems to occur when explicitly creating
39  * topics with kafka-topics.sh --create, not with auto-created topics.
40  */
41 
42 
main_0028_long_topicnames(int argc,char ** argv)43 int main_0028_long_topicnames (int argc, char **argv) {
44         const int msgcnt = 1000;
45         uint64_t testid;
46 	char topic[256];
47 	rd_kafka_t *rk_c;
48 
49 	if (!test_can_create_topics(1))
50 		return 0;
51 
52 	memset(topic, 'a', sizeof(topic)-1);
53 	topic[sizeof(topic)-1] = '\0';
54 
55 	strncpy(topic, test_mk_topic_name(topic, 1), sizeof(topic)-1);
56 
57 	TEST_SAY("Using topic name of %d bytes: %s\n",
58 		 (int)strlen(topic), topic);
59 
60 	/* First try a non-verifying consumer. The consumer has been known
61 	 * to crash when the broker bug kicks in. */
62 	rk_c = test_create_consumer(topic, NULL, NULL, NULL);
63 
64         /* Create topic */
65         test_create_topic(rk_c, topic, 1, 1);
66 
67 	test_consumer_subscribe(rk_c, topic);
68 	test_consumer_poll_no_msgs("consume.nomsgs", rk_c, 0, 5000);
69 	test_consumer_close(rk_c);
70 
71         /* Produce messages */
72         testid = test_produce_msgs_easy(topic, 0,
73                                         RD_KAFKA_PARTITION_UA, msgcnt);
74 
75 	/* Consume messages */
76 	test_consume_msgs_easy(NULL, topic, testid, -1, msgcnt, NULL);
77 
78         return 0;
79 }
80