1 /*
2     pmacct (Promiscuous mode IP Accounting package)
3     pmacct is Copyright (C) 2003-2020 by Paolo Lucente
4 */
5 
6 /*
7     This program is free software; you can redistribute it and/or modify
8     it under the terms of the GNU General Public License as published by
9     the Free Software Foundation; either version 2 of the License, or
10     (at your option) any later version.
11 
12     This program is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU General Public License for more details.
16 
17     You should have received a copy of the GNU General Public License
18     along with this program; if not, write to the Free Software
19     Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21 
22 /* includes */
23 #include "pmacct.h"
24 #include "pmacct-data.h"
25 #include "plugin_common.h"
26 #include "kafka_common.h"
27 #include "base64.h"
28 
/* Kafka host descriptors defined at file scope. None of them is referenced
   by the functions in this file.
   NOTE(review): presumably each one belongs to the pmacct component its name
   refers to -- confirm against the callers. */
29 struct p_kafka_host kafkap_kafka_host;
30 struct p_kafka_host bgp_daemon_msglog_kafka_host;
31 struct p_kafka_host bgp_table_dump_kafka_host;
32 struct p_kafka_host bmp_daemon_msglog_kafka_host;
33 struct p_kafka_host bmp_dump_kafka_host;
34 struct p_kafka_host sfacctd_counter_kafka_host;
35 struct p_kafka_host telemetry_kafka_host;
36 struct p_kafka_host telemetry_daemon_msglog_kafka_host;
37 struct p_kafka_host telemetry_dump_kafka_host;
38 struct p_kafka_host nfacctd_kafka_host;
39 
/* Set to ERR by p_kafka_msg_error(); reset to FALSE by
   p_kafka_produce_data_to_part() and p_kafka_manage_consumer(). */
40 int kafkap_ret_err_cb;
/* Not referenced in this file. */
41 int dyn_partition_key;
/* Default broker address, broker port and topic name. Not referenced in this
   file. NOTE(review): presumably applied by callers when the configuration
   does not provide a value -- confirm. */
