1 /*
2  * Copyright (c) 2013-2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  */
9 
10 #ifndef OPENBMP_KAFKATOPICSELECTOR_H
11 #define OPENBMP_KAFKATOPICSELECTOR_H
12 
13 #include <librdkafka/rdkafkacpp.h>
14 #include "Config.h"
15 #include "Logger.h"
16 #include "KafkaPeerPartitionerCallback.h"
17 
18 class KafkaTopicSelector {
19 public:
20     /**
21      * MSGBUS_TOPIC_* defines the default topic names
22      */
23     #define MSGBUS_TOPIC_COLLECTOR              "openbmp.parsed.collector"
24     #define MSGBUS_TOPIC_ROUTER                 "openbmp.parsed.router"
25     #define MSGBUS_TOPIC_PEER                   "openbmp.parsed.peer"
26     #define MSGBUS_TOPIC_BASE_ATTRIBUTE         "openbmp.parsed.base_attribute"
27     #define MSGBUS_TOPIC_UNICAST_PREFIX         "openbmp.parsed.unicast_prefix"
28     #define MSGBUS_TOPIC_L3VPN                  "openbmp.parsed.l3vpn"
29     #define MSGBUS_TOPIC_EVPN                   "openbmp.parsed.evpn"
30     #define MSGBUS_TOPIC_LS_NODE                "openbmp.parsed.ls_node"
31     #define MSGBUS_TOPIC_LS_LINK                "openbmp.parsed.ls_link"
32     #define MSGBUS_TOPIC_LS_PREFIX              "openbmp.parsed.ls_prefix"
33     #define MSGBUS_TOPIC_BMP_STAT               "openbmp.parsed.bmp_stat"
34     #define MSGBUS_TOPIC_BMP_RAW                "openbmp.bmp_raw"
35 
36     /**
37      * MSGBUS_TOPIC_VAR_* defines the topic var/key for the topic maps.
38      *      This matches the config topic var name, which is the map key.
39      */
40     #define MSGBUS_TOPIC_VAR_COLLECTOR          "collector"
41     #define MSGBUS_TOPIC_VAR_ROUTER             "router"
42     #define MSGBUS_TOPIC_VAR_PEER               "peer"
43     #define MSGBUS_TOPIC_VAR_BASE_ATTRIBUTE     "base_attribute"
44     #define MSGBUS_TOPIC_VAR_UNICAST_PREFIX     "unicast_prefix"
45     #define MSGBUS_TOPIC_VAR_L3VPN              "l3vpn"
46     #define MSGBUS_TOPIC_VAR_EVPN               "evpn"
47     #define MSGBUS_TOPIC_VAR_LS_NODE            "ls_node"
48     #define MSGBUS_TOPIC_VAR_LS_LINK            "ls_link"
49     #define MSGBUS_TOPIC_VAR_LS_PREFIX          "ls_prefix"
50     #define MSGBUS_TOPIC_VAR_BMP_STAT           "bmp_stat"
51     #define MSGBUS_TOPIC_VAR_BMP_RAW            "bmp_raw"
52 
53 
54     /*********************************************************************//**
55      * Constructor for class
56      *
57      * \param [in] logPtr   Pointer to Logger instance
58      * \param [in] cfg      Pointer to the config instance
59      * \param [in] producer Pointer to the kafka producer
60      ***********************************************************************/
61     KafkaTopicSelector(Logger *logPtr, Config *cfg,  RdKafka::Producer *producer);
62 
63     /*********************************************************************//**
64      * Destructor for class
65      ***********************************************************************/
66     ~KafkaTopicSelector();
67 
68     /*********************************************************************//**
69      * Gets topic pointer by topic var name, router and peer group.  If the topic doesn't exist, a new entry
70      *      will be initialized.
71      *
72      * \param [in]  topic_var       MSGBUS_TOPIC_VAR_<name>
73      * \param [in]  router_group    Router group - empty/NULL means no router group
74      * \param [in]  peer_group      Peer group - empty/NULL means no peer group
75      * \param [in]  peer_asn        Peer asn (remote asn)
76      *
77      * \return (RdKafka::Topic *) pointer or NULL if error
78      ***********************************************************************/
79     RdKafka::Topic * getTopic(const std::string &topic_var, const std::string *router_group,
80                               const std::string *peer_group,
81                               uint32_t peer_asn);
82 
83     /*********************************************************************//**
84      * Check if a topic is enabled
85      *
86      * \param [in]  topic_var       MSGBUS_TOPIC_VAR_<name>
87      *
88      * \return bool true if the topic is enabled, false otherwise
89      ***********************************************************************/
90     bool topicEnabled(const std::string &topic_var);
91 
92     /*********************************************************************//**
93      * Lookup router group
94      *
95      * \param [in]  hostname          hostname/fqdn of the router
96      * \param [in]  ip_addr           IP address of the peer (printed form)
97      * \param [out] router_group_name Reference to string where router group will be updated
98      *
99      * \return bool true if matched, false if no matched peer group
100      ***********************************************************************/
101     void lookupRouterGroup(std::string hostname, std::string ip_addr, std::string &router_group_name);
102 
103     /*********************************************************************//**
104      * Lookup peer group
105      *
106      * \param [in]  hostname        hostname/fqdn of the peer
107      * \param [in]  ip_addr         IP address of the peer (printed form)
108      * \param [in]  peer_asn        Peer ASN
109      * \param [out] peer_group_name Reference to string where peer group will be updated
110      *
111      * \return bool true if matched, false if no matched peer group
112      ***********************************************************************/
113     void lookupPeerGroup(std::string hostname, std::string ip_addr, uint32_t peer_asn,
114                          std::string &peer_group_name);
115 
116 
117 private:
118     Config          *cfg;                       ///< Configuration instance
119     Logger          *logger;                    ///< Logging class pointer
120     bool            debug;                      ///< debug flag to indicate debugging
121 
122 
123     RdKafka::Producer *producer;                ///< Kafka Producer instance
124     RdKafka::Conf     *tconf;                   ///< rdkafka topic level configuration
125 
126     ///< Partition callback for peer
127     KafkaPeerPartitionerCallback *peer_partitioner_callback;
128 
129     /**
130      * Topic name to rdkafka pointer map (key=Name, value=topic pointer)
131      *
132      *      Key will be MSGBUS_TOPIC_VAR_<topic>_<router_group>_<peer_group>[_<peer_asn>]
133      *          Keys will not contain the optional values unless topic_flags_map includes them.
134      *
135      *      If router_group or peer group is empty, the key will include them as empty values.
136      *      Eg. unicast_prefix___ (both router and peer groups are empty)
137      *          unicast_prefix_routergrp1__ (peer group is empty but router group is defined)
138      *          unicast_prefix__peergrp1_ (router group is empty but peer group is defined)
139      *          unicast_prefix_routergrp1_peergroup1_ (both router and peer groups are defiend)
140      */
141     typedef std::map<std::string, RdKafka::Topic *> topic_map;
142     std::map<std::string, RdKafka::Topic*> topic;
143 
144 
145     /**
146      * Topic flags and map define various flags per topic var
147      */
148     struct topic_flags {
149         bool include_peerAsn;           ///< Indicates if peer ASN should be included in the topic key
150     };
151     std::map<std::string, topic_flags> topic_flags_map;     ///< Map key is one of MSGBUS_TOPIC_VAR_<topic>
152 
153     /**
154      * Free allocated topic map pointers
155      */
156     void freeTopicMap();
157 
158     /**
159      * Initialize topic
160      *      Producer must be initialized and connected prior to calling this method.
161      *      Topic map will be updated.
162      *
163      * \param [in]  topic_var       MSGBUS_TOPIC_VAR_<name>
164      * \param [in]  router_group    Router group - empty/NULL means no router group
165      * \param [in]  peer_group      Peer group - empty/NULL means no peer group
166      * \param [in]  peer_asn        Peer asn (remote asn)
167      *
168      * \return  (RdKafka::Topic *) pointer or NULL if error
169      */
170     RdKafka::Topic * initTopic(const std::string &topic_var,
171                                const std::string *router_group, const std::string *peer_group,
172                                uint32_t peer_asn);
173 
174     /**
175      * Get the topic map key name
176      *
177      * \param [in]  topic_var       MSGBUS_TOPIC_VAR_<name>
178      * \param [in]  router_group    Router group - empty/NULL means no router group
179      * \param [in]  peer_group      Peer group - empty/NULL means no peer group
180      * \param [in]  peer_asn        Peer asn (remote asn)
181      *
182      * \return string value of the topic key to be used with the topic map
183      */
184     std::string getTopicKey(const std::string &topic_var,
185                             const std::string *router_group, const std::string *peer_group,
186                             uint32_t peer_asn);
187 
188 
189 };
190 
191 
192 #endif //OPENBMP_KAFKATOPICSELECTOR_H
193