1 /*
2 * librdkafka - The Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2017 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "rdkafka_int.h"
30 #include "rdkafka_interceptor.h"
31 #include "rdstring.h"
32
33 /**
34 * @brief Interceptor methodtion/method reference
35 */
36 typedef struct rd_kafka_interceptor_method_s {
37 union {
38 rd_kafka_interceptor_f_on_conf_set_t *on_conf_set;
39 rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup;
40 rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy;
41 rd_kafka_interceptor_f_on_new_t *on_new;
42 rd_kafka_interceptor_f_on_destroy_t *on_destroy;
43 rd_kafka_interceptor_f_on_send_t *on_send;
44 rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement;
45 rd_kafka_interceptor_f_on_consume_t *on_consume;
46 rd_kafka_interceptor_f_on_commit_t *on_commit;
47 rd_kafka_interceptor_f_on_request_sent_t *on_request_sent;
48 rd_kafka_interceptor_f_on_response_received_t
49 *on_response_received;
50 rd_kafka_interceptor_f_on_thread_start_t *on_thread_start;
51 rd_kafka_interceptor_f_on_thread_exit_t *on_thread_exit;
52 void *generic; /* For easy assignment */
53
54 } u;
55 char *ic_name;
56 void *ic_opaque;
57 } rd_kafka_interceptor_method_t;
58
59 /**
60 * @brief Destroy interceptor methodtion reference
61 */
62 static void
rd_kafka_interceptor_method_destroy(void * ptr)63 rd_kafka_interceptor_method_destroy (void *ptr) {
64 rd_kafka_interceptor_method_t *method = ptr;
65 rd_free(method->ic_name);
66 rd_free(method);
67 }
68
69
70
71
72
73 /**
74 * @brief Handle an interceptor on_... methodtion call failures.
75 */
76 static RD_INLINE void
rd_kafka_interceptor_failed(rd_kafka_t * rk,const rd_kafka_interceptor_method_t * method,const char * method_name,rd_kafka_resp_err_t err,const rd_kafka_message_t * rkmessage,const char * errstr)77 rd_kafka_interceptor_failed (rd_kafka_t *rk,
78 const rd_kafka_interceptor_method_t *method,
79 const char *method_name, rd_kafka_resp_err_t err,
80 const rd_kafka_message_t *rkmessage,
81 const char *errstr) {
82
83 /* FIXME: Suppress log messages, eventually */
84 if (rkmessage)
85 rd_kafka_log(rk, LOG_WARNING, "ICFAIL",
86 "Interceptor %s failed %s for "
87 "message on %s [%"PRId32"] @ %"PRId64
88 ": %s%s%s",
89 method->ic_name, method_name,
90 rd_kafka_topic_name(rkmessage->rkt),
91 rkmessage->partition,
92 rkmessage->offset,
93 rd_kafka_err2str(err),
94 errstr ? ": " : "",
95 errstr ? errstr : "");
96 else
97 rd_kafka_log(rk, LOG_WARNING, "ICFAIL",
98 "Interceptor %s failed %s: %s%s%s",
99 method->ic_name, method_name,
100 rd_kafka_err2str(err),
101 errstr ? ": " : "",
102 errstr ? errstr : "");
103
104 }
105
106
107
108 /**
109 * @brief Create interceptor method reference.
110 * Duplicates are rejected
111 */
112 static rd_kafka_interceptor_method_t *
rd_kafka_interceptor_method_new(const char * ic_name,void * func,void * ic_opaque)113 rd_kafka_interceptor_method_new (const char *ic_name,
114 void *func, void *ic_opaque) {
115 rd_kafka_interceptor_method_t *method;
116
117 method = rd_calloc(1, sizeof(*method));
118 method->ic_name = rd_strdup(ic_name);
119 method->ic_opaque = ic_opaque;
120 method->u.generic = func;
121
122 return method;
123 }
124
125
126 /**
127 * @brief Method comparator to be used for finding, not sorting.
128 */
rd_kafka_interceptor_method_cmp(const void * _a,const void * _b)129 static int rd_kafka_interceptor_method_cmp (const void *_a, const void *_b) {
130 const rd_kafka_interceptor_method_t *a = _a, *b = _b;
131
132 if (a->u.generic != b->u.generic)
133 return -1;
134
135 return strcmp(a->ic_name, b->ic_name);
136 }
137
138 /**
139 * @brief Add interceptor method reference
140 */
141 static rd_kafka_resp_err_t
rd_kafka_interceptor_method_add(rd_list_t * list,const char * ic_name,void * func,void * ic_opaque)142 rd_kafka_interceptor_method_add (rd_list_t *list, const char *ic_name,
143 void *func, void *ic_opaque) {
144 rd_kafka_interceptor_method_t *method;
145 const rd_kafka_interceptor_method_t skel = {
146 .ic_name = (char *)ic_name,
147 .u = { .generic = func }
148 };
149
150 /* Reject same method from same interceptor.
151 * This is needed to avoid duplicate interceptors when configuration
152 * objects are duplicated.
153 * An exception is made for lists with _F_UNIQUE, which is currently
154 * only on_conf_destroy() to allow interceptor cleanup. */
155 if ((list->rl_flags & RD_LIST_F_UNIQUE) &&
156 rd_list_find(list, &skel, rd_kafka_interceptor_method_cmp))
157 return RD_KAFKA_RESP_ERR__CONFLICT;
158
159 method = rd_kafka_interceptor_method_new(ic_name, func, ic_opaque);
160 rd_list_add(list, method);
161
162 return RD_KAFKA_RESP_ERR_NO_ERROR;
163 }
164
165 /**
166 * @brief Destroy all interceptors
167 * @locality application thread calling rd_kafka_conf_destroy() or
168 * rd_kafka_destroy()
169 */
rd_kafka_interceptors_destroy(rd_kafka_conf_t * conf)170 void rd_kafka_interceptors_destroy (rd_kafka_conf_t *conf) {
171 rd_list_destroy(&conf->interceptors.on_conf_set);
172 rd_list_destroy(&conf->interceptors.on_conf_dup);
173 rd_list_destroy(&conf->interceptors.on_conf_destroy);
174 rd_list_destroy(&conf->interceptors.on_new);
175 rd_list_destroy(&conf->interceptors.on_destroy);
176 rd_list_destroy(&conf->interceptors.on_send);
177 rd_list_destroy(&conf->interceptors.on_acknowledgement);
178 rd_list_destroy(&conf->interceptors.on_consume);
179 rd_list_destroy(&conf->interceptors.on_commit);
180 rd_list_destroy(&conf->interceptors.on_request_sent);
181 rd_list_destroy(&conf->interceptors.on_response_received);
182 rd_list_destroy(&conf->interceptors.on_thread_start);
183 rd_list_destroy(&conf->interceptors.on_thread_exit);
184
185 /* Interceptor config */
186 rd_list_destroy(&conf->interceptors.config);
187 }
188
189
190 /**
191 * @brief Initialize interceptor sub-system for config object.
192 * @locality application thread
193 */
194 static void
rd_kafka_interceptors_init(rd_kafka_conf_t * conf)195 rd_kafka_interceptors_init (rd_kafka_conf_t *conf) {
196 rd_list_init(&conf->interceptors.on_conf_set, 0,
197 rd_kafka_interceptor_method_destroy)
198 ->rl_flags |= RD_LIST_F_UNIQUE;
199 rd_list_init(&conf->interceptors.on_conf_dup, 0,
200 rd_kafka_interceptor_method_destroy)
201 ->rl_flags |= RD_LIST_F_UNIQUE;
202 /* conf_destroy() allows duplicates entries. */
203 rd_list_init(&conf->interceptors.on_conf_destroy, 0,
204 rd_kafka_interceptor_method_destroy);
205 rd_list_init(&conf->interceptors.on_new, 0,
206 rd_kafka_interceptor_method_destroy)
207 ->rl_flags |= RD_LIST_F_UNIQUE;
208 rd_list_init(&conf->interceptors.on_destroy, 0,
209 rd_kafka_interceptor_method_destroy)
210 ->rl_flags |= RD_LIST_F_UNIQUE;
211 rd_list_init(&conf->interceptors.on_send, 0,
212 rd_kafka_interceptor_method_destroy)
213 ->rl_flags |= RD_LIST_F_UNIQUE;
214 rd_list_init(&conf->interceptors.on_acknowledgement, 0,
215 rd_kafka_interceptor_method_destroy)
216 ->rl_flags |= RD_LIST_F_UNIQUE;
217 rd_list_init(&conf->interceptors.on_consume, 0,
218 rd_kafka_interceptor_method_destroy)
219 ->rl_flags |= RD_LIST_F_UNIQUE;
220 rd_list_init(&conf->interceptors.on_commit, 0,
221 rd_kafka_interceptor_method_destroy)
222 ->rl_flags |= RD_LIST_F_UNIQUE;
223 rd_list_init(&conf->interceptors.on_request_sent, 0,
224 rd_kafka_interceptor_method_destroy)
225 ->rl_flags |= RD_LIST_F_UNIQUE;
226 rd_list_init(&conf->interceptors.on_response_received, 0,
227 rd_kafka_interceptor_method_destroy)
228 ->rl_flags |= RD_LIST_F_UNIQUE;
229 rd_list_init(&conf->interceptors.on_thread_start, 0,
230 rd_kafka_interceptor_method_destroy)
231 ->rl_flags |= RD_LIST_F_UNIQUE;
232 rd_list_init(&conf->interceptors.on_thread_exit, 0,
233 rd_kafka_interceptor_method_destroy)
234 ->rl_flags |= RD_LIST_F_UNIQUE;
235
236 /* Interceptor config */
237 rd_list_init(&conf->interceptors.config, 0,
238 (void (*)(void *))rd_strtup_destroy);
239 }
240
241
242
243
244 /**
245 * @name Configuration backend
246 */
247
248
249 /**
250 * @brief Constructor called when configuration object is created.
251 */
rd_kafka_conf_interceptor_ctor(int scope,void * pconf)252 void rd_kafka_conf_interceptor_ctor (int scope, void *pconf) {
253 rd_kafka_conf_t *conf = pconf;
254 assert(scope == _RK_GLOBAL);
255 rd_kafka_interceptors_init(conf);
256 }
257
258 /**
259 * @brief Destructor called when configuration object is destroyed.
260 */
rd_kafka_conf_interceptor_dtor(int scope,void * pconf)261 void rd_kafka_conf_interceptor_dtor (int scope, void *pconf) {
262 rd_kafka_conf_t *conf = pconf;
263 assert(scope == _RK_GLOBAL);
264 rd_kafka_interceptors_destroy(conf);
265 }
266
267 /**
268 * @brief Copy-constructor called when configuration object \p psrcp is
269 * duplicated to \p dstp.
270 * @remark Interceptors are NOT copied, but interceptor config is.
271 *
272 */
rd_kafka_conf_interceptor_copy(int scope,void * pdst,const void * psrc,void * dstptr,const void * srcptr,size_t filter_cnt,const char ** filter)273 void rd_kafka_conf_interceptor_copy (int scope, void *pdst, const void *psrc,
274 void *dstptr, const void *srcptr,
275 size_t filter_cnt, const char **filter) {
276 rd_kafka_conf_t *dconf = pdst;
277 const rd_kafka_conf_t *sconf = psrc;
278 int i;
279 const rd_strtup_t *confval;
280
281 assert(scope == _RK_GLOBAL);
282
283 /* Apply interceptor configuration values.
284 * on_conf_dup() has already been called for dconf so
285 * on_conf_set() interceptors are already in place and we can
286 * apply the configuration through the standard conf_set() API. */
287 RD_LIST_FOREACH(confval, &sconf->interceptors.config, i) {
288 size_t fi;
289 size_t nlen = strlen(confval->name);
290
291 /* Apply filter */
292 for (fi = 0 ; fi < filter_cnt ; fi++) {
293 size_t flen = strlen(filter[fi]);
294 if (nlen >= flen && !strncmp(filter[fi], confval->name,
295 flen))
296 break;
297 }
298
299 if (fi < filter_cnt)
300 continue; /* Filter matched: ignore property. */
301
302 /* Ignore errors for now */
303 rd_kafka_conf_set(dconf, confval->name, confval->value,
304 NULL, 0);
305 }
306 }
307
308
309
310
311 /**
312 * @brief Call interceptor on_conf_set methods.
313 * @locality application thread calling rd_kafka_conf_set() and
314 * rd_kafka_conf_dup()
315 */
316 rd_kafka_conf_res_t
rd_kafka_interceptors_on_conf_set(rd_kafka_conf_t * conf,const char * name,const char * val,char * errstr,size_t errstr_size)317 rd_kafka_interceptors_on_conf_set (rd_kafka_conf_t *conf,
318 const char *name, const char *val,
319 char *errstr, size_t errstr_size) {
320 rd_kafka_interceptor_method_t *method;
321 int i;
322
323 RD_LIST_FOREACH(method, &conf->interceptors.on_conf_set, i) {
324 rd_kafka_conf_res_t res;
325
326 res = method->u.on_conf_set(conf,
327 name, val, errstr, errstr_size,
328 method->ic_opaque);
329 if (res == RD_KAFKA_CONF_UNKNOWN)
330 continue;
331
332 /* Add successfully handled properties to list of
333 * interceptor config properties so conf_t objects
334 * can be copied. */
335 if (res == RD_KAFKA_CONF_OK)
336 rd_list_add(&conf->interceptors.config,
337 rd_strtup_new(name, val));
338 return res;
339 }
340
341 return RD_KAFKA_CONF_UNKNOWN;
342 }
343
344 /**
345 * @brief Call interceptor on_conf_dup methods.
346 * @locality application thread calling rd_kafka_conf_dup()
347 */
348 void
rd_kafka_interceptors_on_conf_dup(rd_kafka_conf_t * new_conf,const rd_kafka_conf_t * old_conf,size_t filter_cnt,const char ** filter)349 rd_kafka_interceptors_on_conf_dup (rd_kafka_conf_t *new_conf,
350 const rd_kafka_conf_t *old_conf,
351 size_t filter_cnt, const char **filter) {
352 rd_kafka_interceptor_method_t *method;
353 int i;
354
355 RD_LIST_FOREACH(method, &old_conf->interceptors.on_conf_dup, i) {
356 /* FIXME: Ignore error for now */
357 method->u.on_conf_dup(new_conf, old_conf,
358 filter_cnt, filter, method->ic_opaque);
359 }
360 }
361
362
363 /**
364 * @brief Call interceptor on_conf_destroy methods.
365 * @locality application thread calling rd_kafka_conf_destroy(), rd_kafka_new(),
366 * rd_kafka_destroy()
367 */
368 void
rd_kafka_interceptors_on_conf_destroy(rd_kafka_conf_t * conf)369 rd_kafka_interceptors_on_conf_destroy (rd_kafka_conf_t *conf) {
370 rd_kafka_interceptor_method_t *method;
371 int i;
372
373 RD_LIST_FOREACH(method, &conf->interceptors.on_conf_destroy, i) {
374 /* FIXME: Ignore error for now */
375 method->u.on_conf_destroy(method->ic_opaque);
376 }
377 }
378
379
380 /**
381 * @brief Call interceptor on_new methods.
382 * @locality application thread calling rd_kafka_new()
383 */
384 void
rd_kafka_interceptors_on_new(rd_kafka_t * rk,const rd_kafka_conf_t * conf)385 rd_kafka_interceptors_on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf) {
386 rd_kafka_interceptor_method_t *method;
387 int i;
388 char errstr[512];
389
390 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_new, i) {
391 rd_kafka_resp_err_t err;
392
393 err = method->u.on_new(rk, conf, method->ic_opaque,
394 errstr, sizeof(errstr));
395 if (unlikely(err))
396 rd_kafka_interceptor_failed(rk, method, "on_new", err,
397 NULL, errstr);
398 }
399 }
400
401
402
403 /**
404 * @brief Call interceptor on_destroy methods.
405 * @locality application thread calling rd_kafka_new() or rd_kafka_destroy()
406 */
407 void
rd_kafka_interceptors_on_destroy(rd_kafka_t * rk)408 rd_kafka_interceptors_on_destroy (rd_kafka_t *rk) {
409 rd_kafka_interceptor_method_t *method;
410 int i;
411
412 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_destroy, i) {
413 rd_kafka_resp_err_t err;
414
415 err = method->u.on_destroy(rk, method->ic_opaque);
416 if (unlikely(err))
417 rd_kafka_interceptor_failed(rk, method, "on_destroy",
418 err, NULL, NULL);
419 }
420 }
421
422
423
424 /**
425 * @brief Call interceptor on_send methods.
426 * @locality application thread calling produce()
427 */
428 void
rd_kafka_interceptors_on_send(rd_kafka_t * rk,rd_kafka_message_t * rkmessage)429 rd_kafka_interceptors_on_send (rd_kafka_t *rk, rd_kafka_message_t *rkmessage) {
430 rd_kafka_interceptor_method_t *method;
431 int i;
432
433 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_send, i) {
434 rd_kafka_resp_err_t err;
435
436 err = method->u.on_send(rk, rkmessage, method->ic_opaque);
437 if (unlikely(err))
438 rd_kafka_interceptor_failed(rk, method, "on_send", err,
439 rkmessage, NULL);
440 }
441 }
442
443
444
445 /**
446 * @brief Call interceptor on_acknowledgement methods.
447 * @locality application thread calling poll(), or the broker thread if
448 * if dr callback has been set.
449 */
450 void
rd_kafka_interceptors_on_acknowledgement(rd_kafka_t * rk,rd_kafka_message_t * rkmessage)451 rd_kafka_interceptors_on_acknowledgement (rd_kafka_t *rk,
452 rd_kafka_message_t *rkmessage) {
453 rd_kafka_interceptor_method_t *method;
454 int i;
455
456 RD_LIST_FOREACH(method,
457 &rk->rk_conf.interceptors.on_acknowledgement, i) {
458 rd_kafka_resp_err_t err;
459
460 err = method->u.on_acknowledgement(rk, rkmessage,
461 method->ic_opaque);
462 if (unlikely(err))
463 rd_kafka_interceptor_failed(rk, method,
464 "on_acknowledgement", err,
465 rkmessage, NULL);
466 }
467 }
468
469
470 /**
471 * @brief Call on_acknowledgement methods for all messages in queue.
472 *
473 * @param force_err If non-zero, sets this error on each message.
474 *
475 * @locality broker thread
476 */
477 void
rd_kafka_interceptors_on_acknowledgement_queue(rd_kafka_t * rk,rd_kafka_msgq_t * rkmq,rd_kafka_resp_err_t force_err)478 rd_kafka_interceptors_on_acknowledgement_queue (rd_kafka_t *rk,
479 rd_kafka_msgq_t *rkmq,
480 rd_kafka_resp_err_t force_err) {
481 rd_kafka_msg_t *rkm;
482
483 RD_KAFKA_MSGQ_FOREACH(rkm, rkmq) {
484 if (force_err)
485 rkm->rkm_err = force_err;
486 rd_kafka_interceptors_on_acknowledgement(rk,
487 &rkm->rkm_rkmessage);
488 }
489 }
490
491
492 /**
493 * @brief Call interceptor on_consume methods.
494 * @locality application thread calling poll(), consume() or similar prior to
495 * passing the message to the application.
496 */
497 void
rd_kafka_interceptors_on_consume(rd_kafka_t * rk,rd_kafka_message_t * rkmessage)498 rd_kafka_interceptors_on_consume (rd_kafka_t *rk,
499 rd_kafka_message_t *rkmessage) {
500 rd_kafka_interceptor_method_t *method;
501 int i;
502
503 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_consume, i) {
504 rd_kafka_resp_err_t err;
505
506 err = method->u.on_consume(rk, rkmessage,
507 method->ic_opaque);
508 if (unlikely(err))
509 rd_kafka_interceptor_failed(rk, method,
510 "on_consume", err,
511 rkmessage, NULL);
512 }
513 }
514
515
516 /**
517 * @brief Call interceptor on_commit methods.
518 * @locality application thread calling poll(), consume() or similar,
519 * or rdkafka main thread if no commit_cb or handler registered.
520 */
521 void
rd_kafka_interceptors_on_commit(rd_kafka_t * rk,const rd_kafka_topic_partition_list_t * offsets,rd_kafka_resp_err_t err)522 rd_kafka_interceptors_on_commit (rd_kafka_t *rk,
523 const rd_kafka_topic_partition_list_t *offsets,
524 rd_kafka_resp_err_t err) {
525 rd_kafka_interceptor_method_t *method;
526 int i;
527
528 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_commit, i) {
529 rd_kafka_resp_err_t ic_err;
530
531 ic_err = method->u.on_commit(rk, offsets, err,
532 method->ic_opaque);
533 if (unlikely(ic_err))
534 rd_kafka_interceptor_failed(rk, method,
535 "on_commit", ic_err, NULL,
536 NULL);
537 }
538 }
539
540
541 /**
542 * @brief Call interceptor on_request_sent methods
543 * @locality internal broker thread
544 */
rd_kafka_interceptors_on_request_sent(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size)545 void rd_kafka_interceptors_on_request_sent (rd_kafka_t *rk,
546 int sockfd,
547 const char *brokername,
548 int32_t brokerid,
549 int16_t ApiKey,
550 int16_t ApiVersion,
551 int32_t CorrId,
552 size_t size) {
553 rd_kafka_interceptor_method_t *method;
554 int i;
555
556 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_request_sent, i) {
557 rd_kafka_resp_err_t ic_err;
558
559 ic_err = method->u.on_request_sent(rk,
560 sockfd,
561 brokername,
562 brokerid,
563 ApiKey,
564 ApiVersion,
565 CorrId,
566 size,
567 method->ic_opaque);
568 if (unlikely(ic_err))
569 rd_kafka_interceptor_failed(rk, method,
570 "on_request_sent",
571 ic_err, NULL, NULL);
572 }
573 }
574
575
576 /**
577 * @brief Call interceptor on_response_received methods
578 * @locality internal broker thread
579 */
rd_kafka_interceptors_on_response_received(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,int64_t rtt,rd_kafka_resp_err_t err)580 void rd_kafka_interceptors_on_response_received (rd_kafka_t *rk,
581 int sockfd,
582 const char *brokername,
583 int32_t brokerid,
584 int16_t ApiKey,
585 int16_t ApiVersion,
586 int32_t CorrId,
587 size_t size,
588 int64_t rtt,
589 rd_kafka_resp_err_t err) {
590 rd_kafka_interceptor_method_t *method;
591 int i;
592
593 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_response_received,
594 i) {
595 rd_kafka_resp_err_t ic_err;
596
597 ic_err = method->u.on_response_received(rk,
598 sockfd,
599 brokername,
600 brokerid,
601 ApiKey,
602 ApiVersion,
603 CorrId,
604 size,
605 rtt,
606 err,
607 method->ic_opaque);
608 if (unlikely(ic_err))
609 rd_kafka_interceptor_failed(rk, method,
610 "on_response_received",
611 ic_err, NULL, NULL);
612 }
613 }
614
615
616 void
rd_kafka_interceptors_on_thread_start(rd_kafka_t * rk,rd_kafka_thread_type_t thread_type)617 rd_kafka_interceptors_on_thread_start (rd_kafka_t *rk,
618 rd_kafka_thread_type_t thread_type) {
619 rd_kafka_interceptor_method_t *method;
620 int i;
621
622 RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_thread_start, i) {
623 rd_kafka_resp_err_t ic_err;
624
625 ic_err = method->u.on_thread_start(rk,
626 thread_type,
627 rd_kafka_thread_name,
628 method->ic_opaque);
629 if (unlikely(ic_err))
630 rd_kafka_interceptor_failed(rk, method,
631 "on_thread_start",
632 ic_err, NULL, NULL);
633 }
634 }
635
636
/**
 * @brief Call interceptor on_thread_exit methods, logging any that fail.
 *
 * Each interceptor receives \p thread_type and the current value of
 * rd_kafka_thread_name (declared outside this file).
 */