42 char default_kafka_broker_host[] = "127.0.0.1";
43 int default_kafka_broker_port = 9092;
44 char default_kafka_topic[] = "pmacct.acct";
45 
46 /* Functions */
p_kafka_init_host(struct p_kafka_host * kafka_host,char * config_file)47 void p_kafka_init_host(struct p_kafka_host *kafka_host, char *config_file)
48 {
49   if (kafka_host) {
50     memset(kafka_host, 0, sizeof(struct p_kafka_host));
51     P_broker_timers_set_retry_interval(&kafka_host->btimers, PM_KAFKA_DEFAULT_RETRY);
52     p_kafka_set_config_file(kafka_host, config_file);
53 
54     kafka_host->cfg = rd_kafka_conf_new();
55     if (kafka_host->cfg) {
56       rd_kafka_conf_set_log_cb(kafka_host->cfg, p_kafka_logger);
57       rd_kafka_conf_set_error_cb(kafka_host->cfg, p_kafka_msg_error);
58       rd_kafka_conf_set_dr_cb(kafka_host->cfg, p_kafka_msg_delivered);
59       rd_kafka_conf_set_stats_cb(kafka_host->cfg, p_kafka_stats);
60       rd_kafka_conf_set_opaque(kafka_host->cfg, kafka_host);
61       p_kafka_apply_global_config(kafka_host);
62 
63       if (config.debug) {
64 	const char **res;
65 	size_t res_len, idx;
66 
67 	res = rd_kafka_conf_dump(kafka_host->cfg, &res_len);
68 	for (idx = 0; idx < res_len; idx += 2)
69 	  Log(LOG_DEBUG, "DEBUG ( %s/%s ): librdkafka global config: %s = %s\n", config.name, config.type, res[idx], res[idx + 1]);
70 
71 	rd_kafka_conf_dump_free(res, res_len);
72       }
73     }
74   }
75 }
76 
p_kafka_unset_topic(struct p_kafka_host * kafka_host)77 void p_kafka_unset_topic(struct p_kafka_host *kafka_host)
78 {
79   if (kafka_host && kafka_host->topic) {
80     rd_kafka_topic_destroy(kafka_host->topic);
81     kafka_host->topic = NULL;
82   }
83 }
84 
p_kafka_set_topic(struct p_kafka_host * kafka_host,char * topic)85 void p_kafka_set_topic(struct p_kafka_host *kafka_host, char *topic)
86 {
87   if (kafka_host) {
88     kafka_host->topic_cfg = rd_kafka_topic_conf_new();
89     p_kafka_apply_topic_config(kafka_host);
90 
91     if (config.debug) {
92       const char **res;
93       size_t res_len, idx;
94 
95       res = rd_kafka_topic_conf_dump(kafka_host->topic_cfg, &res_len);
96       for (idx = 0; idx < res_len; idx += 2)
97         Log(LOG_DEBUG, "DEBUG ( %s/%s ): librdkafka '%s' topic config: %s = %s\n", config.name, config.type, topic, res[idx], res[idx + 1]);
98 
99       rd_kafka_conf_dump_free(res, res_len);
100     }
101 
102     /* This needs to be done here otherwise kafka_host->topic_cfg is null
103      * and the partitioner cannot be set */
104     if (config.kafka_partition_dynamic && kafka_host->topic_cfg)
105       p_kafka_set_dynamic_partitioner(kafka_host);
106 
107     /* destroy current allocation before making a new one */
108     if (kafka_host->topic) p_kafka_unset_topic(kafka_host);
109 
110     if (kafka_host->rk && kafka_host->topic_cfg) {
111       kafka_host->topic = rd_kafka_topic_new(kafka_host->rk, topic, kafka_host->topic_cfg);
112       kafka_host->topic_cfg = NULL; /* rd_kafka_topic_new() destroys conf as per rdkafka.h */
113     }
114   }
115 }
116 
p_kafka_get_handler(struct p_kafka_host * kafka_host)117 rd_kafka_t *p_kafka_get_handler(struct p_kafka_host *kafka_host)
118 {
119   if (kafka_host) return kafka_host->rk;
120 
121   return NULL;
122 }
123 
p_kafka_get_broker(struct p_kafka_host * kafka_host)124 char *p_kafka_get_broker(struct p_kafka_host *kafka_host)
125 {
126   if (kafka_host && strlen(kafka_host->broker)) return kafka_host->broker;
127 
128   return NULL;
129 }
130 
p_kafka_get_topic(struct p_kafka_host * kafka_host)131 char *p_kafka_get_topic(struct p_kafka_host *kafka_host)
132 {
133   if (kafka_host && kafka_host->topic)
134     return (char*)rd_kafka_topic_name(kafka_host->topic);
135 
136   return NULL;
137 }
138 
p_kafka_init_topic_rr(struct p_kafka_host * kafka_host)139 void p_kafka_init_topic_rr(struct p_kafka_host *kafka_host)
140 {
141   if (kafka_host) memset(&kafka_host->topic_rr, 0, sizeof(struct p_table_rr));
142 }
143 
p_kafka_set_topic_rr(struct p_kafka_host * kafka_host,int topic_rr)144 void p_kafka_set_topic_rr(struct p_kafka_host *kafka_host, int topic_rr)
145 {
146   if (kafka_host) kafka_host->topic_rr.max = topic_rr;
147 }
148 
p_kafka_get_topic_rr(struct p_kafka_host * kafka_host)149 int p_kafka_get_topic_rr(struct p_kafka_host *kafka_host)
150 {
151   if (kafka_host) return kafka_host->topic_rr.max;
152 
153   return FALSE;
154 }
155 
p_kafka_set_broker(struct p_kafka_host * kafka_host,char * host,int port)156 void p_kafka_set_broker(struct p_kafka_host *kafka_host, char *host, int port)
157 {
158   int ret, multiple_brokers = FALSE;
159 
160   if (strchr(host, ',')) multiple_brokers = TRUE;
161 
162   if (kafka_host && kafka_host->rk) {
163     /* if host is a comma-separated list of brokers, assume port is part of the definition */
164     if (multiple_brokers) snprintf(kafka_host->broker, SRVBUFLEN, "%s", host);
165     else {
166       if (host && port) snprintf(kafka_host->broker, SRVBUFLEN, "%s:%u", host, port);
167       else if (host && !port) snprintf(kafka_host->broker, SRVBUFLEN, "%s", host);
168     }
169 
170     if ((ret = rd_kafka_brokers_add(kafka_host->rk, kafka_host->broker)) == 0) {
171       Log(LOG_WARNING, "WARN ( %s/%s ): Invalid 'kafka_broker_host' or 'kafka_broker_port' specified (%s).\n",
172 	  config.name, config.type, kafka_host->broker);
173     }
174     else Log(LOG_DEBUG, "DEBUG ( %s/%s ): %u broker(s) successfully added.\n", config.name, config.type, ret);
175   }
176 }
177 
p_kafka_set_content_type(struct p_kafka_host * kafka_host,int content_type)178 void p_kafka_set_content_type(struct p_kafka_host *kafka_host, int content_type)
179 {
180   if (kafka_host) kafka_host->content_type = content_type;
181 }
182 
p_kafka_get_content_type(struct p_kafka_host * kafka_host)183 int p_kafka_get_content_type(struct p_kafka_host *kafka_host)
184 {
185   if (kafka_host) return kafka_host->content_type;
186 
187   return FALSE;
188 }
189 
p_kafka_set_partition(struct p_kafka_host * kafka_host,int partition)190 void p_kafka_set_partition(struct p_kafka_host *kafka_host, int partition)
191 {
192   if (kafka_host) {
193     if (!partition) kafka_host->partition = RD_KAFKA_PARTITION_UA;
194     else if (partition == FALSE_NONZERO) kafka_host->partition = 0;
195     else kafka_host->partition = partition;
196   }
197 }
198 
p_kafka_get_partition(struct p_kafka_host * kafka_host)199 int p_kafka_get_partition(struct p_kafka_host *kafka_host)
200 {
201   if (kafka_host) return kafka_host->partition;
202 
203   return FALSE;
204 }
205 
p_kafka_set_dynamic_partitioner(struct p_kafka_host * kafka_host)206 void p_kafka_set_dynamic_partitioner(struct p_kafka_host *kafka_host)
207 {
208   rd_kafka_topic_conf_set_partitioner_cb(kafka_host->topic_cfg, &rd_kafka_msg_partitioner_consistent_random);
209 }
210 
p_kafka_set_key(struct p_kafka_host * kafka_host,char * key,int key_len)211 void p_kafka_set_key(struct p_kafka_host *kafka_host, char *key, int key_len)
212 {
213   if (kafka_host) {
214     kafka_host->key = key;
215     kafka_host->key_len = key_len;
216   }
217 }
218 
p_kafka_get_key(struct p_kafka_host * kafka_host)219 char *p_kafka_get_key(struct p_kafka_host *kafka_host)
220 {
221   if (kafka_host) return kafka_host->key;
222 
223   return NULL;
224 }
225 
p_kafka_set_config_file(struct p_kafka_host * kafka_host,char * config_file)226 void p_kafka_set_config_file(struct p_kafka_host *kafka_host, char *config_file)
227 {
228   if (kafka_host) {
229     kafka_host->config_file = config_file;
230   }
231 }
232 
/* Prints the librdkafka version string to standard output. */
void p_kafka_get_version()
{
  const char *version = rd_kafka_version_str();

  printf("rdkafka %s\n", version);
}
237 
p_kafka_parse_config_entry(char * buf,char * type,char ** key,char ** value)238 int p_kafka_parse_config_entry(char *buf, char *type, char **key, char **value)
239 {
240   char *value_ptr, *token;
241   int index, type_match = FALSE;
242 
243   if (buf && type && key && value) {
244     value_ptr = buf;
245     (*key) = NULL;
246     (*value) = NULL;
247     index = 0;
248 
249     while ((token = extract_token(&value_ptr, ','))) {
250       index++;
251       trim_spaces(token);
252 
253       if (index == 1) {
254 	lower_string(token);
255 	if (!strcmp(token, type)) type_match = TRUE;
256 	else break;
257       }
258       else if (index == 2) {
259 	(*key) = token;
260 	break;
261       }
262     }
263 
264     if (strlen(value_ptr)) {
265       trim_spaces(value_ptr);
266       (*value) = value_ptr;
267       index++;
268     }
269 
270     if (type_match && index != 3) return ERR;
271   }
272   else return ERR;
273 
274   return type_match;
275 }
276 
p_kafka_apply_global_config(struct p_kafka_host * kafka_host)277 void p_kafka_apply_global_config(struct p_kafka_host *kafka_host)
278 {
279   FILE *file;
280   char buf[SRVBUFLEN], errstr[SRVBUFLEN], *key, *value;
281   int lineno = 1, ret;
282 
283   if (kafka_host && kafka_host->config_file && kafka_host->cfg) {
284     if ((file = fopen(kafka_host->config_file, "r")) == NULL) {
285       Log(LOG_WARNING, "WARN ( %s/%s ): [%s] file not found. librdkafka global config not loaded.\n", config.name, config.type, kafka_host->config_file);
286       return;
287     }
288     else Log(LOG_INFO, "INFO ( %s/%s ): [%s] Reading librdkafka global config.\n", config.name, config.type, kafka_host->config_file);
289 
290     while (!feof(file)) {
291       if (fgets(buf, SRVBUFLEN, file)) {
292 	if ((ret = p_kafka_parse_config_entry(buf, "global", &key, &value)) > 0) {
293 	  ret = rd_kafka_conf_set(kafka_host->cfg, key, value, errstr, sizeof(errstr));
294 	  if (ret != RD_KAFKA_CONF_OK) {
295 	    Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] key=%s value=%s failed: %s\n",
296 		config.name, config.type, kafka_host->config_file, lineno, key, value, errstr);
297 	  }
298         }
299 	else {
300 	  if (ret == ERR) {
301 	    Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] Line malformed. Ignored.\n", config.name, config.type, kafka_host->config_file, lineno);
302 	  }
303 	}
304       }
305 
306       lineno++;
307     }
308 
309     fclose(file);
310   }
311 }
312 
p_kafka_apply_topic_config(struct p_kafka_host * kafka_host)313 void p_kafka_apply_topic_config(struct p_kafka_host *kafka_host)
314 {
315   FILE *file;
316   char buf[SRVBUFLEN], errstr[SRVBUFLEN], *key, *value;
317   int lineno = 1, ret;
318 
319   if (kafka_host && kafka_host->config_file && kafka_host->topic_cfg) {
320     if ((file = fopen(kafka_host->config_file, "r")) == NULL) {
321       Log(LOG_WARNING, "WARN ( %s/%s ): [%s] file not found. librdkafka topic configuration not loaded.\n", config.name, config.type, kafka_host->config_file);
322       return;
323     }
324     else Log(LOG_DEBUG, "DEBUG ( %s/%s ): [%s] Reading librdkafka topic configuration.\n", config.name, config.type, kafka_host->config_file);
325 
326     while (!feof(file)) {
327       if (fgets(buf, SRVBUFLEN, file)) {
328         if ((ret = p_kafka_parse_config_entry(buf, "topic", &key, &value)) > 0) {
329           ret = rd_kafka_topic_conf_set(kafka_host->topic_cfg, key, value, errstr, sizeof(errstr));
330           if (ret != RD_KAFKA_CONF_OK) {
331             Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] key=%s value=%s failed: %s\n",
332                 config.name, config.type, kafka_host->config_file, lineno, key, value, errstr);
333           }
334         }
335         else {
336           if (ret == ERR) {
337             Log(LOG_WARNING, "WARN ( %s/%s ): [%s:%u] Line malformed. Ignored.\n", config.name, config.type, kafka_host->config_file, lineno);
338           }
339         }
340       }
341 
342       lineno++;
343     }
344 
345     fclose(file);
346   }
347 }
348 
p_kafka_logger(const rd_kafka_t * rk,int level,const char * fac,const char * buf)349 void p_kafka_logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
350 {
351   struct timeval tv;
352 
353   gettimeofday(&tv, NULL);
354 
355   Log(LOG_DEBUG, "DEBUG ( %s/%s ): RDKAFKA-%i-%s: %s: %s\n", config.name, config.type, level, fac, rd_kafka_name(rk), buf);
356 }
357 
/*
 * Delivery report callback installed on the librdkafka configuration.
 *
 * A non-zero error_code is logged as an error. Successful deliveries are
 * only logged when config.debug is set: the payload is printed as text when
 * the content type of the descriptor passed as opaque is
 * PM_KAFKA_CNT_TYPE_STR, base64-encoded otherwise.
 */
