1 /*
2     pmacct (Promiscuous mode IP Accounting package)
3     pmacct is Copyright (C) 2003-2020 by Paolo Lucente
4 */
5 
6 /*
7     This program is free software; you can redistribute it and/or modify
8     it under the terms of the GNU General Public License as published by
9     the Free Software Foundation; either version 2 of the License, or
10     (at your option) any later version.
11 
12     This program is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU General Public License for more details.
16 
17     You should have received a copy of the GNU General Public License
18     along with this program; if not, write to the Free Software
19     Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21 
22 #ifndef KAFKA_COMMON_H
23 #define KAFKA_COMMON_H
24 
25 
26 /* includes */
27 #include <librdkafka/rdkafka.h>
28 #include "plugin_common.h"
29 #ifdef WITH_SERDES
30 #include <libserdes/serdes-avro.h>
31 #endif
32 
33 /* defines */
/* Size, in bytes, of the errstr buffer embedded in struct p_kafka_host. */
#define PM_KAFKA_ERRSTR_LEN		512
/* Default retry value; unit and consumer are not visible in this header
   (presumably seconds between reconnection attempts -- TODO confirm). */
#define PM_KAFKA_DEFAULT_RETRY		60
/* "Practically never" retry value. Expands to INT_MAX, so <limits.h> must be
   in scope wherever this macro is expanded: this header does not include it
   directly (NOTE(review): it may arrive via plugin_common.h -- confirm). */
#define PM_KAFKA_LONGLONG_RETRY		INT_MAX
/* Retry count related to the output queue length check; presumably used by
   p_kafka_check_outq_len() -- verify against the implementation. */
#define PM_KAFKA_OUTQ_LEN_RETRIES	3

/* Content type selectors: string vs. binary payloads. Presumably the values
   stored in p_kafka_host.content_type -- verify against the setters. */
#define PM_KAFKA_CNT_TYPE_STR		1
#define PM_KAFKA_CNT_TYPE_BIN		2
41 
42 /* structures */
/*
 * State kept for one Kafka endpoint: the librdkafka handles and their
 * configuration objects, plus the pmacct-side settings (broker string,
 * partition, key, topic round-robin state, timers) that go with them.
 * Instances are passed by pointer to every p_kafka_*() function declared
 * in this header.
 */
struct p_kafka_host {
  char broker[SRVBUFLEN];		/* broker string, fixed-size buffer */
  char errstr[PM_KAFKA_ERRSTR_LEN];	/* buffer for error strings */
  u_int8_t content_type;		/* presumably one of PM_KAFKA_CNT_TYPE_* -- TODO confirm */

  rd_kafka_t *rk;			/* librdkafka client handle */
  rd_kafka_conf_t *cfg;			/* librdkafka global configuration object */
  rd_kafka_topic_t *topic;		/* librdkafka topic handle */
  rd_kafka_topic_conf_t *topic_cfg;	/* librdkafka topic configuration object */
  char *config_file;			/* config file reference; ownership not visible in this header */
  int partition;			/* partition setting; see p_kafka_set_partition() */
  char *key;				/* message key; ownership not visible in this header */
  int key_len;				/* presumably the length of key in bytes -- TODO confirm */
  struct p_table_rr topic_rr;		/* topic round-robin state (type declared elsewhere) */

#ifdef WITH_SERDES
  /* Schema handles from libserdes; only present when built WITH_SERDES. */
  serdes_schema_t *sd_schema[MAX_AVRO_SCHEMA];
#endif

  struct p_broker_timers btimers;	/* broker timers (type declared elsewhere) */
};
64 
65 /* prototypes */
/* Initialisation of a p_kafka_host and of its topic round-robin state.
   NOTE(review): the char * argument of p_kafka_init_host() is presumably the
   librdkafka config file path -- confirm against the definition. */
extern void p_kafka_init_host(struct p_kafka_host *, char *);
extern void p_kafka_init_topic_rr(struct p_kafka_host *);

/* Setters: each takes the host to modify followed by the new value(s).
   Whether string arguments are copied or merely referenced is not visible
   from these declarations. */
