1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30 #include "rdkafka.h"
31
32
33 /**
34 * Make sure library behaves when the partition count for a topic changes.
35 * This test requires to be run under trivup to be able to use kafka-topics.sh
36 */
37
38
39
40 /**
41 * - Create topic with 2 partitions
42 * - Start producing messages to UA partition
43 * - Change to 4 partitions
44 * - Produce more messages to UA partition
45 * - Wait for DRs
46 * - Close
47 */
48
test_producer_partition_cnt_change(void)49 static void test_producer_partition_cnt_change (void) {
50 rd_kafka_t *rk;
51 rd_kafka_conf_t *conf;
52 rd_kafka_topic_t *rkt;
53 const char *topic = test_mk_topic_name(__FUNCTION__, 1);
54 const int partition_cnt = 4;
55 int msgcnt = test_quick ? 500 : 100000;
56 test_timing_t t_destroy;
57 int produced = 0;
58
59 test_conf_init(&conf, NULL, 20);
60 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
61 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
62
63 test_create_topic(rk, topic, partition_cnt/2, 1);
64
65 rkt = test_create_topic_object(rk, __FUNCTION__,
66 "message.timeout.ms",
67 tsprintf("%d", tmout_multip(10000)),
68 NULL);
69
70 test_produce_msgs_nowait(rk, rkt, 0, RD_KAFKA_PARTITION_UA, 0, msgcnt/2,
71 NULL, 100, 0, &produced);
72
73 test_create_partitions(rk, topic, partition_cnt);
74
75 test_produce_msgs_nowait(rk, rkt, 0, RD_KAFKA_PARTITION_UA,
76 msgcnt/2, msgcnt/2,
77 NULL, 100, 0, &produced);
78
79 test_wait_delivery(rk, &produced);
80
81 rd_kafka_topic_destroy(rkt);
82
83 TIMING_START(&t_destroy, "rd_kafka_destroy()");
84 rd_kafka_destroy(rk);
85 TIMING_STOP(&t_destroy);
86 }
87
main_0044_partition_cnt(int argc,char ** argv)88 int main_0044_partition_cnt (int argc, char **argv) {
89 if (!test_can_create_topics(1))
90 return 0;
91
92 test_producer_partition_cnt_change();
93
94 return 0;
95 }
96