1 /* 2 * Copyright (c) 2013-2016 Cisco Systems, Inc. and others. All rights reserved. 3 * 4 * This program and the accompanying materials are made available under the 5 * terms of the Eclipse Public License v1.0 which accompanies this distribution, 6 * and is available at http://www.eclipse.org/legal/epl-v10.html 7 * 8 */ 9 10 #ifndef MSGBUSIMPL_KAFKA_H_ 11 #define MSGBUSIMPL_KAFKA_H_ 12 13 #define HASH_SIZE 16 14 15 #include "MsgBusInterface.hpp" 16 #include "Logger.h" 17 #include <string> 18 #include <map> 19 #include <vector> 20 #include <ctime> 21 22 #include <librdkafka/rdkafkacpp.h> 23 24 #include <thread> 25 #include "safeQueue.hpp" 26 #include "KafkaEventCallback.h" 27 #include "KafkaDeliveryReportCallback.h" 28 #include "KafkaTopicSelector.h" 29 30 #include "Config.h" 31 32 /** 33 * \class msgBus_kafka 34 * 35 * \brief Kafka message bus implementation 36 */ 37 class msgBus_kafka: public MsgBusInterface { 38 public: 39 #define MSGBUS_WORKING_BUF_SIZE 1800000 40 #define MSGBUS_API_VERSION "1.7" 41 42 /******************************************************************//** 43 * \brief This function will initialize and connect to Kafka. 44 * 45 * \details It is expected that this class will start off with a new connection. 46 * 47 * \param [in] logPtr Pointer to Logger instance 48 * \param [in] cfg Pointer to the config instance 49 * \param [in] c_hash_id Collector Hash ID 50 ********************************************************************/ 51 msgBus_kafka(Logger *logPtr, Config *cfg, u_char *c_hash_id); 52 ~msgBus_kafka(); 53 54 /* 55 * abstract methods implemented 56 * See MsgBusInterface.hpp for method details 57 */ 58 void update_Collector(struct obj_collector &c_obj, collector_action_code action_code); 59 void update_Router(struct obj_router &r_entry, router_action_code code); 60 void update_Peer(obj_bgp_peer &peer, obj_peer_up_event *up, obj_peer_down_event *down, peer_action_code code); 61 void update_baseAttribute(obj_bgp_peer &peer, obj_path_attr &attr, base_attr_action_code code); 62 void update_unicastPrefix(obj_bgp_peer &peer, std::vector<obj_rib> &rib, obj_path_attr *attr, unicast_prefix_action_code code); 63 void add_StatReport(obj_bgp_peer &peer, obj_stats_report &stats); 64 65 void update_LsNode(obj_bgp_peer &peer, obj_path_attr &attr, std::list<MsgBusInterface::obj_ls_node> &nodes, 66 ls_action_code code); 67 void update_LsLink(obj_bgp_peer &peer, obj_path_attr &attr, std::list<MsgBusInterface::obj_ls_link> &links, 68 ls_action_code code); 69 void update_LsPrefix(obj_bgp_peer &peer, obj_path_attr &attr, std::list<MsgBusInterface::obj_ls_prefix> &prefixes, 70 ls_action_code code); 71 72 void update_L3Vpn(obj_bgp_peer &peer, std::vector<obj_vpn> &vpn, obj_path_attr *attr, vpn_action_code code); 73 74 void update_eVPN(obj_bgp_peer &peer, std::vector<obj_evpn> &vpn, obj_path_attr *attr, vpn_action_code code); 75 76 void send_bmp_raw(u_char *r_hash, obj_bgp_peer &peer, u_char *data, size_t data_len); 77 78 // Debug methods 79 void enableDebug(); 80 void disableDebug(); 81 82 private: 83 char *prep_buf; ///< Large working buffer for message preparation 84 unsigned char *producer_buf; ///< Producer message buffer 85 bool debug; ///< debug flag to indicate debugging 86 Logger *logger; ///< Logging class pointer 87 88 std::string collector_hash; ///< collector hash string value 89 90 uint64_t router_seq; ///< Router add/del sequence 91 uint64_t collector_seq; ///< Collector add/del sequence 92 uint64_t peer_seq ; ///< Peer add/del sequence 93 uint64_t base_attr_seq; ///< Base attribute sequence 94 uint64_t unicast_prefix_seq; ///< Unicast prefix sequence 95 uint64_t bmp_stat_seq; ///< BMP stats sequence 96 uint64_t ls_node_seq; ///< LS node sequence 97 uint64_t ls_link_seq; ///< LS link sequence 98 uint64_t ls_prefix_seq; ///< LS prefix sequence 99 uint64_t l3vpn_seq; ///< l3vpn sequence 100 uint64_t evpn_seq; ///< evpn sequence 101 102 Config *cfg; ///< Pointer to config instance 103 104 /** 105 * Kafka Configuration object (global) 106 */ 107 RdKafka::Conf *conf; 108 109 RdKafka::Producer *producer; ///< Kafka Producer instance 110 111 /** 112 * Callback handlers 113 */ 114 KafkaEventCallback *event_callback; 115 KafkaDeliveryReportCallback *delivery_callback; 116 117 bool isConnected; ///< Indicates if Kafka is connected or not 118 119 // array of hashes 120 std::map<std::string, std::string> peer_list; 121 typedef std::map<std::string, std::string>::iterator peer_list_iter; 122 123 std::string router_ip; ///< Router IP in printed format 124 u_char router_hash[16]; ///< Router Hash in binary format 125 std::string router_group_name; ///< Router group name - if matched 126 127 128 std::map<std::string, RdKafka::Topic*> topic; 129 130 KafkaTopicSelector *topicSel; ///< Kafka topic selector/handler 131 132 /** 133 * Connects to kafka broker 134 */ 135 void connect(); 136 137 /** 138 * Disconnects from kafka broker 139 */ 140 void disconnect(int wait_ms=2000); 141 142 /** 143 * produce message to Kafka 144 * 145 * \param [in] topic_var Topic var to use in KafkaTopicSelector::getTopic() 146 * \param [in] msg message to produce 147 * \param [in] msg_size Length in bytes of the message 148 * \param [in] rows Number of rows in data 149 * \param [in] key Hash key 150 * \param [in] peer_group Peer group name - empty/NULL if not set or used 151 * \param [in] peer_asn Peer ASN 152 */ 153 void produce(const char *topic_var, char *msg, size_t msg_size, int rows, 154 std::string key, const std::string *peer_group, uint32_t); 155 156 /** 157 * \brief Method to resolve the IP address to a hostname 158 * 159 * \param [in] name String name (ip address) 160 * \param [out] hostname String reference for hostname 161 * 162 * \returns true if error, false if no error 163 */ 164 bool resolveIp(std::string name, std::string &hostname); 165 166 167 }; 168 169 #endif /* MSGBUSIMPL_KAFKA_H_ */ 170