1 /*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2017 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30 /**
31 * @brief Interceptor plugin test library
32 *
33 * Interceptors can be implemented in the app itself and use
34 * the direct API to set the interceptors methods, or be implemented
35 * as an external plugin library that uses the direct APIs.
36 *
37 * This file implements the latter, an interceptor plugin library.
38 */
39
40 #define _CRT_SECURE_NO_WARNINGS /* Silence MSVC nonsense */
41
42 #include "../test.h"
43
44 #include <stdio.h>
45 #include <string.h>
46 #include <assert.h>
47
48 /* typical include path outside tests is <librdkafka/rdkafka.h> */
49 #include "rdkafka.h"
50
51 #include "interceptor_test.h"
52
53 #ifdef _WIN32
54 #define DLL_EXPORT __declspec(dllexport)
55 #else
56 #define DLL_EXPORT
57 #endif
58
59 /**
60 * @brief Interceptor instance.
61 *
62 * An interceptor instance is created for each intercepted configuration
63 * object (triggered through conf_init() which is the plugin loader,
64 * or by conf_dup() which is a copying of a conf previously seen by conf_init())
65 */
66 struct ici {
67 rd_kafka_conf_t *conf; /**< Interceptor config */
68 char *config1; /**< Interceptor-specific config */
69 char *config2;
70
71 int on_new_cnt;
72 int on_conf_destroy_cnt;
73 };
74
75 static char *my_interceptor_plug_opaque = "my_interceptor_plug_opaque";
76
77
78
79 /* Producer methods */
on_send(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)80 rd_kafka_resp_err_t on_send (rd_kafka_t *rk,
81 rd_kafka_message_t *rkmessage,
82 void *ic_opaque) {
83 struct ici *ici = ic_opaque;
84 printf("on_send: %p\n", ici);
85 return RD_KAFKA_RESP_ERR_NO_ERROR;
86 }
87
88
on_acknowledgement(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)89 rd_kafka_resp_err_t on_acknowledgement (rd_kafka_t *rk,
90 rd_kafka_message_t *rkmessage,
91 void *ic_opaque) {
92 struct ici *ici = ic_opaque;
93 printf("on_acknowledgement: %p: err %d, partition %"PRId32"\n",
94 ici, rkmessage->err, rkmessage->partition);
95 return RD_KAFKA_RESP_ERR_NO_ERROR;
96 }
97
98 /* Consumer methods */
on_consume(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)99 rd_kafka_resp_err_t on_consume (rd_kafka_t *rk,
100 rd_kafka_message_t *rkmessage,
101 void *ic_opaque) {
102 struct ici *ici = ic_opaque;
103 printf("on_consume: %p: partition %"PRId32" @ %"PRId64"\n",
104 ici, rkmessage->partition, rkmessage->offset);
105 return RD_KAFKA_RESP_ERR_NO_ERROR;
106 }
107
on_commit(rd_kafka_t * rk,const rd_kafka_topic_partition_list_t * offsets,rd_kafka_resp_err_t err,void * ic_opaque)108 rd_kafka_resp_err_t on_commit (rd_kafka_t *rk,
109 const rd_kafka_topic_partition_list_t *offsets,
110 rd_kafka_resp_err_t err, void *ic_opaque) {
111 struct ici *ici = ic_opaque;
112 printf("on_commit: %p: err %d\n", ici, err);
113 return RD_KAFKA_RESP_ERR_NO_ERROR;
114 }
115
116
ici_destroy(struct ici * ici)117 static void ici_destroy (struct ici *ici) {
118 if (ici->conf)
119 rd_kafka_conf_destroy(ici->conf);
120 if (ici->config1)
121 free(ici->config1);
122 if (ici->config2)
123 free(ici->config2);
124 free(ici);
125 }
126
on_destroy(rd_kafka_t * rk,void * ic_opaque)127 rd_kafka_resp_err_t on_destroy (rd_kafka_t *rk, void *ic_opaque) {
128 struct ici *ici = ic_opaque;
129 printf("on_destroy: %p\n", ici);
130 /* the ici is freed from on_conf_destroy() */
131 return RD_KAFKA_RESP_ERR_NO_ERROR;
132 }
133
134
135 /**
136 * @brief Called from rd_kafka_new(). We use it to set up interceptors.
137 */
on_new(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)138 static rd_kafka_resp_err_t on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf,
139 void *ic_opaque,
140 char *errstr, size_t errstr_size) {
141 struct ici *ici = ic_opaque;
142
143 ictest.on_new.cnt++;
144 ici->on_new_cnt++;
145
146 TEST_SAY("on_new(rk %p, conf %p, ici->conf %p): %p: #%d\n",
147 rk, conf, ici->conf, ici, ictest.on_new.cnt);
148
149 ICTEST_CNT_CHECK(on_new);
150 TEST_ASSERT(ici->on_new_cnt == 1);
151
152 TEST_ASSERT(!ictest.session_timeout_ms);
153 TEST_ASSERT(!ictest.socket_timeout_ms);
154 /* Extract some well known config properties from the interceptor's
155 * configuration. */
156 ictest.session_timeout_ms = rd_strdup(test_conf_get(ici->conf, "session.timeout.ms"));
157 ictest.socket_timeout_ms = rd_strdup(test_conf_get(ici->conf, "socket.timeout.ms"));
158 ictest.config1 = rd_strdup(ici->config1);
159 ictest.config2 = rd_strdup(ici->config2);
160
161 rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send, ici);
162 rd_kafka_interceptor_add_on_acknowledgement(rk, __FILE__,
163 on_acknowledgement, ici);
164 rd_kafka_interceptor_add_on_consume(rk, __FILE__, on_consume, ici);
165 rd_kafka_interceptor_add_on_commit(rk, __FILE__, on_commit, ici);
166 rd_kafka_interceptor_add_on_destroy(rk, __FILE__, on_destroy, ici);
167
168 return RD_KAFKA_RESP_ERR_NO_ERROR;
169 }
170
171
172 /**
173 * @brief Configuration set handler
174 */
on_conf_set(rd_kafka_conf_t * conf,const char * name,const char * val,char * errstr,size_t errstr_size,void * ic_opaque)175 static rd_kafka_conf_res_t on_conf_set (rd_kafka_conf_t *conf,
176 const char *name, const char *val,
177 char *errstr, size_t errstr_size,
178 void *ic_opaque) {
179 struct ici *ici = ic_opaque;
180 int level = 3;
181
182 if (!strcmp(name, "session.timeout.ms") ||
183 !strcmp(name, "socket.timeout.ms") ||
184 !strncmp(name, "interceptor_test", strlen("interceptor_test")))
185 level = 2;
186
187 TEST_SAYL(level, "on_conf_set(conf %p, \"%s\", \"%s\"): %p\n",
188 conf, name, val, ici);
189
190 if (!strcmp(name, "interceptor_test.good"))
191 return RD_KAFKA_CONF_OK;
192 else if (!strcmp(name, "interceptor_test.bad")) {
193 strncpy(errstr, "on_conf_set failed deliberately",
194 errstr_size-1);
195 errstr[errstr_size-1] = '\0';
196 return RD_KAFKA_CONF_INVALID;
197 } else if (!strcmp(name, "interceptor_test.config1")) {
198 if (ici->config1) {
199 free(ici->config1);
200 ici->config1 = NULL;
201 }
202 if (val)
203 ici->config1 = rd_strdup(val);
204 TEST_SAY("on_conf_set(conf %p, %s, %s): %p\n",
205 conf, name, val, ici);
206 return RD_KAFKA_CONF_OK;
207 } else if (!strcmp(name, "interceptor_test.config2")) {
208 if (ici->config2) {
209 free(ici->config2);
210 ici->config2 = NULL;
211 }
212 if (val)
213 ici->config2 = rd_strdup(val);
214 return RD_KAFKA_CONF_OK;
215 } else {
216 /* Apply intercepted client's config properties on
217 * interceptor config. */
218 rd_kafka_conf_set(ici->conf, name, val,
219 errstr, errstr_size);
220 /* UNKNOWN makes the conf_set() call continue with
221 * other interceptors and finally the librdkafka properties. */
222 return RD_KAFKA_CONF_UNKNOWN;
223 }
224
225 return RD_KAFKA_CONF_UNKNOWN;
226 }
227
228 static void conf_init0 (rd_kafka_conf_t *conf);
229
230
231 /**
232 * @brief Set up new configuration on copy.
233 */
on_conf_dup(rd_kafka_conf_t * new_conf,const rd_kafka_conf_t * old_conf,size_t filter_cnt,const char ** filter,void * ic_opaque)234 static rd_kafka_resp_err_t on_conf_dup (rd_kafka_conf_t *new_conf,
235 const rd_kafka_conf_t *old_conf,
236 size_t filter_cnt, const char **filter,
237 void *ic_opaque) {
238 struct ici *ici = ic_opaque;
239 TEST_SAY("on_conf_dup(new_conf %p, old_conf %p, filter_cnt %"PRIusz
240 ", ici %p)\n",
241 new_conf, old_conf, filter_cnt, ici);
242 conf_init0(new_conf);
243 return RD_KAFKA_RESP_ERR_NO_ERROR;
244 }
245
246
on_conf_destroy(void * ic_opaque)247 static rd_kafka_resp_err_t on_conf_destroy (void *ic_opaque) {
248 struct ici *ici = ic_opaque;
249 ici->on_conf_destroy_cnt++;
250 printf("conf_destroy called (opaque %p vs %p) ici %p\n",
251 ic_opaque, my_interceptor_plug_opaque, ici);
252 TEST_ASSERT(ici->on_conf_destroy_cnt == 1);
253 ici_destroy(ici);
254 return RD_KAFKA_RESP_ERR_NO_ERROR;
255 }
256
257
258
259 /**
260 * @brief Configuration init is intercepted both from plugin.library.paths
261 * as well as rd_kafka_conf_dup().
262 * This internal method serves both cases.
263 */
conf_init0(rd_kafka_conf_t * conf)264 static void conf_init0 (rd_kafka_conf_t *conf) {
265 struct ici *ici;
266 const char *filter[] = { "plugin.library.paths",
267 "interceptor_test." };
268 size_t filter_cnt = sizeof(filter) / sizeof(*filter);
269
270 /* Create new interceptor instance */
271 ici = calloc(1, sizeof(*ici));
272
273 ictest.conf_init.cnt++;
274 ICTEST_CNT_CHECK(conf_init);
275
276 /* Create own copy of configuration, after filtering out what
277 * brought us here (plugins and our own interceptor config). */
278 ici->conf = rd_kafka_conf_dup_filter(conf, filter_cnt, filter);
279 TEST_SAY("conf_init0(conf %p) for ici %p with ici->conf %p\n",
280 conf, ici, ici->conf);
281
282
283 /* Add interceptor methods */
284 rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, ici);
285
286 rd_kafka_conf_interceptor_add_on_conf_set(conf, __FILE__, on_conf_set,
287 ici);
288 rd_kafka_conf_interceptor_add_on_conf_dup(conf, __FILE__, on_conf_dup,
289 ici);
290 rd_kafka_conf_interceptor_add_on_conf_destroy(conf, __FILE__,
291 on_conf_destroy, ici);
292 }
293
294 /**
295 * @brief Plugin conf initializer called when plugin.library.paths is set.
296 */
297 DLL_EXPORT
conf_init(rd_kafka_conf_t * conf,void ** plug_opaquep,char * errstr,size_t errstr_size)298 rd_kafka_resp_err_t conf_init (rd_kafka_conf_t *conf,
299 void **plug_opaquep,
300 char *errstr, size_t errstr_size) {
301 *plug_opaquep = (void *)my_interceptor_plug_opaque;
302
303 TEST_SAY("conf_init(conf %p) called (setting opaque to %p)\n",
304 conf, *plug_opaquep);
305
306 conf_init0(conf);
307
308 return RD_KAFKA_RESP_ERR_NO_ERROR;
309 }
310
311
312