1 /*
2  * Copyright (c) 2013-2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  */
9 
10 #ifndef MSGBUSIMPL_KAFKA_H_
11 #define MSGBUSIMPL_KAFKA_H_
12 
13 #define HASH_SIZE 16
14 
15 #include "MsgBusInterface.hpp"
16 #include "Logger.h"
17 #include <string>
18 #include <map>
19 #include <vector>
20 #include <ctime>
21 
22 #include <librdkafka/rdkafkacpp.h>
23 
24 #include <thread>
25 #include "safeQueue.hpp"
26 #include "KafkaEventCallback.h"
27 #include "KafkaDeliveryReportCallback.h"
28 #include "KafkaTopicSelector.h"
29 
30 #include "Config.h"
31 
32 /**
33  * \class   msgBus_kafka
34  *
35  * \brief   Kafka message bus implementation
36   */
37 class msgBus_kafka: public MsgBusInterface {
38 public:
39     #define MSGBUS_WORKING_BUF_SIZE         1800000
40     #define MSGBUS_API_VERSION              "1.7"
41 
42     /******************************************************************//**
43      * \brief This function will initialize and connect to Kafka.
44      *
45      * \details It is expected that this class will start off with a new connection.
46      *
47      *  \param [in] logPtr      Pointer to Logger instance
48      *  \param [in] cfg         Pointer to the config instance
49      *  \param [in] c_hash_id   Collector Hash ID
50      ********************************************************************/
51     msgBus_kafka(Logger *logPtr, Config *cfg, u_char *c_hash_id);
52     ~msgBus_kafka();
53 
54     /*
55      * abstract methods implemented
56      * See MsgBusInterface.hpp for method details
57      */
58     void update_Collector(struct obj_collector &c_obj, collector_action_code action_code);
59     void update_Router(struct obj_router &r_entry, router_action_code code);
60     void update_Peer(obj_bgp_peer &peer, obj_peer_up_event *up, obj_peer_down_event *down, peer_action_code code);
61     void update_baseAttribute(obj_bgp_peer &peer, obj_path_attr &attr, base_attr_action_code code);
62     void update_unicastPrefix(obj_bgp_peer &peer, std::vector<obj_rib> &rib, obj_path_attr *attr, unicast_prefix_action_code code);
63     void add_StatReport(obj_bgp_peer &peer, obj_stats_report &stats);
64 
65     void update_LsNode(obj_bgp_peer &peer, obj_path_attr &attr, std::list<MsgBusInterface::obj_ls_node> &nodes,
66                      ls_action_code code);
67     void update_LsLink(obj_bgp_peer &peer, obj_path_attr &attr, std::list<MsgBusInterface::obj_ls_link> &links,
68                      ls_action_code code);
69     void update_LsPrefix(obj_bgp_peer &peer, obj_path_attr &attr, std::list<MsgBusInterface::obj_ls_prefix> &prefixes,
70                       ls_action_code code);
71 
72     void update_L3Vpn(obj_bgp_peer &peer, std::vector<obj_vpn> &vpn, obj_path_attr *attr, vpn_action_code code);
73 
74     void update_eVPN(obj_bgp_peer &peer, std::vector<obj_evpn> &vpn, obj_path_attr *attr, vpn_action_code code);
75 
76     void send_bmp_raw(u_char *r_hash, obj_bgp_peer &peer, u_char *data, size_t data_len);
77 
78     // Debug methods
79     void enableDebug();
80     void disableDebug();
81 
82 private:
83     char            *prep_buf;                  ///< Large working buffer for message preparation
84     unsigned char   *producer_buf;              ///< Producer message buffer
85     bool            debug;                      ///< debug flag to indicate debugging
86     Logger          *logger;                    ///< Logging class pointer
87 
88     std::string     collector_hash;             ///< collector hash string value
89 
90     uint64_t        router_seq;                 ///< Router add/del sequence
91     uint64_t        collector_seq;              ///< Collector add/del sequence
92     uint64_t        peer_seq ;                  ///< Peer add/del sequence
93     uint64_t        base_attr_seq;              ///< Base attribute sequence
94     uint64_t        unicast_prefix_seq;         ///< Unicast prefix sequence
95     uint64_t        bmp_stat_seq;               ///< BMP stats sequence
96     uint64_t        ls_node_seq;                ///< LS node sequence
97     uint64_t        ls_link_seq;                ///< LS link sequence
98     uint64_t        ls_prefix_seq;              ///< LS prefix sequence
99     uint64_t        l3vpn_seq;                  ///< l3vpn sequence
100     uint64_t        evpn_seq;                   ///< evpn sequence
101 
102     Config          *cfg;                       ///< Pointer to config instance
103 
104     /**
105      * Kafka Configuration object (global)
106      */
107     RdKafka::Conf   *conf;
108 
109     RdKafka::Producer *producer;                ///< Kafka Producer instance
110 
111     /**
112      * Callback handlers
113      */
114     KafkaEventCallback              *event_callback;
115     KafkaDeliveryReportCallback     *delivery_callback;
116 
117     bool isConnected;                           ///< Indicates if Kafka is connected or not
118 
119     // array of hashes
120     std::map<std::string, std::string> peer_list;
121     typedef std::map<std::string, std::string>::iterator peer_list_iter;
122 
123     std::string router_ip;                      ///< Router IP in printed format
124     u_char      router_hash[16];                ///< Router Hash in binary format
125     std::string router_group_name;              ///< Router group name - if matched
126 
127 
128     std::map<std::string, RdKafka::Topic*> topic;
129 
130     KafkaTopicSelector *topicSel;               ///< Kafka topic selector/handler
131 
132     /**
133      * Connects to kafka broker
134      */
135     void connect();
136 
137     /**
138      * Disconnects from kafka broker
139      */
140     void disconnect(int wait_ms=2000);
141 
142     /**
143      * produce message to Kafka
144      *
145      * \param [in] topic_var     Topic var to use in KafkaTopicSelector::getTopic()
146      * \param [in] msg           message to produce
147      * \param [in] msg_size      Length in bytes of the message
148      * \param [in] rows          Number of rows in data
149      * \param [in] key           Hash key
150      * \param [in] peer_group    Peer group name - empty/NULL if not set or used
151      * \param [in] peer_asn      Peer ASN
152      */
153     void produce(const char *topic_var, char *msg, size_t msg_size, int rows,
154                  std::string key, const std::string *peer_group, uint32_t);
155 
156     /**
157     * \brief Method to resolve the IP address to a hostname
158     *
159     *  \param [in]   name      String name (ip address)
160     *  \param [out]  hostname  String reference for hostname
161     *
162     *  \returns true if error, false if no error
163     */
164     bool resolveIp(std::string name, std::string &hostname);
165 
166 
167 };
168 
169 #endif /* MSGBUSIMPL_KAFKA_H_ */
170