1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2019, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include <iostream>
30 #include "testcpp.h"
31
32 extern "C" {
33 #include "rdkafka.h" /* For interceptor interface */
34 #include "../src/tinycthread.h" /* For mutexes */
35 }
36
37 class myThreadCb {
38 public:
myThreadCb()39 myThreadCb(): startCnt_(0), exitCnt_(0) {
40 mtx_init(&lock_, mtx_plain);
41 }
~myThreadCb()42 ~myThreadCb() {
43 mtx_destroy(&lock_);
44 }
startCount()45 int startCount () {
46 int cnt;
47 mtx_lock(&lock_);
48 cnt = startCnt_;
49 mtx_unlock(&lock_);
50 return cnt;
51 }
exitCount()52 int exitCount () {
53 int cnt;
54 mtx_lock(&lock_);
55 cnt = exitCnt_;
56 mtx_unlock(&lock_);
57 return cnt;
58 }
thread_start_cb(const char * threadname)59 virtual void thread_start_cb (const char *threadname) {
60 Test::Say(tostr() << "Started thread: " << threadname << "\n");
61 mtx_lock(&lock_);
62 startCnt_++;
63 mtx_unlock(&lock_);
64 }
thread_exit_cb(const char * threadname)65 virtual void thread_exit_cb (const char *threadname) {
66 Test::Say(tostr() << "Exiting from thread: " << threadname << "\n");
67 mtx_lock(&lock_);
68 exitCnt_++;
69 mtx_unlock(&lock_);
70 }
71
72 private:
73 int startCnt_;
74 int exitCnt_;
75 mtx_t lock_;
76 };
77
78
79 /**
80 * @brief C to C++ callback trampoline.
81 */
82 static rd_kafka_resp_err_t
on_thread_start_trampoline(rd_kafka_t * rk,rd_kafka_thread_type_t thread_type,const char * threadname,void * ic_opaque)83 on_thread_start_trampoline (rd_kafka_t *rk,
84 rd_kafka_thread_type_t thread_type,
85 const char *threadname,
86 void *ic_opaque) {
87 myThreadCb *threadcb = (myThreadCb *)ic_opaque;
88
89 Test::Say(tostr() << "on_thread_start(" << thread_type << ", " <<
90 threadname << ") called\n");
91
92 threadcb->thread_start_cb(threadname);
93
94 return RD_KAFKA_RESP_ERR_NO_ERROR;
95 }
96
97 /**
98 * @brief C to C++ callback trampoline.
99 */
100 static rd_kafka_resp_err_t
on_thread_exit_trampoline(rd_kafka_t * rk,rd_kafka_thread_type_t thread_type,const char * threadname,void * ic_opaque)101 on_thread_exit_trampoline (rd_kafka_t *rk,
102 rd_kafka_thread_type_t thread_type,
103 const char *threadname,
104 void *ic_opaque) {
105 myThreadCb *threadcb = (myThreadCb *)ic_opaque;
106
107 Test::Say(tostr() << "on_thread_exit(" << thread_type << ", " <<
108 threadname << ") called\n");
109
110 threadcb->thread_exit_cb(threadname);
111
112 return RD_KAFKA_RESP_ERR_NO_ERROR;
113 }
114
115 /**
116 * @brief This interceptor is called when a new client instance is created
117 * prior to any threads being created.
118 * We use it to set up the instance's thread interceptors.
119 */
on_new(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)120 static rd_kafka_resp_err_t on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf,
121 void *ic_opaque,
122 char *errstr, size_t errstr_size) {
123 Test::Say("on_new() interceptor called\n");
124 rd_kafka_interceptor_add_on_thread_start(rk, "test:0100",
125 on_thread_start_trampoline,
126 ic_opaque);
127 rd_kafka_interceptor_add_on_thread_exit(rk, "test:0100",
128 on_thread_exit_trampoline,
129 ic_opaque);
130 return RD_KAFKA_RESP_ERR_NO_ERROR;
131 }
132
133 /**
134 * @brief The on_conf_dup() interceptor let's use add the on_new interceptor
135 * in case the config object is copied, since interceptors are not
136 * automatically copied.
137 */
on_conf_dup(rd_kafka_conf_t * new_conf,const rd_kafka_conf_t * old_conf,size_t filter_cnt,const char ** filter,void * ic_opaque)138 static rd_kafka_resp_err_t on_conf_dup (rd_kafka_conf_t *new_conf,
139 const rd_kafka_conf_t *old_conf,
140 size_t filter_cnt,
141 const char **filter,
142 void *ic_opaque) {
143 Test::Say("on_conf_dup() interceptor called\n");
144 return rd_kafka_conf_interceptor_add_on_new(new_conf, "test:0100",
145 on_new, ic_opaque);
146 }
147
148
149
test_thread_cbs()150 static void test_thread_cbs () {
151 RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
152 std::string errstr;
153 rd_kafka_conf_t *c_conf;
154 myThreadCb my_threads;
155
156 Test::conf_set(conf, "bootstrap.servers", "127.0.0.1:1");
157
158 /* Interceptors are not supported in the C++ API, instead use the C API:
159 * 1. Extract the C conf_t object
160 * 2. Set up an on_new() interceptor
161 * 3. Set up an on_conf_dup() interceptor to add interceptors in the
162 * case the config object is copied (which the C++ Conf always does).
163 * 4. In the on_new() interceptor, add the thread interceptors. */
164 c_conf = conf->c_ptr_global();
165 rd_kafka_conf_interceptor_add_on_new(c_conf, "test:0100", on_new,
166 &my_threads);
167 rd_kafka_conf_interceptor_add_on_conf_dup(c_conf, "test:0100", on_conf_dup,
168 &my_threads);
169
170 RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
171 if (!p)
172 Test::Fail("Failed to create Producer: " + errstr);
173 p->poll(500);
174 delete conf;
175 delete p;
176
177 Test::Say(tostr() << my_threads.startCount() << " thread start calls, " <<
178 my_threads.exitCount() << " thread exit calls seen\n");
179
180 /* 3 = rdkafka main thread + internal broker + bootstrap broker */
181 if (my_threads.startCount() < 3)
182 Test::Fail("Did not catch enough thread start callback calls");
183 if (my_threads.exitCount() < 3)
184 Test::Fail("Did not catch enough thread exit callback calls");
185 if (my_threads.startCount() != my_threads.exitCount())
186 Test::Fail("Did not catch same number of start and exit callback calls");
187 }
188
189
190 extern "C" {
main_0100_thread_interceptors(int argc,char ** argv)191 int main_0100_thread_interceptors (int argc, char **argv) {
192 test_thread_cbs();
193 return 0;
194 }
195 }
196