1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2019, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include <iostream>
30 #include "testcpp.h"
31 
32 extern "C" {
33 #include "rdkafka.h"            /* For interceptor interface */
34 #include "../src/tinycthread.h" /* For mutexes */
35 }
36 
37 class myThreadCb {
38  public:
myThreadCb()39   myThreadCb(): startCnt_(0), exitCnt_(0) {
40     mtx_init(&lock_, mtx_plain);
41   }
~myThreadCb()42   ~myThreadCb() {
43     mtx_destroy(&lock_);
44   }
startCount()45   int startCount () {
46     int cnt;
47     mtx_lock(&lock_);
48     cnt = startCnt_;
49     mtx_unlock(&lock_);
50     return cnt;
51   }
exitCount()52   int exitCount () {
53     int cnt;
54     mtx_lock(&lock_);
55     cnt = exitCnt_;
56     mtx_unlock(&lock_);
57     return cnt;
58   }
thread_start_cb(const char * threadname)59   virtual void thread_start_cb (const char *threadname) {
60     Test::Say(tostr() << "Started thread: " << threadname << "\n");
61     mtx_lock(&lock_);
62     startCnt_++;
63     mtx_unlock(&lock_);
64   }
thread_exit_cb(const char * threadname)65   virtual void thread_exit_cb (const char *threadname) {
66     Test::Say(tostr() << "Exiting from thread: " << threadname << "\n");
67     mtx_lock(&lock_);
68     exitCnt_++;
69     mtx_unlock(&lock_);
70   }
71 
72  private:
73   int startCnt_;
74   int exitCnt_;
75   mtx_t lock_;
76 };
77 
78 
79 /**
80  * @brief C to C++ callback trampoline.
81  */
82 static rd_kafka_resp_err_t
on_thread_start_trampoline(rd_kafka_t * rk,rd_kafka_thread_type_t thread_type,const char * threadname,void * ic_opaque)83 on_thread_start_trampoline (rd_kafka_t *rk,
84                             rd_kafka_thread_type_t thread_type,
85                             const char *threadname,
86                             void *ic_opaque) {
87   myThreadCb *threadcb = (myThreadCb *)ic_opaque;
88 
89   Test::Say(tostr() << "on_thread_start(" << thread_type << ", " <<
90             threadname << ") called\n");
91 
92   threadcb->thread_start_cb(threadname);
93 
94   return RD_KAFKA_RESP_ERR_NO_ERROR;
95 }
96 
97 /**
98  * @brief C to C++ callback trampoline.
99  */
100 static rd_kafka_resp_err_t
on_thread_exit_trampoline(rd_kafka_t * rk,rd_kafka_thread_type_t thread_type,const char * threadname,void * ic_opaque)101 on_thread_exit_trampoline (rd_kafka_t *rk,
102                            rd_kafka_thread_type_t thread_type,
103                            const char *threadname,
104                            void *ic_opaque) {
105   myThreadCb *threadcb = (myThreadCb *)ic_opaque;
106 
107   Test::Say(tostr() << "on_thread_exit(" << thread_type << ", " <<
108             threadname << ") called\n");
109 
110   threadcb->thread_exit_cb(threadname);
111 
112   return RD_KAFKA_RESP_ERR_NO_ERROR;
113 }
114 
115 /**
116  * @brief This interceptor is called when a new client instance is created
117  *        prior to any threads being created.
118  *        We use it to set up the instance's thread interceptors.
119  */
on_new(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)120 static rd_kafka_resp_err_t on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf,
121                                    void *ic_opaque,
122                                    char *errstr, size_t errstr_size) {
123   Test::Say("on_new() interceptor called\n");
124   rd_kafka_interceptor_add_on_thread_start(rk, "test:0100",
125                                            on_thread_start_trampoline,
126                                            ic_opaque);
127   rd_kafka_interceptor_add_on_thread_exit(rk, "test:0100",
128                                            on_thread_exit_trampoline,
129                                            ic_opaque);
130   return RD_KAFKA_RESP_ERR_NO_ERROR;
131 }
132 
133 /**
134  * @brief The on_conf_dup() interceptor let's use add the on_new interceptor
135  *        in case the config object is copied, since interceptors are not
136  *        automatically copied.
137  */
on_conf_dup(rd_kafka_conf_t * new_conf,const rd_kafka_conf_t * old_conf,size_t filter_cnt,const char ** filter,void * ic_opaque)138 static rd_kafka_resp_err_t on_conf_dup (rd_kafka_conf_t *new_conf,
139                                         const rd_kafka_conf_t *old_conf,
140                                         size_t filter_cnt,
141                                         const char **filter,
142                                         void *ic_opaque) {
143   Test::Say("on_conf_dup() interceptor called\n");
144   return rd_kafka_conf_interceptor_add_on_new(new_conf, "test:0100",
145                                               on_new, ic_opaque);
146 }
147 
148 
149 
test_thread_cbs()150 static void test_thread_cbs () {
151   RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
152   std::string errstr;
153   rd_kafka_conf_t *c_conf;
154   myThreadCb my_threads;
155 
156   Test::conf_set(conf, "bootstrap.servers", "127.0.0.1:1");
157 
158   /* Interceptors are not supported in the C++ API, instead use the C API:
159    *  1. Extract the C conf_t object
160    *  2. Set up an on_new() interceptor
161    *  3. Set up an on_conf_dup() interceptor to add interceptors in the
162    *     case the config object is copied (which the C++ Conf always does).
163    *  4. In the on_new() interceptor, add the thread interceptors. */
164   c_conf = conf->c_ptr_global();
165   rd_kafka_conf_interceptor_add_on_new(c_conf, "test:0100", on_new,
166                                              &my_threads);
167   rd_kafka_conf_interceptor_add_on_conf_dup(c_conf, "test:0100", on_conf_dup,
168                                             &my_threads);
169 
170   RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
171   if (!p)
172     Test::Fail("Failed to create Producer: " + errstr);
173   p->poll(500);
174   delete conf;
175   delete p;
176 
177   Test::Say(tostr() << my_threads.startCount() << " thread start calls, " <<
178             my_threads.exitCount() << " thread exit calls seen\n");
179 
180   /* 3 = rdkafka main thread + internal broker + bootstrap broker */
181   if (my_threads.startCount() < 3)
182     Test::Fail("Did not catch enough thread start callback calls");
183   if (my_threads.exitCount() < 3)
184     Test::Fail("Did not catch enough thread exit callback calls");
185   if (my_threads.startCount() != my_threads.exitCount())
186     Test::Fail("Did not catch same number of start and exit callback calls");
187 }
188 
189 
190 extern "C" {
main_0100_thread_interceptors(int argc,char ** argv)191   int main_0100_thread_interceptors (int argc, char **argv) {
192     test_thread_cbs();
193     return 0;
194   }
195 }
196