1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2016, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include <iostream>
30 #include "testcpp.h"
31 
32 /**
33  * Verify prioritization of non-message ops.
34  * MO:
35  *
36  *  - Seed topic with 1000 messages
37  *  - Start consumer with auto offset commit disabled,
38  *    but with commit and stats callbacks registered,
39  *  - Consume one message
40  *  - Commit that message manually
41  *  - Consume one message per second
 *  - The commit callback should be fired within reasonable time, long before
 *    all messages are consumed.
 *  - The stats callback should behave the same.
45  */
46 
47 
48 
49 class MyCbs : public RdKafka::OffsetCommitCb, public RdKafka::EventCb {
50  public:
51   int seen_commit;
52   int seen_stats;
53 
offset_commit_cb(RdKafka::ErrorCode err,std::vector<RdKafka::TopicPartition * > & offsets)54   void offset_commit_cb (RdKafka::ErrorCode err,
55                          std::vector<RdKafka::TopicPartition*>&offsets) {
56     if (err)
57       Test::Fail("Offset commit failed: " + RdKafka::err2str(err));
58 
59     seen_commit++;
60     Test::Say("Got commit callback!\n");
61   }
62 
event_cb(RdKafka::Event & event)63   void event_cb (RdKafka::Event &event) {
64     switch (event.type())
65       {
66       case RdKafka::Event::EVENT_STATS:
67         Test::Say("Got stats callback!\n");
68         seen_stats++;
69         break;
70       default:
71         break;
72     }
73   }
74 };
75 
76 
77 
do_test_commit_cb(void)78 static void do_test_commit_cb (void) {
79   const int msgcnt = test_quick ? 100 : 1000;
80   std::string errstr;
81   RdKafka::ErrorCode err;
82   std::string topic = Test::mk_topic_name("0060-op_prio", 1);
83 
84   test_produce_msgs_easy(topic.c_str(), 0, 0, msgcnt);
85 
86   /*
87    * Create consumer
88    */
89 
90   /* Create consumer */
91   RdKafka::Conf *conf;
92   Test::conf_init(&conf, NULL, 10);
93   Test::conf_set(conf, "group.id", topic);
94   Test::conf_set(conf, "socket.timeout.ms", "10000");
95   Test::conf_set(conf, "enable.auto.commit", "false");
96   Test::conf_set(conf, "enable.partition.eof", "false");
97   Test::conf_set(conf, "auto.offset.reset", "earliest");
98   Test::conf_set(conf, "statistics.interval.ms", "1000");
99 
100   MyCbs cbs;
101   cbs.seen_commit = 0;
102   cbs.seen_stats  = 0;
103   if (conf->set("offset_commit_cb", (RdKafka::OffsetCommitCb *)&cbs, errstr) !=
104       RdKafka::Conf::CONF_OK)
105     Test::Fail("Failed to set commit callback: " + errstr);
106   if (conf->set("event_cb", (RdKafka::EventCb *)&cbs, errstr) !=
107       RdKafka::Conf::CONF_OK)
108     Test::Fail("Failed to set event callback: " + errstr);
109 
110   RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
111   if (!c)
112     Test::Fail("Failed to create KafkaConsumer: " + errstr);
113   delete conf;
114 
115   /* Subscribe */
116   std::vector<std::string> topics;
117   topics.push_back(topic);
118   if ((err = c->subscribe(topics)))
119     Test::Fail("subscribe failed: " + RdKafka::err2str(err));
120 
121   /* Wait for messages and commit callback. */
122   Test::Say("Consuming topic " + topic + "\n");
123   int cnt = 0;
124   while (!cbs.seen_commit || !cbs.seen_stats) {
125     RdKafka::Message *msg = c->consume(tmout_multip(1000));
126     if (!msg->err()) {
127       cnt++;
128       Test::Say(tostr() << "Received message #" << cnt << "\n");
129       if (cnt > 10)
130         Test::Fail(tostr() << "Should've seen the "
131                    "offset commit (" << cbs.seen_commit << ") and "
132                    "stats callbacks (" << cbs.seen_stats << ") by now");
133 
134       /* Commit the first message to trigger the offset commit_cb */
135       if (cnt == 1) {
136         err = c->commitAsync(msg);
137         if (err)
138           Test::Fail("commitAsync() failed: " + RdKafka::err2str(err));
139         rd_sleep(1); /* Sleep to simulate slow processing, making sure
140                       * that the offset commit callback op gets
141                       * inserted on the consume queue in front of
142                       * the messages. */
143       }
144 
145     } else if (msg->err() == RdKafka::ERR__TIMED_OUT)
146       ; /* Stil rebalancing? */
147     else
148       Test::Fail("consume() failed: " + msg->errstr());
149     delete msg;
150   }
151 
152   c->close();
153   delete c;
154 }
155 
156 extern "C" {
main_0060_op_prio(int argc,char ** argv)157   int main_0060_op_prio (int argc, char **argv) {
158     do_test_commit_cb();
159     return 0;
160   }
161 }
162