1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2012-2015, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
32 * is built from within the librdkafka source tree and thus differs. */
33 #include "rdkafka.h" /* for Kafka driver */
34
35
36 /**
37 * Consumer partition assignment test, without consumer group balancing.
38 */
39
40
main_0016_assign_partition(int argc,char ** argv)41 int main_0016_assign_partition (int argc, char **argv) {
42 const char *topic = test_mk_topic_name(__FUNCTION__, 1);
43 rd_kafka_t *rk_p, *rk_c;
44 rd_kafka_topic_t *rkt_p;
45 int msg_cnt = 1000;
46 int msg_base = 0;
47 int partition_cnt = 2;
48 int partition;
49 uint64_t testid;
50 rd_kafka_topic_conf_t *default_topic_conf;
51 rd_kafka_topic_partition_list_t *partitions;
52 char errstr[512];
53
54 testid = test_id_generate();
55
56 /* Produce messages */
57 rk_p = test_create_producer();
58 rkt_p = test_create_producer_topic(rk_p, topic, NULL);
59
60 for (partition = 0 ; partition < partition_cnt ; partition++) {
61 test_produce_msgs(rk_p, rkt_p, testid, partition,
62 msg_base+(partition*msg_cnt), msg_cnt,
63 NULL, 0);
64 }
65
66 rd_kafka_topic_destroy(rkt_p);
67 rd_kafka_destroy(rk_p);
68
69
70 test_conf_init(NULL, &default_topic_conf, 0);
71 if (rd_kafka_topic_conf_set(default_topic_conf, "auto.offset.reset",
72 "smallest", errstr, sizeof(errstr)) !=
73 RD_KAFKA_CONF_OK)
74 TEST_FAIL("%s\n", errstr);
75
76 rk_c = test_create_consumer(topic/*group_id*/, NULL,
77 default_topic_conf);
78
79 /* Fill in partition set */
80 partitions = rd_kafka_topic_partition_list_new(partition_cnt);
81
82 for (partition = 0 ; partition < partition_cnt ; partition++)
83 rd_kafka_topic_partition_list_add(partitions, topic, partition);
84
85 test_consumer_assign("assign.partition", rk_c, partitions);
86
87 /* Make sure all messages are available */
88 test_consumer_poll("verify.all", rk_c, testid, partition_cnt,
89 msg_base, partition_cnt * msg_cnt);
90
91 /* Stop assignments */
92 test_consumer_unassign("unassign.partitions", rk_c);
93
94 #if 0 // FIXME when get_offset() is functional
95 /* Acquire stored offsets */
96 for (partition = 0 ; partition < partition_cnt ; partition++) {
97 rd_kafka_resp_err_t err;
98 rd_kafka_topic_t *rkt_c = rd_kafka_topic_new(rk_c, topic, NULL);
99 int64_t offset;
100 test_timing_t t_offs;
101
102 TIMING_START(&t_offs, "GET.OFFSET");
103 err = rd_kafka_consumer_get_offset(rkt_c, partition,
104 &offset, 5000);
105 TIMING_STOP(&t_offs);
106 if (err)
107 TEST_FAIL("Failed to get offsets for %s [%"PRId32"]: "
108 "%s\n",
109 rd_kafka_topic_name(rkt_c), partition,
110 rd_kafka_err2str(err));
111 TEST_SAY("get_offset for %s [%"PRId32"] returned %"PRId64"\n",
112 rd_kafka_topic_name(rkt_c), partition, offset);
113
114 rd_kafka_topic_destroy(rkt_c);
115 }
116 #endif
117 test_consumer_close(rk_c);
118
119 rd_kafka_destroy(rk_c);
120
121 return 0;
122 }
123