void p_kafka_msg_delivered(rd_kafka_t *rk, void *payload, size_t len, int error_code, void *opaque, void *msg_opaque)
{
  struct p_kafka_host *kafka_host = (struct p_kafka_host *) opaque;

  if (error_code) {
    Log(LOG_ERR, "ERROR ( %s/%s ): Kafka message delivery failed: %s\n", config.name, config.type, rd_kafka_err2str(error_code));
    return;
  }

  if (!config.debug) return;

  if (p_kafka_get_content_type(kafka_host) == PM_KAFKA_CNT_TYPE_STR) {
    const char *payload_str = payload ? (const char *) payload : "";
    int shown = (int) len;

    /* The payload is `len` bytes long and not necessarily NUL-terminated:
       bound the print with a precision instead of writing a terminator one
       byte past the buffer. A length that does not fit an int, or a missing
       payload, prints nothing. */
    if (!payload || shown < 0 || (size_t) shown != len) shown = 0;

    Log(LOG_DEBUG, "DEBUG ( %s/%s ): Kafka message delivery successful (%zu bytes): %.*s\n", config.name, config.type, len, shown, payload_str);
  }
  else {
    size_t base64_data_len = 0;
    u_char *base64_data = base64_encode(payload, len, &base64_data_len);

    /* never hand a NULL pointer to %s */
    Log(LOG_DEBUG, "DEBUG ( %s/%s ): Kafka message delivery successful (%zu bytes): %s\n", config.name, config.type, len,
	base64_data ? (const char *) base64_data : "");

    if (base64_data) base64_freebuf(base64_data);
  }
}
386 
/* Error callback installed on the librdkafka configuration: records that an
   error was reported by setting kafkap_ret_err_cb to ERR. The callback
   arguments are not inspected. */