void rd_kafka_interceptors_on_thread_exit(rd_kafka_t *rk,
                                          rd_kafka_thread_type_t thread_type) {
        rd_kafka_interceptor_method_t *ic;
        int idx;

        RD_LIST_FOREACH(ic, &rk->rk_conf.interceptors.on_thread_exit, idx) {
                const rd_kafka_resp_err_t ic_err =
                    ic->u.on_thread_exit(rk, thread_type,
                                         rd_kafka_thread_name,
                                         ic->ic_opaque);

                if (unlikely(ic_err))
                        rd_kafka_interceptor_failed(rk, ic,
                                                    "on_thread_exit",
                                                    ic_err, NULL, NULL);
        }
}
655
656
657
658 /**
659 * @name Public API (backend)
660 * @{
661 */
662
663
664 rd_kafka_resp_err_t
rd_kafka_conf_interceptor_add_on_conf_set(rd_kafka_conf_t * conf,const char * ic_name,rd_kafka_interceptor_f_on_conf_set_t * on_conf_set,void * ic_opaque)665 rd_kafka_conf_interceptor_add_on_conf_set (
666 rd_kafka_conf_t *conf, const char *ic_name,
667 rd_kafka_interceptor_f_on_conf_set_t *on_conf_set,
668 void *ic_opaque) {
669 return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_set,
670 ic_name, (void *)on_conf_set,
671 ic_opaque);
672 }
673
674 rd_kafka_resp_err_t
rd_kafka_conf_interceptor_add_on_conf_dup(rd_kafka_conf_t * conf,const char * ic_name,rd_kafka_interceptor_f_on_conf_dup_t * on_conf_dup,void * ic_opaque)675 rd_kafka_conf_interceptor_add_on_conf_dup (
676 rd_kafka_conf_t *conf, const char *ic_name,
677 rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup,
678 void *ic_opaque) {
679 return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_dup,
680 ic_name, (void *)on_conf_dup,
681 ic_opaque);
682 }
683
684 rd_kafka_resp_err_t
rd_kafka_conf_interceptor_add_on_conf_destroy(rd_kafka_conf_t * conf,const char * ic_name,rd_kafka_interceptor_f_on_conf_destroy_t * on_conf_destroy,void * ic_opaque)685 rd_kafka_conf_interceptor_add_on_conf_destroy (
686 rd_kafka_conf_t *conf, const char *ic_name,
687 rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy,
688 void *ic_opaque) {
689 return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_destroy,
690 ic_name, (void *)on_conf_destroy,
691 ic_opaque);
692 }
693
694
695
696 rd_kafka_resp_err_t
rd_kafka_conf_interceptor_add_on_new(rd_kafka_conf_t * conf,const char * ic_name,rd_kafka_interceptor_f_on_new_t * on_new,void * ic_opaque)697 rd_kafka_conf_interceptor_add_on_new (
698 rd_kafka_conf_t *conf, const char *ic_name,
699 rd_kafka_interceptor_f_on_new_t *on_new,
700 void *ic_opaque) {
701 return rd_kafka_interceptor_method_add(&conf->interceptors.on_new,
702 ic_name, (void *)on_new,
703 ic_opaque);
704 }
705
706
707 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_destroy(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_destroy_t * on_destroy,void * ic_opaque)708 rd_kafka_interceptor_add_on_destroy (
709 rd_kafka_t *rk, const char *ic_name,
710 rd_kafka_interceptor_f_on_destroy_t *on_destroy,
711 void *ic_opaque) {
712 assert(!rk->rk_initialized);
713 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.on_destroy,
714 ic_name, (void *)on_destroy,
715 ic_opaque);
716 }
717
718 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_send(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_send_t * on_send,void * ic_opaque)719 rd_kafka_interceptor_add_on_send (
720 rd_kafka_t *rk, const char *ic_name,
721 rd_kafka_interceptor_f_on_send_t *on_send,
722 void *ic_opaque) {
723 assert(!rk->rk_initialized);
724 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.on_send,
725 ic_name, (void *)on_send,
726 ic_opaque);
727 }
728
729 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_acknowledgement(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_acknowledgement_t * on_acknowledgement,void * ic_opaque)730 rd_kafka_interceptor_add_on_acknowledgement (
731 rd_kafka_t *rk, const char *ic_name,
732 rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement,
733 void *ic_opaque) {
734 assert(!rk->rk_initialized);
735 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
736 on_acknowledgement,
737 ic_name,
738 (void *)on_acknowledgement,
739 ic_opaque);
740 }
741
742
743 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_consume(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_consume_t * on_consume,void * ic_opaque)744 rd_kafka_interceptor_add_on_consume (
745 rd_kafka_t *rk, const char *ic_name,
746 rd_kafka_interceptor_f_on_consume_t *on_consume,
747 void *ic_opaque) {
748 assert(!rk->rk_initialized);
749 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
750 on_consume,
751 ic_name, (void *)on_consume,
752 ic_opaque);
753 }
754
755
756 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_commit(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_commit_t * on_commit,void * ic_opaque)757 rd_kafka_interceptor_add_on_commit (
758 rd_kafka_t *rk, const char *ic_name,
759 rd_kafka_interceptor_f_on_commit_t *on_commit,
760 void *ic_opaque) {
761 assert(!rk->rk_initialized);
762 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
763 on_commit,
764 ic_name, (void *)on_commit,
765 ic_opaque);
766 }
767
768
769 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_request_sent(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_request_sent_t * on_request_sent,void * ic_opaque)770 rd_kafka_interceptor_add_on_request_sent (
771 rd_kafka_t *rk, const char *ic_name,
772 rd_kafka_interceptor_f_on_request_sent_t *on_request_sent,
773 void *ic_opaque) {
774 assert(!rk->rk_initialized);
775 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
776 on_request_sent,
777 ic_name, (void *)on_request_sent,
778 ic_opaque);
779 }
780
781
782 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_response_received(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_response_received_t * on_response_received,void * ic_opaque)783 rd_kafka_interceptor_add_on_response_received (
784 rd_kafka_t *rk, const char *ic_name,
785 rd_kafka_interceptor_f_on_response_received_t *on_response_received,
786 void *ic_opaque) {
787 assert(!rk->rk_initialized);
788 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
789 on_response_received,
790 ic_name,
791 (void *)on_response_received,
792 ic_opaque);
793 }
794
795
796 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_thread_start(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_thread_start_t * on_thread_start,void * ic_opaque)797 rd_kafka_interceptor_add_on_thread_start (
798 rd_kafka_t *rk, const char *ic_name,
799 rd_kafka_interceptor_f_on_thread_start_t *on_thread_start,
800 void *ic_opaque) {
801 assert(!rk->rk_initialized);
802 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
803 on_thread_start,
804 ic_name,
805 (void *)on_thread_start,
806 ic_opaque);
807 }
808
809
810 rd_kafka_resp_err_t
rd_kafka_interceptor_add_on_thread_exit(rd_kafka_t * rk,const char * ic_name,rd_kafka_interceptor_f_on_thread_exit_t * on_thread_exit,void * ic_opaque)811 rd_kafka_interceptor_add_on_thread_exit (
812 rd_kafka_t *rk, const char *ic_name,
813 rd_kafka_interceptor_f_on_thread_exit_t *on_thread_exit,
814 void *ic_opaque) {
815 assert(!rk->rk_initialized);
816 return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
817 on_thread_exit,
818 ic_name,
819 (void *)on_thread_exit,
820 ic_opaque);
821 }
822