1 /* 2 pmacct (Promiscuous mode IP Accounting package) 3 pmacct is Copyright (C) 2003-2020 by Paolo Lucente 4 */ 5 6 /* 7 This program is free software; you can redistribute it and/or modify 8 it under the terms of the GNU General Public License as published by 9 the Free Software Foundation; either version 2 of the License, or 10 (at your option) any later version. 11 12 This program is distributed in the hope that it will be useful, 13 but WITHOUT ANY WARRANTY; without even the implied warranty of 14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15 GNU General Public License for more details. 16 17 You should have received a copy of the GNU General Public License 18 along with this program; if not, write to the Free Software 19 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 20 */ 21 22 #ifndef KAFKA_COMMON_H 23 #define KAFKA_COMMON_H 24 25 26 /* includes */ 27 #include <librdkafka/rdkafka.h> 28 #include "plugin_common.h" 29 #ifdef WITH_SERDES 30 #include <libserdes/serdes-avro.h> 31 #endif 32 33 /* defines */ 34 #define PM_KAFKA_ERRSTR_LEN 512 35 #define PM_KAFKA_DEFAULT_RETRY 60 36 #define PM_KAFKA_LONGLONG_RETRY INT_MAX 37 #define PM_KAFKA_OUTQ_LEN_RETRIES 3 38 39 #define PM_KAFKA_CNT_TYPE_STR 1 40 #define PM_KAFKA_CNT_TYPE_BIN 2 41 42 /* structures */ 43 struct p_kafka_host { 44 char broker[SRVBUFLEN]; 45 char errstr[PM_KAFKA_ERRSTR_LEN]; 46 u_int8_t content_type; 47 48 rd_kafka_t *rk; 49 rd_kafka_conf_t *cfg; 50 rd_kafka_topic_t *topic; 51 rd_kafka_topic_conf_t *topic_cfg; 52 char *config_file; 53 int partition; 54 char *key; 55 int key_len; 56 struct p_table_rr topic_rr; 57 58 #ifdef WITH_SERDES 59 serdes_schema_t *sd_schema[MAX_AVRO_SCHEMA]; 60 #endif 61 62 struct p_broker_timers btimers; 63 }; 64 65 /* prototypes */ 66 extern void p_kafka_init_host(struct p_kafka_host *, char *); 67 extern void p_kafka_init_topic_rr(struct p_kafka_host *); 68 69 extern void p_kafka_set_broker(struct p_kafka_host *, char *, int); 70 extern void p_kafka_set_topic(struct p_kafka_host *, char *); 71 extern void p_kafka_set_topic_rr(struct p_kafka_host *, int); 72 extern void p_kafka_set_content_type(struct p_kafka_host *, int); 73 extern void p_kafka_set_partition(struct p_kafka_host *, int); 74 extern void p_kafka_set_key(struct p_kafka_host *, char *, int); 75 extern void p_kafka_set_config_file(struct p_kafka_host *, char *); 76 77 extern rd_kafka_t *p_kafka_get_handler(struct p_kafka_host *); 78 extern char *p_kafka_get_broker(struct p_kafka_host *); 79 extern char *p_kafka_get_topic(struct p_kafka_host *); 80 extern int p_kafka_get_topic_rr(struct p_kafka_host *); 81 extern int p_kafka_get_content_type(struct p_kafka_host *); 82 extern int p_kafka_get_partition(struct p_kafka_host *); 83 extern void p_kafka_set_dynamic_partitioner(struct p_kafka_host *); 84 extern char *p_kafka_get_key(struct p_kafka_host *); 85 extern void p_kafka_get_version(); 86 87 extern void p_kafka_unset_topic(struct p_kafka_host *); 88 89 extern int p_kafka_parse_config_entry(char *, char *, char **, char **); 90 extern void p_kafka_apply_global_config(struct p_kafka_host *); 91 extern void p_kafka_apply_topic_config(struct p_kafka_host *); 92 93 extern void p_kafka_logger(const rd_kafka_t *, int, const char *, const char *); 94 extern void p_kafka_msg_delivered(rd_kafka_t *, void *, size_t, int, void *, void *); 95 extern void p_kafka_msg_error(rd_kafka_t *, int, const char *, void *); 96 extern int p_kafka_stats(rd_kafka_t *, char *, size_t, void *); 97 98 extern int p_kafka_connect_to_produce(struct p_kafka_host *); 99 extern int p_kafka_produce_data(struct p_kafka_host *, void *, size_t); 100 extern int p_kafka_produce_data_to_part(struct p_kafka_host *, void *, size_t, int); 101 102 extern int p_kafka_connect_to_consume(struct p_kafka_host *); 103 extern int p_kafka_manage_consumer(struct p_kafka_host *, int); 104 extern int p_kafka_consume_poller(struct p_kafka_host *, void **, int); 105 extern int p_kafka_consume_data(struct p_kafka_host *, void *, u_char *, size_t); 106 107 extern void p_kafka_close(struct p_kafka_host *, int); 108 extern int p_kafka_check_outq_len(struct p_kafka_host *); 109 110 extern int write_and_free_json_kafka(void *, void *); 111 extern int write_binary_kafka(void *, void *, size_t); 112 113 /* global vars */ 114 extern struct p_kafka_host kafkap_kafka_host; 115 extern struct p_kafka_host bgp_daemon_msglog_kafka_host; 116 extern struct p_kafka_host bgp_table_dump_kafka_host; 117 extern struct p_kafka_host bmp_daemon_msglog_kafka_host; 118 extern struct p_kafka_host bmp_dump_kafka_host; 119 extern struct p_kafka_host sfacctd_counter_kafka_host; 120 extern struct p_kafka_host telemetry_kafka_host; 121 extern struct p_kafka_host telemetry_daemon_msglog_kafka_host; 122 extern struct p_kafka_host telemetry_dump_kafka_host; 123 extern struct p_kafka_host nfacctd_kafka_host; 124 125 extern int kafkap_ret_err_cb; 126 extern int dyn_partition_key; 127 128 extern char default_kafka_broker_host[]; 129 extern int default_kafka_broker_port; 130 extern char default_kafka_topic[]; 131 #endif //KAFKA_COMMON_H 132