1 /*
2  * librdkafka - The Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2017 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 
30 /**
31  * @brief Interceptor plugin test library
32  *
33  * Interceptors can be implemented in the app itself and use
34  * the direct API to set the interceptors methods, or be implemented
35  * as an external plugin library that uses the direct APIs.
36  *
37  * This file implements the latter, an interceptor plugin library.
38  */
39 
40 #define _CRT_SECURE_NO_WARNINGS /* Silence MSVC nonsense */
41 
42 #include "../test.h"
43 
44 #include <stdio.h>
45 #include <string.h>
46 #include <assert.h>
47 
48 /* typical include path outside tests is <librdkafka/rdkafka.h> */
49 #include "rdkafka.h"
50 
51 #include "interceptor_test.h"
52 
53 #ifdef _WIN32
54 #define DLL_EXPORT __declspec(dllexport)
55 #else
56 #define DLL_EXPORT
57 #endif
58 
59 /**
60  * @brief Interceptor instance.
61  *
62  * An interceptor instance is created for each intercepted configuration
63  * object (triggered through conf_init() which is the plugin loader,
64  * or by conf_dup() which is a copying of a conf previously seen by conf_init())
65  */
66 struct ici {
67         rd_kafka_conf_t *conf;  /**< Interceptor config */
68         char *config1;          /**< Interceptor-specific config */
69         char *config2;
70 
71         int on_new_cnt;
72         int on_conf_destroy_cnt;
73 };
74 
75 static char *my_interceptor_plug_opaque = "my_interceptor_plug_opaque";
76 
77 
78 
79 /* Producer methods */
on_send(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)80 rd_kafka_resp_err_t on_send (rd_kafka_t *rk,
81                              rd_kafka_message_t *rkmessage,
82                              void *ic_opaque) {
83         struct ici *ici = ic_opaque;
84         printf("on_send: %p\n", ici);
85         return RD_KAFKA_RESP_ERR_NO_ERROR;
86 }
87 
88 
on_acknowledgement(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)89 rd_kafka_resp_err_t on_acknowledgement (rd_kafka_t *rk,
90                                         rd_kafka_message_t *rkmessage,
91                                         void *ic_opaque) {
92         struct ici *ici = ic_opaque;
93         printf("on_acknowledgement: %p: err %d, partition %"PRId32"\n",
94                ici, rkmessage->err, rkmessage->partition);
95         return RD_KAFKA_RESP_ERR_NO_ERROR;
96 }
97 
98 /* Consumer methods */
on_consume(rd_kafka_t * rk,rd_kafka_message_t * rkmessage,void * ic_opaque)99 rd_kafka_resp_err_t on_consume (rd_kafka_t *rk,
100                                 rd_kafka_message_t *rkmessage,
101                                 void *ic_opaque) {
102         struct ici *ici = ic_opaque;
103         printf("on_consume: %p: partition %"PRId32" @ %"PRId64"\n",
104                ici, rkmessage->partition, rkmessage->offset);
105         return RD_KAFKA_RESP_ERR_NO_ERROR;
106 }
107 
on_commit(rd_kafka_t * rk,const rd_kafka_topic_partition_list_t * offsets,rd_kafka_resp_err_t err,void * ic_opaque)108 rd_kafka_resp_err_t on_commit (rd_kafka_t *rk,
109                                const rd_kafka_topic_partition_list_t *offsets,
110                                rd_kafka_resp_err_t err, void *ic_opaque) {
111         struct ici *ici = ic_opaque;
112         printf("on_commit: %p: err %d\n", ici, err);
113         return RD_KAFKA_RESP_ERR_NO_ERROR;
114 }
115 
116 
ici_destroy(struct ici * ici)117 static void ici_destroy (struct ici *ici) {
118         if (ici->conf)
119                 rd_kafka_conf_destroy(ici->conf);
120         if (ici->config1)
121                 free(ici->config1);
122         if (ici->config2)
123                 free(ici->config2);
124         free(ici);
125 }
126 
on_destroy(rd_kafka_t * rk,void * ic_opaque)127 rd_kafka_resp_err_t on_destroy (rd_kafka_t *rk, void *ic_opaque) {
128         struct ici *ici = ic_opaque;
129         printf("on_destroy: %p\n", ici);
130         /* the ici is freed from on_conf_destroy() */
131         return RD_KAFKA_RESP_ERR_NO_ERROR;
132 }
133 
134 
135 /**
136  * @brief Called from rd_kafka_new(). We use it to set up interceptors.
137  */
on_new(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)138 static rd_kafka_resp_err_t on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf,
139                                    void *ic_opaque,
140                                    char *errstr, size_t errstr_size) {
141         struct ici *ici = ic_opaque;
142 
143         ictest.on_new.cnt++;
144         ici->on_new_cnt++;
145 
146         TEST_SAY("on_new(rk %p, conf %p, ici->conf %p): %p: #%d\n",
147                  rk, conf, ici->conf, ici, ictest.on_new.cnt);
148 
149         ICTEST_CNT_CHECK(on_new);
150         TEST_ASSERT(ici->on_new_cnt == 1);
151 
152         TEST_ASSERT(!ictest.session_timeout_ms);
153         TEST_ASSERT(!ictest.socket_timeout_ms);
154         /* Extract some well known config properties from the interceptor's
155          * configuration. */
156         ictest.session_timeout_ms = rd_strdup(test_conf_get(ici->conf, "session.timeout.ms"));
157         ictest.socket_timeout_ms  = rd_strdup(test_conf_get(ici->conf, "socket.timeout.ms"));
158         ictest.config1 = rd_strdup(ici->config1);
159         ictest.config2 = rd_strdup(ici->config2);
160 
161         rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send, ici);
162         rd_kafka_interceptor_add_on_acknowledgement(rk, __FILE__,
163                                                     on_acknowledgement, ici);
164         rd_kafka_interceptor_add_on_consume(rk, __FILE__, on_consume, ici);
165         rd_kafka_interceptor_add_on_commit(rk, __FILE__, on_commit, ici);
166         rd_kafka_interceptor_add_on_destroy(rk, __FILE__, on_destroy, ici);
167 
168         return RD_KAFKA_RESP_ERR_NO_ERROR;
169 }
170 
171 
172 /**
173  * @brief Configuration set handler
174  */
on_conf_set(rd_kafka_conf_t * conf,const char * name,const char * val,char * errstr,size_t errstr_size,void * ic_opaque)175 static rd_kafka_conf_res_t on_conf_set (rd_kafka_conf_t *conf,
176                                         const char *name, const char *val,
177                                         char *errstr, size_t errstr_size,
178                                         void *ic_opaque) {
179         struct ici *ici = ic_opaque;
180         int level = 3;
181 
182         if (!strcmp(name, "session.timeout.ms") ||
183             !strcmp(name, "socket.timeout.ms") ||
184             !strncmp(name, "interceptor_test", strlen("interceptor_test")))
185                 level = 2;
186 
187         TEST_SAYL(level, "on_conf_set(conf %p, \"%s\", \"%s\"): %p\n",
188                   conf, name, val, ici);
189 
190         if (!strcmp(name, "interceptor_test.good"))
191                 return RD_KAFKA_CONF_OK;
192         else if (!strcmp(name, "interceptor_test.bad")) {
193                 strncpy(errstr, "on_conf_set failed deliberately",
194                         errstr_size-1);
195                 errstr[errstr_size-1] = '\0';
196                 return RD_KAFKA_CONF_INVALID;
197         } else if (!strcmp(name, "interceptor_test.config1")) {
198                 if (ici->config1) {
199                         free(ici->config1);
200                         ici->config1 = NULL;
201                 }
202                 if (val)
203                         ici->config1 = rd_strdup(val);
204                 TEST_SAY("on_conf_set(conf %p, %s, %s): %p\n",
205                          conf, name, val, ici);
206                 return RD_KAFKA_CONF_OK;
207         } else if (!strcmp(name, "interceptor_test.config2")) {
208                 if (ici->config2) {
209                         free(ici->config2);
210                         ici->config2 = NULL;
211                 }
212                 if (val)
213                         ici->config2 = rd_strdup(val);
214                 return RD_KAFKA_CONF_OK;
215         } else {
216                 /* Apply intercepted client's config properties on
217                  * interceptor config. */
218                 rd_kafka_conf_set(ici->conf, name, val,
219                                   errstr, errstr_size);
220                 /* UNKNOWN makes the conf_set() call continue with
221                  * other interceptors and finally the librdkafka properties. */
222                 return RD_KAFKA_CONF_UNKNOWN;
223         }
224 
225         return RD_KAFKA_CONF_UNKNOWN;
226 }
227 
228 static void conf_init0 (rd_kafka_conf_t *conf);
229 
230 
231 /**
232  * @brief Set up new configuration on copy.
233  */
on_conf_dup(rd_kafka_conf_t * new_conf,const rd_kafka_conf_t * old_conf,size_t filter_cnt,const char ** filter,void * ic_opaque)234 static rd_kafka_resp_err_t on_conf_dup (rd_kafka_conf_t *new_conf,
235                                         const rd_kafka_conf_t *old_conf,
236                                         size_t filter_cnt, const char **filter,
237                                         void *ic_opaque) {
238         struct ici *ici = ic_opaque;
239         TEST_SAY("on_conf_dup(new_conf %p, old_conf %p, filter_cnt %"PRIusz
240                  ", ici %p)\n",
241                  new_conf, old_conf, filter_cnt, ici);
242         conf_init0(new_conf);
243         return RD_KAFKA_RESP_ERR_NO_ERROR;
244 }
245 
246 
on_conf_destroy(void * ic_opaque)247 static rd_kafka_resp_err_t on_conf_destroy (void *ic_opaque) {
248         struct ici *ici = ic_opaque;
249         ici->on_conf_destroy_cnt++;
250         printf("conf_destroy called (opaque %p vs %p) ici %p\n",
251                ic_opaque, my_interceptor_plug_opaque, ici);
252         TEST_ASSERT(ici->on_conf_destroy_cnt == 1);
253         ici_destroy(ici);
254         return RD_KAFKA_RESP_ERR_NO_ERROR;
255 }
256 
257 
258 
259 /**
260  * @brief Configuration init is intercepted both from plugin.library.paths
261  *        as well as rd_kafka_conf_dup().
262  *        This internal method serves both cases.
263  */
conf_init0(rd_kafka_conf_t * conf)264 static void conf_init0 (rd_kafka_conf_t *conf) {
265         struct ici *ici;
266         const char *filter[] = { "plugin.library.paths",
267                                  "interceptor_test." };
268         size_t filter_cnt = sizeof(filter) / sizeof(*filter);
269 
270         /* Create new interceptor instance */
271         ici = calloc(1, sizeof(*ici));
272 
273         ictest.conf_init.cnt++;
274         ICTEST_CNT_CHECK(conf_init);
275 
276         /* Create own copy of configuration, after filtering out what
277          * brought us here (plugins and our own interceptor config). */
278         ici->conf = rd_kafka_conf_dup_filter(conf, filter_cnt, filter);
279         TEST_SAY("conf_init0(conf %p) for ici %p with ici->conf %p\n",
280                  conf, ici, ici->conf);
281 
282 
283         /* Add interceptor methods */
284         rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, ici);
285 
286         rd_kafka_conf_interceptor_add_on_conf_set(conf, __FILE__, on_conf_set,
287                                                   ici);
288         rd_kafka_conf_interceptor_add_on_conf_dup(conf, __FILE__, on_conf_dup,
289                                                   ici);
290         rd_kafka_conf_interceptor_add_on_conf_destroy(conf, __FILE__,
291                                                       on_conf_destroy, ici);
292 }
293 
294 /**
295  * @brief Plugin conf initializer called when plugin.library.paths is set.
296  */
297 DLL_EXPORT
conf_init(rd_kafka_conf_t * conf,void ** plug_opaquep,char * errstr,size_t errstr_size)298 rd_kafka_resp_err_t conf_init (rd_kafka_conf_t *conf,
299                                void **plug_opaquep,
300                                char *errstr, size_t errstr_size) {
301         *plug_opaquep = (void *)my_interceptor_plug_opaque;
302 
303         TEST_SAY("conf_init(conf %p) called (setting opaque to %p)\n",
304                  conf, *plug_opaquep);
305 
306         conf_init0(conf);
307 
308         return RD_KAFKA_RESP_ERR_NO_ERROR;
309 }
310 
311 
312