extern void p_kafka_set_broker(struct p_kafka_host *, char *, int);
extern void p_kafka_set_topic(struct p_kafka_host *, char *);
extern void p_kafka_set_topic_rr(struct p_kafka_host *, int);
extern void p_kafka_set_content_type(struct p_kafka_host *, int);
extern void p_kafka_set_partition(struct p_kafka_host *, int);
extern void p_kafka_set_key(struct p_kafka_host *, char *, int);
extern void p_kafka_set_config_file(struct p_kafka_host *, char *);
76 
77 extern rd_kafka_t *p_kafka_get_handler(struct p_kafka_host *);
78 extern char *p_kafka_get_broker(struct p_kafka_host *);
79 extern char *p_kafka_get_topic(struct p_kafka_host *);
80 extern int p_kafka_get_topic_rr(struct p_kafka_host *);
81 extern int p_kafka_get_content_type(struct p_kafka_host *);
82 extern int p_kafka_get_partition(struct p_kafka_host *);
83 extern void p_kafka_set_dynamic_partitioner(struct p_kafka_host *);
84 extern char *p_kafka_get_key(struct p_kafka_host *);
85 extern void p_kafka_get_version();
86 
/* Counterpart of p_kafka_set_topic(). */
extern void p_kafka_unset_topic(struct p_kafka_host *);

/* Configuration handling. NOTE(review): the two char ** arguments of
   p_kafka_parse_config_entry() look like output parameters (presumably key
   and value) -- confirm against the definition. */
extern int p_kafka_parse_config_entry(char *, char *, char **, char **);
extern void p_kafka_apply_global_config(struct p_kafka_host *);
extern void p_kafka_apply_topic_config(struct p_kafka_host *);

/* Callback-style functions taking an rd_kafka_t handle. Their parameter
   lists resemble librdkafka's log, delivery-report, error and stats callback
   signatures; where they are registered is not visible in this header. */
extern void p_kafka_logger(const rd_kafka_t *, int, const char *, const char *);
extern void p_kafka_msg_delivered(rd_kafka_t *, void *, size_t, int, void *, void *);
extern void p_kafka_msg_error(rd_kafka_t *, int, const char *, void *);
extern int p_kafka_stats(rd_kafka_t *, char *, size_t, void *);
97 
/* Producer side: connect, then hand over a buffer and its length. The _to_part
   variant takes an extra int, presumably the target partition -- TODO confirm. */
extern int p_kafka_connect_to_produce(struct p_kafka_host *);
extern int p_kafka_produce_data(struct p_kafka_host *, void *, size_t);
extern int p_kafka_produce_data_to_part(struct p_kafka_host *, void *, size_t, int);

/* Consumer side. Meaning of the int arguments and of the return values is not
   visible from these declarations; see the definitions before relying on them. */
extern int p_kafka_connect_to_consume(struct p_kafka_host *);
extern int p_kafka_manage_consumer(struct p_kafka_host *, int);
extern int p_kafka_consume_poller(struct p_kafka_host *, void **, int);
extern int p_kafka_consume_data(struct p_kafka_host *, void *, u_char *, size_t);

/* Teardown and output queue check. */
extern void p_kafka_close(struct p_kafka_host *, int);
extern int p_kafka_check_outq_len(struct p_kafka_host *);

/* Generic-pointer writers for JSON and binary payloads. NOTE(review): by its
   name write_and_free_json_kafka() releases the JSON object it is given --
   confirm ownership against the definition. */
extern int write_and_free_json_kafka(void *, void *);
extern int write_binary_kafka(void *, void *, size_t);
112 
113 /* global vars */
/* Declarations only: the objects below are defined in source files that are
   not part of this header. One p_kafka_host exists per named feature. */
extern struct p_kafka_host kafkap_kafka_host;
extern struct p_kafka_host bgp_daemon_msglog_kafka_host;
extern struct p_kafka_host bgp_table_dump_kafka_host;
extern struct p_kafka_host bmp_daemon_msglog_kafka_host;
extern struct p_kafka_host bmp_dump_kafka_host;
extern struct p_kafka_host sfacctd_counter_kafka_host;
extern struct p_kafka_host telemetry_kafka_host;
extern struct p_kafka_host telemetry_daemon_msglog_kafka_host;
extern struct p_kafka_host telemetry_dump_kafka_host;
extern struct p_kafka_host nfacctd_kafka_host;

/* Shared flags; their exact semantics are not visible here. */
extern int kafkap_ret_err_cb;
extern int dyn_partition_key;

/* Default broker host/port and topic; the array sizes are fixed by the
   definitions, which are not visible in this header. */
extern char default_kafka_broker_host[];
extern int default_kafka_broker_port;
extern char default_kafka_topic[];
131 #endif //KAFKA_COMMON_H
132