1 /*
2  * librdkafka - The Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2017 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "rdkafka_int.h"
30 #include "rdkafka_interceptor.h"
31 #include "rdstring.h"
32 
33 /**
34  * @brief Interceptor methodtion/method reference
35  */
36 typedef struct rd_kafka_interceptor_method_s {
37         union {
38                 rd_kafka_interceptor_f_on_conf_set_t *on_conf_set;
39                 rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup;
40                 rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy;
41                 rd_kafka_interceptor_f_on_new_t     *on_new;
42                 rd_kafka_interceptor_f_on_destroy_t *on_destroy;
43                 rd_kafka_interceptor_f_on_send_t    *on_send;
44                 rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement;
45                 rd_kafka_interceptor_f_on_consume_t *on_consume;
46                 rd_kafka_interceptor_f_on_commit_t  *on_commit;
47                 rd_kafka_interceptor_f_on_request_sent_t *on_request_sent;
48                 rd_kafka_interceptor_f_on_response_received_t
49                 *on_response_received;
50                 rd_kafka_interceptor_f_on_thread_start_t *on_thread_start;
51                 rd_kafka_interceptor_f_on_thread_exit_t  *on_thread_exit;
52                 void *generic; /* For easy assignment */
53 
54         } u;
55         char *ic_name;
56         void *ic_opaque;
57 } rd_kafka_interceptor_method_t;
58 
59 /**
60  * @brief Destroy interceptor methodtion reference
61  */
62 static void
rd_kafka_interceptor_method_destroy(void * ptr)63 rd_kafka_interceptor_method_destroy (void *ptr) {
64         rd_kafka_interceptor_method_t *method = ptr;
65         rd_free(method->ic_name);
66         rd_free(method);
67 }
68 
69 
70 
71 
72 
73 /**
74  * @brief Handle an interceptor on_... methodtion call failures.
75  */
76 static RD_INLINE void
rd_kafka_interceptor_failed(rd_kafka_t * rk,const rd_kafka_interceptor_method_t * method,const char * method_name,rd_kafka_resp_err_t err,const rd_kafka_message_t * rkmessage,const char * errstr)77 rd_kafka_interceptor_failed (rd_kafka_t *rk,
78                              const rd_kafka_interceptor_method_t *method,
79                              const char *method_name, rd_kafka_resp_err_t err,
80                              const rd_kafka_message_t *rkmessage,
81                              const char *errstr) {
82 
83         /* FIXME: Suppress log messages, eventually */
84         if (rkmessage)
85                 rd_kafka_log(rk, LOG_WARNING, "ICFAIL",
86                              "Interceptor %s failed %s for "
87                              "message on %s [%"PRId32"] @ %"PRId64
88                              ": %s%s%s",
89                              method->ic_name, method_name,
90                              rd_kafka_topic_name(rkmessage->rkt),
91                              rkmessage->partition,
92                              rkmessage->offset,
93                              rd_kafka_err2str(err),
94                              errstr ? ": " : "",
95                              errstr ? errstr : "");
96         else
97                 rd_kafka_log(rk, LOG_WARNING, "ICFAIL",
98                              "Interceptor %s failed %s: %s%s%s",
99                              method->ic_name, method_name,
100                              rd_kafka_err2str(err),
101                              errstr ? ": " : "",
102                              errstr ? errstr : "");
103 
104 }
105 
106 
107 
108 /**
109  * @brief Create interceptor method reference.
110  *        Duplicates are rejected
111  */
112 static rd_kafka_interceptor_method_t *
rd_kafka_interceptor_method_new(const char * ic_name,void * func,void * ic_opaque)113 rd_kafka_interceptor_method_new (const char *ic_name,
114                                  void *func, void *ic_opaque) {
115         rd_kafka_interceptor_method_t *method;
116 
117         method             = rd_calloc(1, sizeof(*method));
118         method->ic_name    = rd_strdup(ic_name);
119         method->ic_opaque  = ic_opaque;
120         method->u.generic  = func;
121 
122         return method;
123 }
124 
125 
126 /**
127  * @brief Method comparator to be used for finding, not sorting.
128  */
rd_kafka_interceptor_method_cmp(const void * _a,const void * _b)129 static int rd_kafka_interceptor_method_cmp (const void *_a, const void *_b) {
130         const rd_kafka_interceptor_method_t *a = _a, *b = _b;
131 
132         if (a->u.generic != b->u.generic)
133                 return -1;
134 
135         return strcmp(a->ic_name, b->ic_name);
136 }
137 
138 /**
139  * @brief Add interceptor method reference
140  */
141 static rd_kafka_resp_err_t
rd_kafka_interceptor_method_add(rd_list_t * list,const char * ic_name,void * func,void * ic_opaque)142 rd_kafka_interceptor_method_add (rd_list_t *list, const char *ic_name,
143                                  void *func, void *ic_opaque) {
144         rd_kafka_interceptor_method_t *method;
145         const rd_kafka_interceptor_method_t skel = {
146                 .ic_name = (char *)ic_name,
147                 .u = { .generic = func }
148         };
149 
150         /* Reject same method from same interceptor.
151          * This is needed to avoid duplicate interceptors when configuration
152          * objects are duplicated.
153          * An exception is made for lists with _F_UNIQUE, which is currently
154          * only on_conf_destroy() to allow interceptor cleanup. */
155         if ((list->rl_flags & RD_LIST_F_UNIQUE) &&
156             rd_list_find(list, &skel, rd_kafka_interceptor_method_cmp))
157                 return RD_KAFKA_RESP_ERR__CONFLICT;
158 
159         method = rd_kafka_interceptor_method_new(ic_name, func, ic_opaque);
160         rd_list_add(list, method);
161 
162         return RD_KAFKA_RESP_ERR_NO_ERROR;
163 }
164 
165 /**
166  * @brief Destroy all interceptors
167  * @locality application thread calling rd_kafka_conf_destroy() or
168  *           rd_kafka_destroy()
169  */
rd_kafka_interceptors_destroy(rd_kafka_conf_t * conf)170 void rd_kafka_interceptors_destroy (rd_kafka_conf_t *conf) {
171         rd_list_destroy(&conf->interceptors.on_conf_set);
172         rd_list_destroy(&conf->interceptors.on_conf_dup);
173         rd_list_destroy(&conf->interceptors.on_conf_destroy);
174         rd_list_destroy(&conf->interceptors.on_new);
175         rd_list_destroy(&conf->interceptors.on_destroy);
176         rd_list_destroy(&conf->interceptors.on_send);
177         rd_list_destroy(&conf->interceptors.on_acknowledgement);
178         rd_list_destroy(&conf->interceptors.on_consume);
179         rd_list_destroy(&conf->interceptors.on_commit);
180         rd_list_destroy(&conf->interceptors.on_request_sent);
181         rd_list_destroy(&conf->interceptors.on_response_received);
182         rd_list_destroy(&conf->interceptors.on_thread_start);
183         rd_list_destroy(&conf->interceptors.on_thread_exit);
184 
185         /* Interceptor config */
186         rd_list_destroy(&conf->interceptors.config);
187 }
188 
189 
190 /**
191  * @brief Initialize interceptor sub-system for config object.
192  * @locality application thread
193  */
194 static void
rd_kafka_interceptors_init(rd_kafka_conf_t * conf)195 rd_kafka_interceptors_init (rd_kafka_conf_t *conf) {
196         rd_list_init(&conf->interceptors.on_conf_set, 0,
197                      rd_kafka_interceptor_method_destroy)
198                 ->rl_flags |= RD_LIST_F_UNIQUE;
199         rd_list_init(&conf->interceptors.on_conf_dup, 0,
200                      rd_kafka_interceptor_method_destroy)
201                 ->rl_flags |= RD_LIST_F_UNIQUE;
202         /* conf_destroy() allows duplicates entries. */
203         rd_list_init(&conf->interceptors.on_conf_destroy, 0,
204                      rd_kafka_interceptor_method_destroy);
205         rd_list_init(&conf->interceptors.on_new, 0,
206                      rd_kafka_interceptor_method_destroy)
207                 ->rl_flags |= RD_LIST_F_UNIQUE;
208         rd_list_init(&conf->interceptors.on_destroy, 0,
209                      rd_kafka_interceptor_method_destroy)
210                 ->rl_flags |= RD_LIST_F_UNIQUE;
211         rd_list_init(&conf->interceptors.on_send, 0,
212                      rd_kafka_interceptor_method_destroy)
213                 ->rl_flags |= RD_LIST_F_UNIQUE;
214         rd_list_init(&conf->interceptors.on_acknowledgement, 0,
215                      rd_kafka_interceptor_method_destroy)
216                 ->rl_flags |= RD_LIST_F_UNIQUE;
217         rd_list_init(&conf->interceptors.on_consume, 0,
218                      rd_kafka_interceptor_method_destroy)
219                 ->rl_flags |= RD_LIST_F_UNIQUE;
220         rd_list_init(&conf->interceptors.on_commit, 0,
221                      rd_kafka_interceptor_method_destroy)
222                 ->rl_flags |= RD_LIST_F_UNIQUE;
223         rd_list_init(&conf->interceptors.on_request_sent, 0,
224                      rd_kafka_interceptor_method_destroy)
225                 ->rl_flags |= RD_LIST_F_UNIQUE;
226         rd_list_init(&conf->interceptors.on_response_received, 0,
227                      rd_kafka_interceptor_method_destroy)
228                 ->rl_flags |= RD_LIST_F_UNIQUE;
229         rd_list_init(&conf->interceptors.on_thread_start, 0,
230                      rd_kafka_interceptor_method_destroy)
231                 ->rl_flags |= RD_LIST_F_UNIQUE;
232         rd_list_init(&conf->interceptors.on_thread_exit, 0,
233                      rd_kafka_interceptor_method_destroy)
234                 ->rl_flags |= RD_LIST_F_UNIQUE;
235 
236         /* Interceptor config */
237         rd_list_init(&conf->interceptors.config, 0,
238                      (void (*)(void *))rd_strtup_destroy);
239 }
240 
241 
242 
243 
244 /**
245  * @name Configuration backend
246  */
247 
248 
249 /**
250  * @brief Constructor called when configuration object is created.
251  */
rd_kafka_conf_interceptor_ctor(int scope,void * pconf)252 void rd_kafka_conf_interceptor_ctor (int scope, void *pconf) {
253         rd_kafka_conf_t *conf = pconf;
254         assert(scope == _RK_GLOBAL);
255         rd_kafka_interceptors_init(conf);
256 }
257 
258 /**
259  * @brief Destructor called when configuration object is destroyed.
260  */
rd_kafka_conf_interceptor_dtor(int scope,void * pconf)261 void rd_kafka_conf_interceptor_dtor (int scope, void *pconf) {
262         rd_kafka_conf_t *conf = pconf;
263         assert(scope == _RK_GLOBAL);
264         rd_kafka_interceptors_destroy(conf);
265 }
266 
267 /**
268  * @brief Copy-constructor called when configuration object \p psrcp is
269  *        duplicated to \p dstp.
270  * @remark Interceptors are NOT copied, but interceptor config is.
271  *
272  */
rd_kafka_conf_interceptor_copy(int scope,void * pdst,const void * psrc,void * dstptr,const void * srcptr,size_t filter_cnt,const char ** filter)273 void rd_kafka_conf_interceptor_copy (int scope, void *pdst, const void *psrc,
274                                      void *dstptr, const void *srcptr,
275                                      size_t filter_cnt, const char **filter) {
276         rd_kafka_conf_t *dconf = pdst;
277         const rd_kafka_conf_t *sconf = psrc;
278         int i;
279         const rd_strtup_t *confval;
280 
281         assert(scope == _RK_GLOBAL);
282 
283         /* Apply interceptor configuration values.
284          * on_conf_dup() has already been called for dconf so
285          * on_conf_set() interceptors are already in place and we can
286          * apply the configuration through the standard conf_set() API. */
287         RD_LIST_FOREACH(confval, &sconf->interceptors.config, i) {
288                 size_t fi;
289                 size_t nlen = strlen(confval->name);
290 
291                 /* Apply filter */
292                 for (fi = 0 ; fi < filter_cnt ; fi++) {
293                         size_t flen = strlen(filter[fi]);
294                         if (nlen >= flen && !strncmp(filter[fi], confval->name,
295                                                      flen))
296                                 break;
297                 }
298 
299                 if (fi < filter_cnt)
300                         continue; /* Filter matched: ignore property. */
301 
302                 /* Ignore errors for now */
303                 rd_kafka_conf_set(dconf, confval->name, confval->value,
304                                   NULL, 0);
305         }
306 }
307 
308 
309 
310 
311 /**
312  * @brief Call interceptor on_conf_set methods.
313  * @locality application thread calling rd_kafka_conf_set() and
314  *           rd_kafka_conf_dup()
315  */
316 rd_kafka_conf_res_t
rd_kafka_interceptors_on_conf_set(rd_kafka_conf_t * conf,const char * name,const char * val,char * errstr,size_t errstr_size)317 rd_kafka_interceptors_on_conf_set (rd_kafka_conf_t *conf,
318                                    const char *name, const char *val,
319                                    char *errstr, size_t errstr_size) {
320         rd_kafka_interceptor_method_t *method;
321         int i;
322 
323         RD_LIST_FOREACH(method, &conf->interceptors.on_conf_set, i) {
324                 rd_kafka_conf_res_t res;
325 
326                 res = method->u.on_conf_set(conf,
327                                             name, val, errstr, errstr_size,
328                                             method->ic_opaque);
329                 if (res == RD_KAFKA_CONF_UNKNOWN)
330                         continue;
331 
332                 /* Add successfully handled properties to list of
333                  * interceptor config properties so conf_t objects
334                  * can be copied. */
335                 if (res == RD_KAFKA_CONF_OK)
336                         rd_list_add(&conf->interceptors.config,
337                                     rd_strtup_new(name, val));
338                 return res;
339         }
340 
341         return RD_KAFKA_CONF_UNKNOWN;
342 }
343 
344 /**
345  * @brief Call interceptor on_conf_dup methods.
346  * @locality application thread calling rd_kafka_conf_dup()
347  */
348 void
rd_kafka_interceptors_on_conf_dup(rd_kafka_conf_t * new_conf,const rd_kafka_conf_t * old_conf,size_t filter_cnt,const char ** filter)349 rd_kafka_interceptors_on_conf_dup (rd_kafka_conf_t *new_conf,
350                                    const rd_kafka_conf_t *old_conf,
351                                    size_t filter_cnt, const char **filter) {
352         rd_kafka_interceptor_method_t *method;
353         int i;
354 
355         RD_LIST_FOREACH(method, &old_conf->interceptors.on_conf_dup, i) {
356                 /* FIXME: Ignore error for now */
357                 method->u.on_conf_dup(new_conf, old_conf,
358                                       filter_cnt, filter, method->ic_opaque);
359         }
360 }
361 
362 
363 /**
364  * @brief Call interceptor on_conf_destroy methods.
365  * @locality application thread calling rd_kafka_conf_destroy(), rd_kafka_new(),
366  *           rd_kafka_destroy()
367  */
368 void
rd_kafka_interceptors_on_conf_destroy(rd_kafka_conf_t * conf)369 rd_kafka_interceptors_on_conf_destroy (rd_kafka_conf_t *conf) {
370         rd_kafka_interceptor_method_t *method;
371         int i;
372 
373         RD_LIST_FOREACH(method, &conf->interceptors.on_conf_destroy, i) {
374                 /* FIXME: Ignore error for now */
375                 method->u.on_conf_destroy(method->ic_opaque);
376         }
377 }
378 
379 
380 /**
381  * @brief Call interceptor on_new methods.
382  * @locality application thread calling rd_kafka_new()
383  */
384 void
rd_kafka_interceptors_on_new(rd_kafka_t * rk,const rd_kafka_conf_t * conf)385 rd_kafka_interceptors_on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf) {
386         rd_kafka_interceptor_method_t *method;
387         int i;
388         char errstr[512];
389 
390         RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_new, i) {
391                 rd_kafka_resp_err_t err;
392 
393                 err = method->u.on_new(rk, conf, method->ic_opaque,
394                                        errstr, sizeof(errstr));
395                 if (unlikely(err))
396                         rd_kafka_interceptor_failed(rk, method, "on_new", err,
397                                                     NULL, errstr);
398         }
399 }
400 
401 
402 
403 /**
404  * @brief Call interceptor on_destroy methods.
405  * @locality application thread calling rd_kafka_new() or rd_kafka_destroy()
406  */
407 void
rd_kafka_interceptors_on_destroy(rd_kafka_t * rk)408 rd_kafka_interceptors_on_destroy (rd_kafka_t *rk) {
409         rd_kafka_interceptor_method_t *method;
410         int i;
411 
412         RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_destroy, i) {
413                 rd_kafka_resp_err_t err;
414 
415                 err = method->u.on_destroy(rk, method->ic_opaque);
416                 if (unlikely(err))
417                         rd_kafka_interceptor_failed(rk, method, "on_destroy",
418                                                     err, NULL, NULL);
419         }
420 }
421 
422 
423 
424 /**
425  * @brief Call interceptor on_send methods.
426  * @locality application thread calling produce()
427  */
428 void
rd_kafka_interceptors_on_send(rd_kafka_t * rk,rd_kafka_message_t * rkmessage)429 rd_kafka_interceptors_on_send (rd_kafka_t *rk, rd_kafka_message_t *rkmessage) {
430         rd_kafka_interceptor_method_t *method;
431         int i;
432 
433         RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_send, i) {
434                 rd_kafka_resp_err_t err;
435 
436                 err = method->u.on_send(rk, rkmessage, method->ic_opaque);
437                 if (unlikely(err))
438                         rd_kafka_interceptor_failed(rk, method, "on_send", err,
439                                                     rkmessage, NULL);
440         }
441 }
442 
443 
444 
445 /**
446  * @brief Call interceptor on_acknowledgement methods.
447  * @locality application thread calling poll(), or the broker thread if
448  *           if dr callback has been set.
449  */
450 void
rd_kafka_interceptors_on_acknowledgement(rd_kafka_t * rk,rd_kafka_message_t * rkmessage)451 rd_kafka_interceptors_on_acknowledgement (rd_kafka_t *rk,
452                                           rd_kafka_message_t *rkmessage) {
453         rd_kafka_interceptor_method_t *method;
454         int i;
455 
456         RD_LIST_FOREACH(method,
457                         &rk->rk_conf.interceptors.on_acknowledgement, i) {
458                 rd_kafka_resp_err_t err;
459 
460                 err = method->u.on_acknowledgement(rk, rkmessage,
461                                                    method->ic_opaque);
462                 if (unlikely(err))
463                         rd_kafka_interceptor_failed(rk, method,
464                                                     "on_acknowledgement", err,
465                                                     rkmessage, NULL);
466         }
467 }
468 
469 
470 /**
471  * @brief Call on_acknowledgement methods for all messages in queue.
472  *
473  * @param force_err If non-zero, sets this error on each message.
474  *
475  * @locality broker thread
476  */
477 void
rd_kafka_interceptors_on_acknowledgement_queue(rd_kafka_t * rk,rd_kafka_msgq_t * rkmq,rd_kafka_resp_err_t force_err)478 rd_kafka_interceptors_on_acknowledgement_queue (rd_kafka_t *rk,
479                                                 rd_kafka_msgq_t *rkmq,
480                                                 rd_kafka_resp_err_t force_err) {
481         rd_kafka_msg_t *rkm;
482 
483         RD_KAFKA_MSGQ_FOREACH(rkm, rkmq) {
484                 if (force_err)
485                         rkm->rkm_err = force_err;
486                 rd_kafka_interceptors_on_acknowledgement(rk,
487                                                          &rkm->rkm_rkmessage);
488         }
489 }
490 
491 
492 /**
493  * @brief Call interceptor on_consume methods.
494  * @locality application thread calling poll(), consume() or similar prior to
495  *           passing the message to the application.
496  */
497 void
rd_kafka_interceptors_on_consume(rd_kafka_t * rk,rd_kafka_message_t * rkmessage)498 rd_kafka_interceptors_on_consume (rd_kafka_t *rk,
499                                   rd_kafka_message_t *rkmessage) {
500         rd_kafka_interceptor_method_t *method;
501         int i;
502 
503         RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_consume, i) {
504                 rd_kafka_resp_err_t err;
505 
506                 err = method->u.on_consume(rk, rkmessage,
507                                                    method->ic_opaque);
508                 if (unlikely(err))
509                         rd_kafka_interceptor_failed(rk, method,
510                                                     "on_consume", err,
511                                                     rkmessage, NULL);
512         }
513 }
514 
515 
516 /**
517  * @brief Call interceptor on_commit methods.
518  * @locality application thread calling poll(), consume() or similar,
519  *           or rdkafka main thread if no commit_cb or handler registered.
520  */
521 void
rd_kafka_interceptors_on_commit(rd_kafka_t * rk,const rd_kafka_topic_partition_list_t * offsets,rd_kafka_resp_err_t err)522 rd_kafka_interceptors_on_commit (rd_kafka_t *rk,
523                                  const rd_kafka_topic_partition_list_t *offsets,
524                                  rd_kafka_resp_err_t err) {
525         rd_kafka_interceptor_method_t *method;
526         int i;
527 
528         RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_commit, i) {
529                 rd_kafka_resp_err_t ic_err;
530 
531                 ic_err = method->u.on_commit(rk, offsets, err,
532                                              method->ic_opaque);
533                 if (unlikely(ic_err))
534                         rd_kafka_interceptor_failed(rk, method,
535                                                     "on_commit", ic_err, NULL,
536                                                     NULL);
537         }
538 }
539 
540 
541 /**
542  * @brief Call interceptor on_request_sent methods
543  * @locality internal broker thread
544  */
rd_kafka_interceptors_on_request_sent(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size)545 void rd_kafka_interceptors_on_request_sent (rd_kafka_t *rk,
546                                             int sockfd,
547                                             const char *brokername,
548                                             int32_t brokerid,
549                                             int16_t ApiKey,
550                                             int16_t ApiVersion,
551                                             int32_t CorrId,
552                                             size_t  size) {
553         rd_kafka_interceptor_method_t *method;
554         int i;
555 
556         RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_request_sent, i) {
557                 rd_kafka_resp_err_t ic_err;
558 
559                 ic_err = method->u.on_request_sent(rk,
560                                                    sockfd,
561                                                    brokername,
562                                                    brokerid,
563                                                    ApiKey,
564                                                    ApiVersion,
565                                                    CorrId,
566                                                    size,
567                                                    method->ic_opaque);
568                 if (unlikely(ic_err))
569                         rd_kafka_interceptor_failed(rk, method,
570                                                     "on_request_sent",
571                                                     ic_err, NULL, NULL);
572         }
573 }
574 
575 
576 /**
577  * @brief Call interceptor on_response_received methods
578  * @locality internal broker thread
579  */
rd_kafka_interceptors_on_response_received(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,int64_t rtt,rd_kafka_resp_err_t err)580 void rd_kafka_interceptors_on_response_received (rd_kafka_t *rk,
581                                                  int sockfd,
582                                                  const char *brokername,
583                                                  int32_t brokerid,
584                                                  int16_t ApiKey,
585                                                  int16_t ApiVersion,
586                                                  int32_t CorrId,
587                                                  size_t  size,
588                                                  int64_t rtt,
589                                                  rd_kafka_resp_err_t err) {
590         rd_kafka_interceptor_method_t *method;
591         int i;
592 
593         RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_response_received,
594                         i) {
595                 rd_kafka_resp_err_t ic_err;
596 
597                 ic_err = method->u.on_response_received(rk,
598                                                         sockfd,
599                                                         brokername,
600                                                         brokerid,
601                                                         ApiKey,
602                                                         ApiVersion,
603                                                         CorrId,
604                                                         size,
605                                                         rtt,
606                                                         err,
607                                                         method->ic_opaque);
608                 if (unlikely(ic_err))
609                         rd_kafka_interceptor_failed(rk, method,
610                                                     "on_response_received",
611                                                     ic_err, NULL, NULL);
612         }
613 }
614 
615 
616 void
rd_kafka_interceptors_on_thread_start(rd_kafka_t * rk,rd_kafka_thread_type_t thread_type)617 rd_kafka_interceptors_on_thread_start (rd_kafka_t *rk,
618                                        rd_kafka_thread_type_t thread_type) {
619         rd_kafka_interceptor_method_t *method;
620         int i;
621 
622         RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_thread_start, i) {
623                 rd_kafka_resp_err_t ic_err;
624 
625                 ic_err = method->u.on_thread_start(rk,
626                                                    thread_type,
627                                                    rd_kafka_thread_name,
628                                                    method->ic_opaque);
629                 if (unlikely(ic_err))
630                         rd_kafka_interceptor_failed(rk, method,
631                                                     "on_thread_start",
632                                                     ic_err, NULL, NULL);
633         }
634 }
635 
636 
/**
 * @brief Call interceptor on_thread_exit methods with the thread type and
 *        the current value of rd_kafka_thread_name; failures are logged.
 */