void p_kafka_msg_error(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
  kafkap_ret_err_cb = ERR;
}
391 
/*
 * Statistics callback installed on the librdkafka configuration: writes the
 * JSON statistics document to the pmacct log at info level.
 *
 * Always returns FALSE (0), because the data is not held any further once
 * logged; see the librdkafka header/docs for the meaning of the return
 * value.
 */
int p_kafka_stats(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
  int keep_json = FALSE;

  Log(LOG_INFO, "INFO ( %s/%s ): %s\n", config.name, config.type, json);

  return keep_json;
}
400 
p_kafka_connect_to_produce(struct p_kafka_host * kafka_host)401 int p_kafka_connect_to_produce(struct p_kafka_host *kafka_host)
402 {
403   if (kafka_host) {
404     rd_kafka_conf_t* cfg = rd_kafka_conf_dup(kafka_host->cfg);
405     if (cfg == NULL) {
406       Log(LOG_ERR, "ERROR ( %s/%s ): Failed to clone kafka config object\n", config.name, config.type);
407       p_kafka_close(kafka_host, TRUE);
408       return ERR;
409     }
410 
411     kafka_host->rk = rd_kafka_new(RD_KAFKA_PRODUCER, cfg, kafka_host->errstr, sizeof(kafka_host->errstr));
412     if (!kafka_host->rk) {
413       Log(LOG_ERR, "ERROR ( %s/%s ): Failed to create new Kafka producer: %s\n", config.name, config.type, kafka_host->errstr);
414       p_kafka_close(kafka_host, TRUE);
415       return ERR;
416     }
417 
418     if (config.debug) rd_kafka_set_log_level(kafka_host->rk, LOG_DEBUG);
419   }
420   else return ERR;
421 
422   return SUCCESS;
423 }
424 
p_kafka_produce_data_to_part(struct p_kafka_host * kafka_host,void * data,size_t data_len,int part)425 int p_kafka_produce_data_to_part(struct p_kafka_host *kafka_host, void *data, size_t data_len, int part)
426 {
427   int ret = SUCCESS;
428 
429   kafkap_ret_err_cb = FALSE;
430 
431   if (kafka_host && kafka_host->rk && kafka_host->topic) {
432     ret = rd_kafka_produce(kafka_host->topic, part, RD_KAFKA_MSG_F_COPY,
433 			   data, data_len, kafka_host->key, kafka_host->key_len, NULL);
434 
435     if (ret == ERR) {
436       Log(LOG_ERR, "ERROR ( %s/%s ): Failed to produce to topic %s partition %i: %s\n", config.name, config.type,
437           rd_kafka_topic_name(kafka_host->topic), part, rd_kafka_err2str(rd_kafka_last_error()));
438       p_kafka_close(kafka_host, TRUE);
439       return ERR;
440     }
441   }
442   else return ERR;
443 
444   rd_kafka_poll(kafka_host->rk, 0);
445 
446   return ret;
447 }
448 
p_kafka_produce_data(struct p_kafka_host * kafka_host,void * data,size_t data_len)449 int p_kafka_produce_data(struct p_kafka_host *kafka_host, void *data, size_t data_len)
450 {
451   return p_kafka_produce_data_to_part(kafka_host, data, data_len, kafka_host->partition);
452 }
453 
p_kafka_connect_to_consume(struct p_kafka_host * kafka_host)454 int p_kafka_connect_to_consume(struct p_kafka_host *kafka_host)
455 {
456   if (kafka_host) {
457     rd_kafka_conf_t* cfg = rd_kafka_conf_dup(kafka_host->cfg);
458     if (cfg == NULL) {
459       Log(LOG_ERR, "ERROR ( %s/%s ): Failed to clone kafka config object\n", config.name, config.type);
460       p_kafka_close(kafka_host, TRUE);
461       return ERR;
462     }
463 
464     kafka_host->rk = rd_kafka_new(RD_KAFKA_CONSUMER, cfg, kafka_host->errstr, sizeof(kafka_host->errstr));
465     if (!kafka_host->rk) {
466       Log(LOG_ERR, "ERROR ( %s/%s ): Failed to create new Kafka producer: %s\n", config.name, config.type, kafka_host->errstr);
467       p_kafka_close(kafka_host, TRUE);
468       return ERR;
469     }
470 
471     if (config.debug) rd_kafka_set_log_level(kafka_host->rk, LOG_DEBUG);
472   }
473   else return ERR;
474 
475   return SUCCESS;
476 }
477 
p_kafka_manage_consumer(struct p_kafka_host * kafka_host,int is_start)478 int p_kafka_manage_consumer(struct p_kafka_host *kafka_host, int is_start)
479 {
480   int ret = SUCCESS;
481 
482   kafkap_ret_err_cb = FALSE;
483 
484   if (kafka_host && kafka_host->rk && kafka_host->topic && !validate_truefalse(is_start)) {
485     if (is_start) {
486       ret = rd_kafka_consume_start(kafka_host->topic, kafka_host->partition, RD_KAFKA_OFFSET_END);
487       if (ret == ERR) {
488         Log(LOG_ERR, "ERROR ( %s/%s ): Failed to start consuming topic %s partition %i: %s\n", config.name, config.type,
489           rd_kafka_topic_name(kafka_host->topic), kafka_host->partition, rd_kafka_err2str(rd_kafka_last_error()));
490         p_kafka_close(kafka_host, TRUE);
491         return ERR;
492       }
493     }
494     else {
495       rd_kafka_consume_stop(kafka_host->topic, kafka_host->partition);
496       p_kafka_close(kafka_host, FALSE);
497     }
498   }
499   else return ERR;
500 
501   return ret;
502 }
503 
p_kafka_consume_poller(struct p_kafka_host * kafka_host,void ** data,int timeout)504 int p_kafka_consume_poller(struct p_kafka_host *kafka_host, void **data, int timeout)
505 {
506   rd_kafka_message_t *kafka_msg;
507   int ret = SUCCESS;
508 
509   if (kafka_host && data && timeout) {
510     kafka_msg = rd_kafka_consume(kafka_host->topic, kafka_host->partition, timeout);
511     if (!kafka_msg) ret = FALSE; /* timeout */
512     else ret = TRUE; /* got data */
513 
514     (*data) = kafka_msg;
515   }
516   else {
517     (*data) = NULL;
518     ret = ERR;
519   }
520 
521   return ret;
522 }
523 
/*
 * Copies the payload of a consumed message into the caller's buffer and
 * destroys the message.
 *
 * data:        message obtained from p_kafka_consume_poller().
 * payload:     destination buffer.
 * payload_len: size of the destination buffer.
 *
 * Returns the number of bytes copied, 0 when the message carries no payload,
 * and ERR when an argument is missing or the payload does not fit the
 * buffer. The message is destroyed in every case in which one was passed.
 *
 * NOTE(review): the `err` field of the message is not inspected here --
 * confirm whether callers check it before treating the payload as data.
 */
