1 /* 2 * Copyright (c) 2013-2016 Cisco Systems, Inc. and others. All rights reserved. 3 * 4 * This program and the accompanying materials are made available under the 5 * terms of the Eclipse Public License v1.0 which accompanies this distribution, 6 * and is available at http://www.eclipse.org/legal/epl-v10.html 7 * 8 */ 9 10 #ifndef OPENBMP_KAFKATOPICSELECTOR_H 11 #define OPENBMP_KAFKATOPICSELECTOR_H 12 13 #include <librdkafka/rdkafkacpp.h> 14 #include "Config.h" 15 #include "Logger.h" 16 #include "KafkaPeerPartitionerCallback.h" 17 18 class KafkaTopicSelector { 19 public: 20 /** 21 * MSGBUS_TOPIC_* defines the default topic names 22 */ 23 #define MSGBUS_TOPIC_COLLECTOR "openbmp.parsed.collector" 24 #define MSGBUS_TOPIC_ROUTER "openbmp.parsed.router" 25 #define MSGBUS_TOPIC_PEER "openbmp.parsed.peer" 26 #define MSGBUS_TOPIC_BASE_ATTRIBUTE "openbmp.parsed.base_attribute" 27 #define MSGBUS_TOPIC_UNICAST_PREFIX "openbmp.parsed.unicast_prefix" 28 #define MSGBUS_TOPIC_L3VPN "openbmp.parsed.l3vpn" 29 #define MSGBUS_TOPIC_EVPN "openbmp.parsed.evpn" 30 #define MSGBUS_TOPIC_LS_NODE "openbmp.parsed.ls_node" 31 #define MSGBUS_TOPIC_LS_LINK "openbmp.parsed.ls_link" 32 #define MSGBUS_TOPIC_LS_PREFIX "openbmp.parsed.ls_prefix" 33 #define MSGBUS_TOPIC_BMP_STAT "openbmp.parsed.bmp_stat" 34 #define MSGBUS_TOPIC_BMP_RAW "openbmp.bmp_raw" 35 36 /** 37 * MSGBUS_TOPIC_VAR_* defines the topic var/key for the topic maps. 38 * This matches the config topic var name, which is the map key. 39 */ 40 #define MSGBUS_TOPIC_VAR_COLLECTOR "collector" 41 #define MSGBUS_TOPIC_VAR_ROUTER "router" 42 #define MSGBUS_TOPIC_VAR_PEER "peer" 43 #define MSGBUS_TOPIC_VAR_BASE_ATTRIBUTE "base_attribute" 44 #define MSGBUS_TOPIC_VAR_UNICAST_PREFIX "unicast_prefix" 45 #define MSGBUS_TOPIC_VAR_L3VPN "l3vpn" 46 #define MSGBUS_TOPIC_VAR_EVPN "evpn" 47 #define MSGBUS_TOPIC_VAR_LS_NODE "ls_node" 48 #define MSGBUS_TOPIC_VAR_LS_LINK "ls_link" 49 #define MSGBUS_TOPIC_VAR_LS_PREFIX "ls_prefix" 50 #define MSGBUS_TOPIC_VAR_BMP_STAT "bmp_stat" 51 #define MSGBUS_TOPIC_VAR_BMP_RAW "bmp_raw" 52 53 54 /*********************************************************************//** 55 * Constructor for class 56 * 57 * \param [in] logPtr Pointer to Logger instance 58 * \param [in] cfg Pointer to the config instance 59 * \param [in] producer Pointer to the kafka producer 60 ***********************************************************************/ 61 KafkaTopicSelector(Logger *logPtr, Config *cfg, RdKafka::Producer *producer); 62 63 /*********************************************************************//** 64 * Destructor for class 65 ***********************************************************************/ 66 ~KafkaTopicSelector(); 67 68 /*********************************************************************//** 69 * Gets topic pointer by topic var name, router and peer group. If the topic doesn't exist, a new entry 70 * will be initialized. 71 * 72 * \param [in] topic_var MSGBUS_TOPIC_VAR_<name> 73 * \param [in] router_group Router group - empty/NULL means no router group 74 * \param [in] peer_group Peer group - empty/NULL means no peer group 75 * \param [in] peer_asn Peer asn (remote asn) 76 * 77 * \return (RdKafka::Topic *) pointer or NULL if error 78 ***********************************************************************/ 79 RdKafka::Topic * getTopic(const std::string &topic_var, const std::string *router_group, 80 const std::string *peer_group, 81 uint32_t peer_asn); 82 83 /*********************************************************************//** 84 * Check if a topic is enabled 85 * 86 * \param [in] topic_var MSGBUS_TOPIC_VAR_<name> 87 * 88 * \return bool true if the topic is enabled, false otherwise 89 ***********************************************************************/ 90 bool topicEnabled(const std::string &topic_var); 91 92 /*********************************************************************//** 93 * Lookup router group 94 * 95 * \param [in] hostname hostname/fqdn of the router 96 * \param [in] ip_addr IP address of the peer (printed form) 97 * \param [out] router_group_name Reference to string where router group will be updated 98 * 99 * \return bool true if matched, false if no matched peer group 100 ***********************************************************************/ 101 void lookupRouterGroup(std::string hostname, std::string ip_addr, std::string &router_group_name); 102 103 /*********************************************************************//** 104 * Lookup peer group 105 * 106 * \param [in] hostname hostname/fqdn of the peer 107 * \param [in] ip_addr IP address of the peer (printed form) 108 * \param [in] peer_asn Peer ASN 109 * \param [out] peer_group_name Reference to string where peer group will be updated 110 * 111 * \return bool true if matched, false if no matched peer group 112 ***********************************************************************/ 113 void lookupPeerGroup(std::string hostname, std::string ip_addr, uint32_t peer_asn, 114 std::string &peer_group_name); 115 116 117 private: 118 Config *cfg; ///< Configuration instance 119 Logger *logger; ///< Logging class pointer 120 bool debug; ///< debug flag to indicate debugging 121 122 123 RdKafka::Producer *producer; ///< Kafka Producer instance 124 RdKafka::Conf *tconf; ///< rdkafka topic level configuration 125 126 ///< Partition callback for peer 127 KafkaPeerPartitionerCallback *peer_partitioner_callback; 128 129 /** 130 * Topic name to rdkafka pointer map (key=Name, value=topic pointer) 131 * 132 * Key will be MSGBUS_TOPIC_VAR_<topic>_<router_group>_<peer_group>[_<peer_asn>] 133 * Keys will not contain the optional values unless topic_flags_map includes them. 134 * 135 * If router_group or peer group is empty, the key will include them as empty values. 136 * Eg. unicast_prefix___ (both router and peer groups are empty) 137 * unicast_prefix_routergrp1__ (peer group is empty but router group is defined) 138 * unicast_prefix__peergrp1_ (router group is empty but peer group is defined) 139 * unicast_prefix_routergrp1_peergroup1_ (both router and peer groups are defiend) 140 */ 141 typedef std::map<std::string, RdKafka::Topic *> topic_map; 142 std::map<std::string, RdKafka::Topic*> topic; 143 144 145 /** 146 * Topic flags and map define various flags per topic var 147 */ 148 struct topic_flags { 149 bool include_peerAsn; ///< Indicates if peer ASN should be included in the topic key 150 }; 151 std::map<std::string, topic_flags> topic_flags_map; ///< Map key is one of MSGBUS_TOPIC_VAR_<topic> 152 153 /** 154 * Free allocated topic map pointers 155 */ 156 void freeTopicMap(); 157 158 /** 159 * Initialize topic 160 * Producer must be initialized and connected prior to calling this method. 161 * Topic map will be updated. 162 * 163 * \param [in] topic_var MSGBUS_TOPIC_VAR_<name> 164 * \param [in] router_group Router group - empty/NULL means no router group 165 * \param [in] peer_group Peer group - empty/NULL means no peer group 166 * \param [in] peer_asn Peer asn (remote asn) 167 * 168 * \return (RdKafka::Topic *) pointer or NULL if error 169 */ 170 RdKafka::Topic * initTopic(const std::string &topic_var, 171 const std::string *router_group, const std::string *peer_group, 172 uint32_t peer_asn); 173 174 /** 175 * Get the topic map key name 176 * 177 * \param [in] topic_var MSGBUS_TOPIC_VAR_<name> 178 * \param [in] router_group Router group - empty/NULL means no router group 179 * \param [in] peer_group Peer group - empty/NULL means no peer group 180 * \param [in] peer_asn Peer asn (remote asn) 181 * 182 * \return string value of the topic key to be used with the topic map 183 */ 184 std::string getTopicKey(const std::string &topic_var, 185 const std::string *router_group, const std::string *peer_group, 186 uint32_t peer_asn); 187 188 189 }; 190 191 192 #endif //OPENBMP_KAFKATOPICSELECTOR_H 193