1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2016, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include <iostream>
30 #include "testcpp.h"
31
32 /**
33 * Verify prioritization of non-message ops.
34 * MO:
35 *
36 * - Seed topic with 1000 messages
37 * - Start consumer with auto offset commit disabled,
38 * but with commit and stats callbacks registered,
39 * - Consume one message
40 * - Commit that message manually
41 * - Consume one message per second
 * - The commit callback should be fired within reasonable time, long before
 *   all messages are consumed.
 * - The stats callback should behave the same.
45 */
46
47
48
49 class MyCbs : public RdKafka::OffsetCommitCb, public RdKafka::EventCb {
50 public:
51 int seen_commit;
52 int seen_stats;
53
offset_commit_cb(RdKafka::ErrorCode err,std::vector<RdKafka::TopicPartition * > & offsets)54 void offset_commit_cb (RdKafka::ErrorCode err,
55 std::vector<RdKafka::TopicPartition*>&offsets) {
56 if (err)
57 Test::Fail("Offset commit failed: " + RdKafka::err2str(err));
58
59 seen_commit++;
60 Test::Say("Got commit callback!\n");
61 }
62
event_cb(RdKafka::Event & event)63 void event_cb (RdKafka::Event &event) {
64 switch (event.type())
65 {
66 case RdKafka::Event::EVENT_STATS:
67 Test::Say("Got stats callback!\n");
68 seen_stats++;
69 break;
70 default:
71 break;
72 }
73 }
74 };
75
76
77
do_test_commit_cb(void)78 static void do_test_commit_cb (void) {
79 const int msgcnt = test_quick ? 100 : 1000;
80 std::string errstr;
81 RdKafka::ErrorCode err;
82 std::string topic = Test::mk_topic_name("0060-op_prio", 1);
83
84 test_produce_msgs_easy(topic.c_str(), 0, 0, msgcnt);
85
86 /*
87 * Create consumer
88 */
89
90 /* Create consumer */
91 RdKafka::Conf *conf;
92 Test::conf_init(&conf, NULL, 10);
93 Test::conf_set(conf, "group.id", topic);
94 Test::conf_set(conf, "socket.timeout.ms", "10000");
95 Test::conf_set(conf, "enable.auto.commit", "false");
96 Test::conf_set(conf, "enable.partition.eof", "false");
97 Test::conf_set(conf, "auto.offset.reset", "earliest");
98 Test::conf_set(conf, "statistics.interval.ms", "1000");
99
100 MyCbs cbs;
101 cbs.seen_commit = 0;
102 cbs.seen_stats = 0;
103 if (conf->set("offset_commit_cb", (RdKafka::OffsetCommitCb *)&cbs, errstr) !=
104 RdKafka::Conf::CONF_OK)
105 Test::Fail("Failed to set commit callback: " + errstr);
106 if (conf->set("event_cb", (RdKafka::EventCb *)&cbs, errstr) !=
107 RdKafka::Conf::CONF_OK)
108 Test::Fail("Failed to set event callback: " + errstr);
109
110 RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
111 if (!c)
112 Test::Fail("Failed to create KafkaConsumer: " + errstr);
113 delete conf;
114
115 /* Subscribe */
116 std::vector<std::string> topics;
117 topics.push_back(topic);
118 if ((err = c->subscribe(topics)))
119 Test::Fail("subscribe failed: " + RdKafka::err2str(err));
120
121 /* Wait for messages and commit callback. */
122 Test::Say("Consuming topic " + topic + "\n");
123 int cnt = 0;
124 while (!cbs.seen_commit || !cbs.seen_stats) {
125 RdKafka::Message *msg = c->consume(tmout_multip(1000));
126 if (!msg->err()) {
127 cnt++;
128 Test::Say(tostr() << "Received message #" << cnt << "\n");
129 if (cnt > 10)
130 Test::Fail(tostr() << "Should've seen the "
131 "offset commit (" << cbs.seen_commit << ") and "
132 "stats callbacks (" << cbs.seen_stats << ") by now");
133
134 /* Commit the first message to trigger the offset commit_cb */
135 if (cnt == 1) {
136 err = c->commitAsync(msg);
137 if (err)
138 Test::Fail("commitAsync() failed: " + RdKafka::err2str(err));
139 rd_sleep(1); /* Sleep to simulate slow processing, making sure
140 * that the offset commit callback op gets
141 * inserted on the consume queue in front of
142 * the messages. */
143 }
144
145 } else if (msg->err() == RdKafka::ERR__TIMED_OUT)
146 ; /* Stil rebalancing? */
147 else
148 Test::Fail("consume() failed: " + msg->errstr());
149 delete msg;
150 }
151
152 c->close();
153 delete c;
154 }
155
156 extern "C" {
main_0060_op_prio(int argc,char ** argv)157 int main_0060_op_prio (int argc, char **argv) {
158 do_test_commit_cb();
159 return 0;
160 }
161 }
162