int p_kafka_consume_data(struct p_kafka_host *kafka_host, void *data, u_char *payload, size_t payload_len)
{
  rd_kafka_message_t *kafka_msg = (rd_kafka_message_t *) data;
  int ret;

  /* no message: nothing to copy and, above all, nothing to destroy */
  if (!kafka_msg) return ERR;

  if (!kafka_host || !payload || !payload_len) ret = ERR;
  else if (!kafka_msg->payload || !kafka_msg->len) ret = 0;
  else if (kafka_msg->len > payload_len) ret = ERR;
  else {
    memcpy(payload, kafka_msg->payload, kafka_msg->len);
    ret = (int) kafka_msg->len;
  }

  rd_kafka_message_destroy(kafka_msg);

  return ret;
}
545 
p_kafka_close(struct p_kafka_host * kafka_host,int set_fail)546 void p_kafka_close(struct p_kafka_host *kafka_host, int set_fail)
547 {
548   if (kafka_host && !validate_truefalse(set_fail)) {
549     if (set_fail) {
550       Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to Kafka: p_kafka_close()\n", config.name, config.type);
551       P_broker_timers_set_last_fail(&kafka_host->btimers, time(NULL));
552     }
553     else {
554       /* Wait for messages to be delivered */
555       if (kafka_host->rk) p_kafka_check_outq_len(kafka_host);
556     }
557 
558     if (kafka_host->topic) {
559       rd_kafka_topic_destroy(kafka_host->topic);
560       kafka_host->topic = NULL;
561     }
562 
563     if (kafka_host->topic_cfg) {
564       rd_kafka_topic_conf_destroy(kafka_host->topic_cfg);
565       kafka_host->topic_cfg = NULL;
566     }
567 
568     if (kafka_host->rk) {
569       rd_kafka_destroy(kafka_host->rk);
570       kafka_host->rk = NULL;
571     }
572 
573     if (kafka_host->cfg) {
574       rd_kafka_conf_destroy(kafka_host->cfg);
575       kafka_host->cfg = NULL;
576     }
577   }
578 }
579 
p_kafka_check_outq_len(struct p_kafka_host * kafka_host)580 int p_kafka_check_outq_len(struct p_kafka_host *kafka_host)
581 {
582   int outq_len = 0, old_outq_len = 0, retries = 0;
583 
584   if (kafka_host->rk) {
585     while ((outq_len = rd_kafka_outq_len(kafka_host->rk)) > 0) {
586       if (outq_len == old_outq_len) {
587 	if (retries < PM_KAFKA_OUTQ_LEN_RETRIES) retries++;
588 	else {
589 	  Log(LOG_ERR, "ERROR ( %s/%s ): Connection failed to Kafka: p_kafka_check_outq_len()\n", config.name, config.type);
590           p_kafka_close(kafka_host, TRUE);
591 	  return outq_len;
592 	}
593       }
594       else old_outq_len = outq_len;
595 
596       rd_kafka_poll(kafka_host->rk, 100);
597       sleep(1);
598     }
599   }
600   else return ERR;
601 
602   return SUCCESS;
603 }
604 
605 #if defined WITH_JANSSON
write_and_free_json_kafka(void * kafka_log,void * obj)606 int write_and_free_json_kafka(void *kafka_log, void *obj)
607 {
608   char *orig_kafka_topic = NULL, dyn_kafka_topic[SRVBUFLEN];
609   struct p_kafka_host *alog = (struct p_kafka_host *) kafka_log;
610   int ret = ERR;
611 
612   char *tmpbuf = NULL;
613   json_t *json_obj = (json_t *) obj;
614 
615   tmpbuf = json_dumps(json_obj, JSON_PRESERVE_ORDER);
616   json_decref(json_obj);
617 
618   if (tmpbuf) {
619     if (alog->topic_rr.max) {
620       orig_kafka_topic = p_kafka_get_topic(alog);
621       P_handle_table_dyn_rr(dyn_kafka_topic, SRVBUFLEN, orig_kafka_topic, &alog->topic_rr);
622       p_kafka_set_topic(alog, dyn_kafka_topic);
623     }
624 
625     ret = p_kafka_produce_data(alog, tmpbuf, strlen(tmpbuf));
626     free(tmpbuf);
627 
628     if (alog->topic_rr.max) p_kafka_set_topic(alog, orig_kafka_topic);
629   }
630 
631   return ret;
632 }
633 #else
write_and_free_json_kafka(void * kafka_log,void * obj)634 int write_and_free_json_kafka(void *kafka_log, void *obj)
635 {
636   if (config.debug) Log(LOG_DEBUG, "DEBUG ( %s/%s ): write_and_free_json_kafka(): JSON object not created due to missing --enable-jansson\n", config.name, config.type);
637 }
638 #endif
639 
write_binary_kafka(void * kafka_log,void * obj,size_t len)640 int write_binary_kafka(void *kafka_log, void *obj, size_t len)
641 {
642   char *orig_kafka_topic = NULL, dyn_kafka_topic[SRVBUFLEN];
643   struct p_kafka_host *alog = (struct p_kafka_host *) kafka_log;
644   int ret = ERR;
645 
646   if (obj && len) {
647     if (alog->topic_rr.max) {
648       orig_kafka_topic = p_kafka_get_topic(alog);
649       P_handle_table_dyn_rr(dyn_kafka_topic, SRVBUFLEN, orig_kafka_topic, &alog->topic_rr);
650       p_kafka_set_topic(alog, dyn_kafka_topic);
651     }
652 
653     ret = p_kafka_produce_data(alog, obj, len);
654 
655     if (alog->topic_rr.max) p_kafka_set_topic(alog, orig_kafka_topic);
656   }
657 
658   return ret;
659 }
660