1 /*
2 pmacct (Promiscuous mode IP Accounting package)
3 pmacct is Copyright (C) 2003-2020 by Paolo Lucente
4 */
5
6 /*
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21
22 /* includes */
23 #include "pmacct.h"
24 #include "pmacct-data.h"
25 #include "plugin_common.h"
26 #include "kafka_common.h"
27 #include "base64.h"
28
29 struct p_kafka_host kafkap_kafka_host;
30 struct p_kafka_host bgp_daemon_msglog_kafka_host;
31 struct p_kafka_host bgp_table_dump_kafka_host;
32 struct p_kafka_host bmp_daemon_msglog_kafka_host;
33 struct p_kafka_host bmp_dump_kafka_host;
34 struct p_kafka_host sfacctd_counter_kafka_host;
35 struct p_kafka_host telemetry_kafka_host;
36 struct p_kafka_host telemetry_daemon_msglog_kafka_host;
37 struct p_kafka_host telemetry_dump_kafka_host;
38 struct p_kafka_host nfacctd_kafka_host;
39
40 int kafkap_ret_err_cb;
41 int dyn_partition_key;
42 char default_kafka_broker_host[] = "127.0.0.1";
43 int default_kafka_broker_port = 9092;
44 char default_kafka_topic[] = "pmacct.acct";
45
46 /* Functions */
p_kafka_init_host(struct p_kafka_host * kafka_host,char * config_file)47 void p_kafka_init_host(struct p_kafka_host *kafka_host, char *config_file)
48 {
49 if (kafka_host) {
50 memset(kafka_host, 0, sizeof(struct p_kafka_host));
51 P_broker_timers_set_retry_interval(&kafka_host->btimers, PM_KAFKA_DEFAULT_RETRY);
52 p_kafka_set_config_file(kafka_host, config_file);
53
54 kafka_host->cfg = rd_kafka_conf_new();
55 if (kafka_host->cfg) {
56 rd_kafka_conf_set_log_cb(kafka_host->cfg, p_kafka_logger);
57 rd_kafka_conf_set_error_cb(kafka_host->cfg, p_kafka_msg_error);
58 rd_kafka_conf_set_dr_cb(kafka_host->cfg, p_kafka_msg_delivered);
59 rd_kafka_conf_set_stats_cb(kafka_host->cfg, p_kafka_stats);
60 rd_kafka_conf_set_opaque(kafka_host->cfg, kafka_host);
61 p_kafka_apply_global_config(kafka_host);
62
63 if (config.debug) {
64 const char **res;
65 size_t res_len, idx;
66
67 res = rd_kafka_conf_dump(kafka_host->cfg, &res_len);
68 for (idx = 0; idx < res_len; idx += 2)
69 Log(LOG_DEBUG, "DEBUG ( %s/%s ): librdkafka global config: %s = %s\n", config.name, config.type, res[idx], res[idx + 1]);
70
71 rd_kafka_conf_dump_free(res, res_len);
72 }
73 }
74 }
75 }
76
p_kafka_unset_topic(struct p_kafka_host * kafka_host)77 void p_kafka_unset_topic(struct p_kafka_host *kafka_host)
78 {
79 if (kafka_host && kafka_host->topic) {
80 rd_kafka_topic_destroy(kafka_host->topic);
81 kafka_host->topic = NULL;
82 }
83 }
84
p_kafka_set_topic(struct p_kafka_host * kafka_host,char * topic)85 void p_kafka_set_topic(struct p_kafka_host *kafka_host, char *topic)
86 {
87 if (kafka_host) {
88 kafka_host->topic_cfg = rd_kafka_topic_conf_new();
89 p_kafka_apply_topic_config(kafka_host);
90
91 if (config.debug) {
92 const char **res;
93 size_t res_len, idx;
94
95 res = rd_kafka_topic_conf_dump(kafka_host->topic_cfg, &res_len);
96 for (idx = 0; idx < res_len; idx += 2)
97 Log(LOG_DEBUG, "DEBUG ( %s/%s ): librdkafka '%s' topic config: %s = %s\n", config.name, config.type, topic, res[idx], res[idx + 1]);
98
99 rd_kafka_conf_dump_free(res, res_len);
100 }
101
102 /* This needs to be done here otherwise kafka_host->topic_cfg is null
103 * and the partitioner cannot be set */
104 if (config.kafka_partition_dynamic && kafka_host->topic_cfg)
105 p_kafka_set_dynamic_partitioner(kafka_host);
106
107 /* destroy current allocation before making a new one */
108 if (kafka_host->topic) p_kafka_unset_topic(kafka_host);
109
110 if (kafka_host->rk && kafka_host->topic_cfg) {
111 kafka_host->topic = rd_kafka_topic_new(kafka_host->rk, topic, kafka_host->topic_cfg);
112 kafka_host->topic_cfg = NULL; /* rd_kafka_topic_new() destroys conf as per rdkafka.h */
113 }
114 }
115 }
116
p_kafka_get_handler(struct p_kafka_host * kafka_host)117 rd_kafka_t *p_kafka_get_handler(struct p_kafka_host *kafka_host)
118 {
119 if (kafka_host) return kafka_host->rk;
120
121 return NULL;
122 }
123
p_kafka_get_broker(struct p_kafka_host * kafka_host)124 char *p_kafka_get_broker(struct p_kafka_host *kafka_host)
125 {
126 if (kafka_host && strlen(kafka_host->broker)) return kafka_host->broker;
127
128 return NULL;
129 }
130
p_kafka_get_topic(struct p_kafka_host * kafka_host)131 char *p_kafka_get_topic(struct p_kafka_host *kafka_host)
132 {
133 if (kafka_host && kafka_host->topic)
134 return (char*)rd_kafka_topic_name(kafka_host->topic);
135
136 return NULL;
137 }
138
p_kafka_init_topic_rr(struct p_kafka_host * kafka_host)139 void p_kafka_init_topic_rr(struct p_kafka_host *kafka_host)
140 {
141 if (kafka_host) memset(&kafka_host->topic_rr, 0, sizeof(struct p_table_rr));
142 }
143
p_kafka_set_topic_rr(struct p_kafka_host * kafka_host,int topic_rr)144 void p_kafka_set_topic_rr(struct p_kafka_host *kafka_host, int topic_rr)
145 {
146 if (kafka_host) kafka_host->topic_rr.max = topic_rr;
147 }
148
p_kafka_get_topic_rr(struct p_kafka_host * kafka_host)149 int p_kafka_get_topic_rr(struct p_kafka_host *kafka_host)
150 {
151 if (kafka_host) return kafka_host->topic_rr.max;
152
153 return FALSE;
154 }
155
p_kafka_set_broker(struct p_kafka_host * kafka_host,char * host,int port)156 void p_kafka_set_broker(struct p_kafka_host *kafka_host, char *host, int port)
157 {
158 int ret, multiple_brokers = FALSE;
159
160 if (strchr(host, ',')) multiple_brokers = TRUE;
161
162 if (kafka_host && kafka_host->rk) {
163 /* if host is a comma-separated list of brokers, assume port is part of the definition */
164 if (multiple_brokers) snprintf(kafka_host->broker, SRVBUFLEN, "%s", host);
165 else {
166 if (host && port) snprintf(kafka_host->broker, SRVBUFLEN, "%s:%u", host, port);
167 else if (host && !port) snprintf(kafka_host->broker, SRVBUFLEN, "%s", host);
168 }
169
170 if ((ret = rd_kafka_brokers_add(kafka_host->rk, kafka_host->broker)) == 0) {
171 Log(LOG_WARNING, "WARN ( %s/%s ): Invalid 'kafka_broker_host' or 'kafka_broker_port' specified (%s).\n",
172 config.name, config.type, kafka_host->broker);
173 }
174 else Log(LOG_DEBUG, "DEBUG ( %s/%s ): %u broker(s) successfully added.\n", config.name, config.type, ret);
175 }
176 }
177
p_kafka_set_content_type(struct p_kafka_host * kafka_host,int content_type)178 void p_kafka_set_content_type(struct p_kafka_host *kafka_host, int content_type)
179 {
180 if (kafka_host) kafka_host->content_type = content_type;
181 }
182
p_kafka_get_content_type(struct p_kafka_host * kafka_host)183 int p_kafka_get_content_type(struct p_kafka_host *kafka_host)
184 {
185 if (kafka_host) return kafka_host->content_type;
186
187 return FALSE;
188 }
189
p_kafka_set_partition(struct p_kafka_host * kafka_host,int partition)190 void p_kafka_set_partition(struct p_kafka_host *kafka_host, int partition)
191 {
192 if (kafka_host) {
193 if (!partition) kafka_host->partition = RD_KAFKA_PARTITION_UA;
194 else if (partition == FALSE_NONZERO) kafka_host->partition = 0;
195 else kafka_host->partition = partition;
196 }
197 }
198
p_kafka_get_partition(struct p_kafka_host * kafka_host)199 int p_kafka_get_partition(struct p_kafka_host *kafka_host)
200 {
201 if (kafka_host) return kafka_host->partition;
202
203 return FALSE;
204 }
205
p_kafka_set_dynamic_partitioner(struct p_kafka_host * kafka_host)206 void p_kafka_set_dynamic_partitioner(struct p_kafka_host *kafka_host)
207 {
208 rd_kafka_topic_conf_set_partitioner_cb(kafka_host->topic_cfg, &rd_kafka_msg_partitioner_consistent_random);
209 }
210
p_kafka_set_key(struct p_kafka_host * kafka_host,char * key,int key_len)211 void p_kafka_set_key(struct p_kafka_host *kafka_host, char *key, int key_len)
212 {
213 if (kafka_host) {
214 kafka_host->key = key;
215 kafka_host->key_len = key_len;
216 }
217 }
218
p_kafka_get_key(struct p_kafka_host * kafka_host)219 char *p_kafka_get_key(struct p_kafka_host *kafka_host)
220 {
221 if (kafka_host) return kafka_host->key;
222
223 return NULL;
224 }
225
p_kafka_set_config_file(struct p_kafka_host * kafka_host,char * config_file)226 void p_kafka_set_config_file(struct p_kafka_host *kafka_host, char *config_file)
227 {
228 if (kafka_host) {
229 kafka_host->config_file = config_file;
230 }
231 }
232
p_kafka_get_version()233 void p_kafka_get_version()
234 {
235 printf("rdkafka %s\n", rd_kafka_version_str());
236 }
237
p_kafka_parse_config_entry(char * buf,char * type,char ** key,char ** value)238 int p_kafka_parse_config_entry(char *buf, char *type, char **key, char **value)
239 {
240 char *value_ptr, *token;
241 int index, type_match = FALSE;
242
243 if (buf && type && key && value) {
244 value_ptr = buf;
245 (*key) = NULL;
246 (*value) = NULL;
247 index = 0;
248
249 while ((token = extract_token(&value_ptr, ','))) {
250 index++;
251 trim_spaces(token);
252
253 if (index == 1) {
254 lower_string(token);
255 if (!strcmp(token, type)) type_match = TRUE;
256 else break;
257 }
258 else if (index == 2) {
259 (*key) = token;
260 break;
261 }
262 }
263
264 if (strlen(value_ptr)) {
265 trim_spaces(value_ptr);
266 (*value) = value_ptr;
267 index++;
268 }
269
270 if (type_match && index != 3) return ERR;
271 }
272 else return ERR;
273
274 return type_match;
275 }
276
p_kafka_apply_global_config(struct p_kafka_host * kafka_host)277 void p_kafka_apply_global_config(struct p_kafka_host *kafka_host)
278 {
279 FILE *file;
280 char buf[SRVBUFLEN], errstr[SRVBUFLEN], *key, *value;
281 int lineno = 1, ret;
282
283 if (kafka_host && kafka_host->config_file && kafka_host->cfg) {
284 if ((file = fopen(kafka_host->config_file, "r")) == NULL) {
285 Log(LOG_WARNING, "WARN ( %s/%s ): [%s] file not found. librdkafka global config not loaded.\n", config.name, config.type, kafka_host->config_file);
286 return;
287 }
288 else Log(LOG_INFO, "INFO ( %s/%s ): [%s] Reading librdkafka global config.\n", config.name, config.type, kafka_host->config_file);
289
290 while (!feof(file)) {
291 if (fgets(buf, SRVBUFLEN, file)) {
292 if ((ret = p_kafka_parse_config_entry(buf, "global", &key, &value)) > 0) {
293 ret = rd_kafka_conf_set(kafka_host->cfg, key, value, errstr, sizeof(errstr));
294 if (ret != RD_KAFKA_CONF_OK) {
295 Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] key=%s value=%s failed: %s\n",
296 config.name, config.type, kafka_host->config_file, lineno, key, value, errstr);
297 }
298 }
299 else {
300 if (ret == ERR) {
301 Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] Line malformed. Ignored.\n", config.name, config.type, kafka_host->config_file, lineno);
302 }
303 }
304 }
305
306 lineno++;
307 }
308
309 fclose(file);
310 }
311 }
312
p_kafka_apply_topic_config(struct p_kafka_host * kafka_host)313 void p_kafka_apply_topic_config(struct p_kafka_host *kafka_host)
314 {
315 FILE *file;
316 char buf[SRVBUFLEN], errstr[SRVBUFLEN], *key, *value;
317 int lineno = 1, ret;
318
319 if (kafka_host && kafka_host->config_file && kafka_host->topic_cfg) {
320 if ((file = fopen(kafka_host->config_file, "r")) == NULL) {
321 Log(LOG_WARNING, "WARN ( %s/%s ): [%s] file not found. librdkafka topic configuration not loaded.\n", config.name, config.type, kafka_host->config_file);
322 return;
323 }
324 else Log(LOG_DEBUG, "DEBUG ( %s/%s ): [%s] Reading librdkafka topic configuration.\n", config.name, config.type, kafka_host->config_file);
325
326 while (!feof(file)) {
327 if (fgets(buf, SRVBUFLEN, file)) {
328 if ((ret = p_kafka_parse_config_entry(buf, "topic", &key, &value)) > 0) {
329 ret = rd_kafka_topic_conf_set(kafka_host->topic_cfg, key, value, errstr, sizeof(errstr));
330 if (ret != RD_KAFKA_CONF_OK) {
331 Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] key=%s value=%s failed: %s\n",
332 config.name, config.type, kafka_host->config_file, lineno, key, value, errstr);
333 }
334 }
335 else {
336 if (ret == ERR) {
337 Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] Line malformed. Ignored.\n", config.name, config.type, kafka_host->config_file, lineno);
338 }
339 }
340 }
341
342 lineno++;
343 }
344
345 fclose(file);
346 }
347 }
348
p_kafka_logger(const rd_kafka_t * rk,int level,const char * fac,const char * buf)349 void p_kafka_logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
350 {
351 struct timeval tv;
352
353 gettimeofday(&tv, NULL);
354
355 Log(LOG_DEBUG, "DEBUG ( %s/%s ): RDKAFKA-%i-%s: %s: %s\n", config.name, config.type, level, fac, rd_kafka_name(rk), buf);
356 }
357
p_kafka_msg_delivered(rd_kafka_t * rk,void * payload,size_t len,int error_code,void * opaque,void * msg_opaque)358 void p_kafka_msg_delivered(rd_kafka_t *rk, void *payload, size_t len, int error_code, void *opaque, void *msg_opaque)
359 {
360 struct p_kafka_host *kafka_host = (struct p_kafka_host *) opaque;
361
362 if (error_code) {
363 Log(LOG_ERR, "ERROR ( %s/%s ): Kafka message delivery failed: %s\n", config.name, config.type, rd_kafka_err2str(error_code));
364 }
365 else {
366 if (config.debug) {
367 if (p_kafka_get_content_type(kafka_host) == PM_KAFKA_CNT_TYPE_STR) {
368 char *payload_str = (char *) payload;
369 char saved = payload_str[len];
370
371 payload_str[len] = '\0';
372 Log(LOG_DEBUG, "DEBUG ( %s/%s ): Kafka message delivery successful (%zd bytes): %p\n", config.name, config.type, len, payload);
373 payload_str[len] = saved;
374 }
375 else {
376 size_t base64_data_len = 0;
377 u_char *base64_data = base64_encode(payload, len, &base64_data_len);
378
379 Log(LOG_DEBUG, "DEBUG ( %s/%s ): Kafka message delivery successful (%zd bytes): %s\n", config.name, config.type, len, base64_data);
380
381 if (base64_data) base64_freebuf(base64_data);
382 }
383 }
384 }
385 }
386
p_kafka_msg_error(rd_kafka_t * rk,int err,const char * reason,void * opaque)387 void p_kafka_msg_error(rd_kafka_t *rk, int err, const char *reason, void *opaque)
388 {
389 kafkap_ret_err_cb = ERR;
390 }
391
p_kafka_stats(rd_kafka_t * rk,char * json,size_t json_len,void * opaque)392 int p_kafka_stats(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
393 {
394 Log(LOG_INFO, "INFO ( %s/%s ): %s\n", config.name, config.type, json);
395
396 /* We return 0 since we don't want to hold data any further;
397 see librdkafka header/docs for more info. */
398 return FALSE;
399 }
400
p_kafka_connect_to_produce(struct p_kafka_host * kafka_host)401 int p_kafka_connect_to_produce(struct p_kafka_host *kafka_host)
402 {
403 if (kafka_host) {
404 rd_kafka_conf_t* cfg = rd_kafka_conf_dup(kafka_host->cfg);
405 if (cfg == NULL) {
406 Log(LOG_ERR, "ERROR ( %s/%s ): Failed to clone kafka config object\n", config.name, config.type);
407 p_kafka_close(kafka_host, TRUE);
408 return ERR;
409 }
410
411 kafka_host->rk = rd_kafka_new(RD_KAFKA_PRODUCER, cfg, kafka_host->errstr, sizeof(kafka_host->errstr));
412 if (!kafka_host->rk) {
413 Log(LOG_ERR, "ERROR ( %s/%s ): Failed to create new Kafka producer: %s\n", config.name, config.type, kafka_host->errstr);
414 p_kafka_close(kafka_host, TRUE);
415 return ERR;
416 }
417
418 if (config.debug) rd_kafka_set_log_level(kafka_host->rk, LOG_DEBUG);
419 }
420 else return ERR;
421
422 return SUCCESS;
423 }
424
p_kafka_produce_data_to_part(struct p_kafka_host * kafka_host,void * data,size_t data_len,int part)425 int p_kafka_produce_data_to_part(struct p_kafka_host *kafka_host, void *data, size_t data_len, int part)
426 {
427 int ret = SUCCESS;
428
429 kafkap_ret_err_cb = FALSE;
430
431 if (kafka_host && kafka_host->rk && kafka_host->topic) {
432 ret = rd_kafka_produce(kafka_host->topic, part, RD_KAFKA_MSG_F_COPY,
433 data, data_len, kafka_host->key, kafka_host->key_len, NULL);
434
435 if (ret == ERR) {
436 Log(LOG_ERR, "ERROR ( %s/%s ): Failed to produce to topic %s partition %i: %s\n", config.name, config.type,
437 rd_kafka_topic_name(kafka_host->topic), part, rd_kafka_err2str(rd_kafka_last_error()));
438 p_kafka_close(kafka_host, TRUE);
439 return ERR;
440 }
441 }
442 else return ERR;
443
444 rd_kafka_poll(kafka_host->rk, 0);
445
446 return ret;
447 }
448
p_kafka_produce_data(struct p_kafka_host * kafka_host,void * data,size_t data_len)449 int p_kafka_produce_data(struct p_kafka_host *kafka_host, void *data, size_t data_len)
450 {
451 return p_kafka_produce_data_to_part(kafka_host, data, data_len, kafka_host->partition);
452 }
453
p_kafka_connect_to_consume(struct p_kafka_host * kafka_host)454 int p_kafka_connect_to_consume(struct p_kafka_host *kafka_host)
455 {
456 if (kafka_host) {
457 rd_kafka_conf_t* cfg = rd_kafka_conf_dup(kafka_host->cfg);
458 if (cfg == NULL) {
459 Log(LOG_ERR, "ERROR ( %s/%s ): Failed to clone kafka config object\n", config.name, config.type);
460 p_kafka_close(kafka_host, TRUE);
461 return ERR;
462 }
463
464 kafka_host->rk = rd_kafka_new(RD_KAFKA_CONSUMER, cfg, kafka_host->errstr, sizeof(kafka_host->errstr));
465 if (!kafka_host->rk) {
466 Log(LOG_ERR, "ERROR ( %s/%s ): Failed to create new Kafka producer: %s\n", config.name, config.type, kafka_host->errstr);
467 p_kafka_close(kafka_host, TRUE);
468 return ERR;
469 }
470
471 if (config.debug) rd_kafka_set_log_level(kafka_host->rk, LOG_DEBUG);
472 }
473 else return ERR;
474
475 return SUCCESS;
476 }
477
p_kafka_manage_consumer(struct p_kafka_host * kafka_host,int is_start)478 int p_kafka_manage_consumer(struct p_kafka_host *kafka_host, int is_start)
479 {
480 int ret = SUCCESS;
481
482 kafkap_ret_err_cb = FALSE;
483
484 if (kafka_host && kafka_host->rk && kafka_host->topic && !validate_truefalse(is_start)) {
485 if (is_start) {
486 ret = rd_kafka_consume_start(kafka_host->topic, kafka_host->partition, RD_KAFKA_OFFSET_END);
487 if (ret == ERR) {
488 Log(LOG_ERR, "ERROR ( %s/%s ): Failed to start consuming topic %s partition %i: %s\n", config.name, config.type,
489 rd_kafka_topic_name(kafka_host->topic), kafka_host->partition, rd_kafka_err2str(rd_kafka_last_error()));
490 p_kafka_close(kafka_host, TRUE);
491 return ERR;
492 }
493 }
494 else {
495 rd_kafka_consume_stop(kafka_host->topic, kafka_host->partition);
496 p_kafka_close(kafka_host, FALSE);
497 }
498 }
499 else return ERR;
500
501 return ret;
502 }
503
p_kafka_consume_poller(struct p_kafka_host * kafka_host,void ** data,int timeout)504 int p_kafka_consume_poller(struct p_kafka_host *kafka_host, void **data, int timeout)
505 {
506 rd_kafka_message_t *kafka_msg;
507 int ret = SUCCESS;
508
509 if (kafka_host && data && timeout) {
510 kafka_msg = rd_kafka_consume(kafka_host->topic, kafka_host->partition, timeout);
511 if (!kafka_msg) ret = FALSE; /* timeout */
512 else ret = TRUE; /* got data */
513
514 (*data) = kafka_msg;
515 }
516 else {
517 (*data) = NULL;
518 ret = ERR;
519 }
520
521 return ret;
522 }
523
p_kafka_consume_data(struct p_kafka_host * kafka_host,void * data,u_char * payload,size_t payload_len)524 int p_kafka_consume_data(struct p_kafka_host *kafka_host, void *data, u_char *payload, size_t payload_len)
525 {
526 rd_kafka_message_t *kafka_msg = (rd_kafka_message_t *) data;
527 int ret = 0;
528
529 if (kafka_host && data && payload && payload_len) {
530 if (kafka_msg->payload && kafka_msg->len) {
531 if (kafka_msg->len <= payload_len) {
532 memcpy(payload, kafka_msg->payload, kafka_msg->len);
533 ret = kafka_msg->len;
534 }
535 else ret = ERR;
536 }
537 else ret = 0;
538 }
539 else ret = ERR;
540
541 rd_kafka_message_destroy(kafka_msg);
542
543 return ret;
544 }
545
p_kafka_close(struct p_kafka_host * kafka_host,int set_fail)546 void p_kafka_close(struct p_kafka_host *kafka_host, int set_fail)
547 {
548 if (kafka_host && !validate_truefalse(set_fail)) {
549 if (set_fail) {
550 Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to Kafka: p_kafka_close()\n", config.name, config.type);
551 P_broker_timers_set_last_fail(&kafka_host->btimers, time(NULL));
552 }
553 else {
554 /* Wait for messages to be delivered */
555 if (kafka_host->rk) p_kafka_check_outq_len(kafka_host);
556 }
557
558 if (kafka_host->topic) {
559 rd_kafka_topic_destroy(kafka_host->topic);
560 kafka_host->topic = NULL;
561 }
562
563 if (kafka_host->topic_cfg) {
564 rd_kafka_topic_conf_destroy(kafka_host->topic_cfg);
565 kafka_host->topic_cfg = NULL;
566 }
567
568 if (kafka_host->rk) {
569 rd_kafka_destroy(kafka_host->rk);
570 kafka_host->rk = NULL;
571 }
572
573 if (kafka_host->cfg) {
574 rd_kafka_conf_destroy(kafka_host->cfg);
575 kafka_host->cfg = NULL;
576 }
577 }
578 }
579
p_kafka_check_outq_len(struct p_kafka_host * kafka_host)580 int p_kafka_check_outq_len(struct p_kafka_host *kafka_host)
581 {
582 int outq_len = 0, old_outq_len = 0, retries = 0;
583
584 if (kafka_host->rk) {
585 while ((outq_len = rd_kafka_outq_len(kafka_host->rk)) > 0) {
586 if (outq_len == old_outq_len) {
587 if (retries < PM_KAFKA_OUTQ_LEN_RETRIES) retries++;
588 else {
589 Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to Kafka: p_kafka_check_outq_len()\n", config.name, config.type);
590 p_kafka_close(kafka_host, TRUE);
591 return outq_len;
592 }
593 }
594 else old_outq_len = outq_len;
595
596 rd_kafka_poll(kafka_host->rk, 100);
597 sleep(1);
598 }
599 }
600 else return ERR;
601
602 return SUCCESS;
603 }
604
605 #if defined WITH_JANSSON
write_and_free_json_kafka(void * kafka_log,void * obj)606 int write_and_free_json_kafka(void *kafka_log, void *obj)
607 {
608 char *orig_kafka_topic = NULL, dyn_kafka_topic[SRVBUFLEN];
609 struct p_kafka_host *alog = (struct p_kafka_host *) kafka_log;
610 int ret = ERR;
611
612 char *tmpbuf = NULL;
613 json_t *json_obj = (json_t *) obj;
614
615 tmpbuf = json_dumps(json_obj, JSON_PRESERVE_ORDER);
616 json_decref(json_obj);
617
618 if (tmpbuf) {
619 if (alog->topic_rr.max) {
620 orig_kafka_topic = p_kafka_get_topic(alog);
621 P_handle_table_dyn_rr(dyn_kafka_topic, SRVBUFLEN, orig_kafka_topic, &alog->topic_rr);
622 p_kafka_set_topic(alog, dyn_kafka_topic);
623 }
624
625 ret = p_kafka_produce_data(alog, tmpbuf, strlen(tmpbuf));
626 free(tmpbuf);
627
628 if (alog->topic_rr.max) p_kafka_set_topic(alog, orig_kafka_topic);
629 }
630
631 return ret;
632 }
633 #else
write_and_free_json_kafka(void * kafka_log,void * obj)634 int write_and_free_json_kafka(void *kafka_log, void *obj)
635 {
636 if (config.debug) Log(LOG_DEBUG, "DEBUG ( %s/%s ): write_and_free_json_kafka(): JSON object not created due to missing --enable-jansson\n", config.name, config.type);
637 }
638 #endif
639
write_binary_kafka(void * kafka_log,void * obj,size_t len)640 int write_binary_kafka(void *kafka_log, void *obj, size_t len)
641 {
642 char *orig_kafka_topic = NULL, dyn_kafka_topic[SRVBUFLEN];
643 struct p_kafka_host *alog = (struct p_kafka_host *) kafka_log;
644 int ret = ERR;
645
646 if (obj && len) {
647 if (alog->topic_rr.max) {
648 orig_kafka_topic = p_kafka_get_topic(alog);
649 P_handle_table_dyn_rr(dyn_kafka_topic, SRVBUFLEN, orig_kafka_topic, &alog->topic_rr);
650 p_kafka_set_topic(alog, dyn_kafka_topic);
651 }
652
653 ret = p_kafka_produce_data(alog, obj, len);
654
655 if (alog->topic_rr.max) p_kafka_set_topic(alog, orig_kafka_topic);
656 }
657
658 return ret;
659 }
660