void rd_kafka_interceptors_on_thread_exit(rd_kafka_t *rk,
                                          rd_kafka_thread_type_t thread_type) {
        rd_kafka_interceptor_method_t *ic;
        int idx;

        RD_LIST_FOREACH(ic, &rk->rk_conf.interceptors.on_thread_exit, idx) {
                const rd_kafka_resp_err_t ic_err =
                        ic->u.on_thread_exit(rk, thread_type,
                                             rd_kafka_thread_name,
                                             ic->ic_opaque);

                if (unlikely(ic_err)) {
                        rd_kafka_interceptor_failed(rk, ic,
                                                    "on_thread_exit",
                                                    ic_err, NULL, NULL);
                }
        }
}
655 
656 
657 
658 /**
659  * @name Public API (backend)
660  * @{
661  */
662 
663 
664 rd_kafka_resp_err_t
rd_kafka_conf_interceptor_add_on_conf_set(rd_kafka_conf_t * conf,const char * ic_name,rd_kafka_interceptor_f_on_conf_set_t * on_conf_set,void * ic_opaque)665 rd_kafka_conf_interceptor_add_on_conf_set (
666         rd_kafka_conf_t *conf, const char *ic_name,
667         rd_kafka_interceptor_f_on_conf_set_t *on_conf_set,
668         void *ic_opaque) {
669         return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_set,
670                                                ic_name, (void *)on_conf_set,
671                                                ic_opaque);
672 }
673 
674 rd_kafka_resp_err_t
rd_kafka_conf_interceptor_add_on_conf_dup(rd_kafka_conf_t * conf,const char * ic_name,rd_kafka_interceptor_f_on_conf_dup_t * on_conf_dup,void * ic_opaque)675 rd_kafka_conf_interceptor_add_on_conf_dup (
676         rd_kafka_conf_t *conf, const char *ic_name,
677         rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup,
678         void *ic_opaque) {
679         return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_dup,
680                                                ic_name, (void *)on_conf_dup,
681                                                ic_opaque);
682 }
683 
684 rd_kafka_resp_err_t
rd_kafka_conf_interceptor_add_on_conf_destroy(rd_kafka_conf_t * conf,const char * ic_name,rd_kafka_interceptor_f_on_conf_destroy_t * on_conf_destroy,void * ic_opaque)685 rd_kafka_conf_interceptor_add_on_conf_destroy (
686         rd_kafka_conf_t *conf, const char *ic_name,
687         rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy,
688         void *ic_opaque) {
689         return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_destroy,
690                                                ic_name, (void *)on_conf_destroy,
691                                                ic_opaque);
692 }
693 
694 
695 
696 rd_kafka_resp_err_t
rd_kafka_conf_interceptor_add_on_new(rd_kafka_conf_t * conf,const char * ic_name,rd_kafka_interceptor_f_on_new_t * on_new,void * ic_opaque)697 rd_kafka_conf_interceptor_add_on_new (
698         rd_kafka_conf_t *conf, const char *ic_name,
699         rd_kafka_interceptor_f_on_new_t *on_new,
700         void *ic_opaque) {
701         return rd_kafka_interceptor_method_add(&conf->interceptors.on_new,
702                                                ic_name, (void *)on_new,
703                                                ic_opaque);
704 }
705 
706 
707 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_destroy(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_destroy_t * on_destroy,void * ic_opaque)708 rd_kafka_interceptor_add_on_destroy (
709         rd_kafka_t *rk, const char *ic_name,
710         rd_kafka_interceptor_f_on_destroy_t *on_destroy,
711         void *ic_opaque) {
712         assert(!rk->rk_initialized);
713         return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.on_destroy,
714                                                ic_name, (void *)on_destroy,
715                                                ic_opaque);
716 }
717 
718 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_send(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_send_t * on_send,void * ic_opaque)719 rd_kafka_interceptor_add_on_send (
720         rd_kafka_t *rk, const char *ic_name,
721         rd_kafka_interceptor_f_on_send_t *on_send,
722         void *ic_opaque) {
723         assert(!rk->rk_initialized);
724         return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.on_send,
725                                                ic_name, (void *)on_send,
726                                                ic_opaque);
727 }
728 
729 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_acknowledgement(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_acknowledgement_t * on_acknowledgement,void * ic_opaque)730 rd_kafka_interceptor_add_on_acknowledgement (
731         rd_kafka_t *rk, const char *ic_name,
732         rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement,
733         void *ic_opaque) {
734         assert(!rk->rk_initialized);
735         return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
736                                                on_acknowledgement,
737                                                ic_name,
738                                                (void *)on_acknowledgement,
739                                                ic_opaque);
740 }
741 
742 
743 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_consume(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_consume_t * on_consume,void * ic_opaque)744 rd_kafka_interceptor_add_on_consume (
745         rd_kafka_t *rk, const char *ic_name,
746         rd_kafka_interceptor_f_on_consume_t *on_consume,
747         void *ic_opaque) {
748         assert(!rk->rk_initialized);
749         return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
750                                                on_consume,
751                                                ic_name, (void *)on_consume,
752                                                ic_opaque);
753 }
754 
755 
756 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_commit(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_commit_t * on_commit,void * ic_opaque)757 rd_kafka_interceptor_add_on_commit (
758         rd_kafka_t *rk, const char *ic_name,
759         rd_kafka_interceptor_f_on_commit_t *on_commit,
760         void *ic_opaque) {
761         assert(!rk->rk_initialized);
762         return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
763                                                on_commit,
764                                                ic_name, (void *)on_commit,
765                                                ic_opaque);
766 }
767 
768 
769 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_request_sent(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_request_sent_t * on_request_sent,void * ic_opaque)770 rd_kafka_interceptor_add_on_request_sent (
771         rd_kafka_t *rk, const char *ic_name,
772         rd_kafka_interceptor_f_on_request_sent_t *on_request_sent,
773         void *ic_opaque) {
774         assert(!rk->rk_initialized);
775         return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
776                                                on_request_sent,
777                                                ic_name, (void *)on_request_sent,
778                                                ic_opaque);
779 }
780 
781 
782 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_response_received(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_response_received_t * on_response_received,void * ic_opaque)783 rd_kafka_interceptor_add_on_response_received (
784         rd_kafka_t *rk, const char *ic_name,
785         rd_kafka_interceptor_f_on_response_received_t *on_response_received,
786         void *ic_opaque) {
787         assert(!rk->rk_initialized);
788         return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
789                                                on_response_received,
790                                                ic_name,
791                                                (void *)on_response_received,
792                                                ic_opaque);
793 }
794 
795 
796 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_thread_start(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_thread_start_t * on_thread_start,void * ic_opaque)797 rd_kafka_interceptor_add_on_thread_start (
798         rd_kafka_t *rk, const char *ic_name,
799         rd_kafka_interceptor_f_on_thread_start_t *on_thread_start,
800         void *ic_opaque) {
801         assert(!rk->rk_initialized);
802         return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
803                                                on_thread_start,
804                                                ic_name,
805                                                (void *)on_thread_start,
806                                                ic_opaque);
807 }
808 
809 
810 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_thread_exit(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_thread_exit_t * on_thread_exit,void * ic_opaque)811 rd_kafka_interceptor_add_on_thread_exit (
812         rd_kafka_t *rk, const char *ic_name,
813         rd_kafka_interceptor_f_on_thread_exit_t *on_thread_exit,
814         void *ic_opaque) {
815         assert(!rk->rk_initialized);
816         return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
817                                                on_thread_exit,
818                                                ic_name,
819                                                (void *)on_thread_exit,
820                                                ic_opaque);
821 }
822