1 /*
2  * Copyright (c) 2013-2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  */
9 
10 #include <arpa/inet.h>
11 #include <sys/socket.h>
12 #include <boost/algorithm/string/replace.hpp>
13 
14 #include "KafkaTopicSelector.h"
15 #include "kafka/MsgBusImpl_kafka.h"
16 
17 /*********************************************************************//**
18  * Constructor for class
19  *
20  * \param [in] logPtr   Pointer to Logger instance
21  * \param [in] cfg      Pointer to the config instance
22  * \param [in] producer Pointer to the kafka producer
23  ***********************************************************************/
KafkaTopicSelector(Logger * logPtr,Config * cfg,RdKafka::Producer * producer)24 KafkaTopicSelector::KafkaTopicSelector(Logger *logPtr, Config *cfg,  RdKafka::Producer *producer) {
25     logger = logPtr;
26     this->cfg = cfg;
27 
28     if (cfg->debug_msgbus)
29         debug = true;
30     else
31         debug = false;
32 
33 
34     this->producer = producer;
35 
36     peer_partitioner_callback = new KafkaPeerPartitionerCallback();
37     tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
38 
39 }
40 
41 /*********************************************************************//**
42  * Destructor for class
43  ***********************************************************************/
~KafkaTopicSelector()44 KafkaTopicSelector::~KafkaTopicSelector() {
45     SELF_DEBUG("Destory KafkaTopicSeletor");
46 
47     freeTopicMap();
48 
49     if (peer_partitioner_callback != NULL)
50         delete peer_partitioner_callback;
51 
52     delete tconf;
53 
54 }
55 
56 /*********************************************************************//**
57  * Gets topic pointer by topic var name, router and peer group.
58  *     If the topic doesn't exist, a new entry will be initialized.
59  *
60  * \param [in]  topic_var       MSGBUS_TOPIC_VAR_<name>
61  * \param [in]  router_group    Router group - empty/NULL means no router group
62  * \param [in]  peer_group      Peer group - empty/NULL means no peer group
63  * \param [in]  peer_asn        Peer asn (remote asn)
64  *
65  * \return (RdKafka::Topic *) pointer or NULL if error
66  ***********************************************************************/
67 
getTopic(const std::string & topic_var,const std::string * router_group,const std::string * peer_group,uint32_t peer_asn)68 RdKafka::Topic * KafkaTopicSelector::getTopic(const std::string &topic_var,
69                                               const std::string *router_group, const std::string *peer_group,
70                                               uint32_t peer_asn) {
71 
72     // Update the topic key based on the peer_group/router_group
73     std::string topic_key = getTopicKey(topic_var, router_group, peer_group, peer_asn);
74 
75     topic_map::iterator t_it;
76 
77     if ( (t_it=topic.find(topic_key)) != topic.end()) {
78         return t_it->second;                                              // Return the existing initialized topic
79     }
80     else {
81         SELF_DEBUG("Requesting to create topic for key=%s", topic_key.c_str());
82         return initTopic(topic_var, router_group, peer_group, peer_asn);  // create and return newly created topic
83     }
84 
85     return NULL;
86 }
87 
88 /*********************************************************************//**
89  * Check if a topic is enabled
90  *
91  * \param [in]  topic_var       MSGBUS_TOPIC_VAR_<name>
92  *
93  * \return bool true if the topic is enabled, false otherwise
94 ***********************************************************************/
topicEnabled(const std::string & topic_var)95 bool KafkaTopicSelector::topicEnabled(const std::string &topic_var) {
96     return this->cfg->topic_names_map[topic_var].length() > 0;
97 }
98 
99 /*********************************************************************//**
100  * Lookup peer group
101  *
102  * \param [in]  hostname        hostname/fqdn of the peer
103  * \param [in]  ip_addr         IP address of the peer (printed form)
104  * \param [in]  peer_asn        Peer ASN
105  * \param [out] peer_group_name Reference to string where peer group will be updated
106  *
107  * \return bool true if matched, false if no matched peer group
108  ***********************************************************************/
lookupPeerGroup(std::string hostname,std::string ip_addr,uint32_t peer_asn,std::string & peer_group_name)109 void KafkaTopicSelector::lookupPeerGroup(std::string hostname, std::string ip_addr, uint32_t peer_asn,
110                                          std::string &peer_group_name) {
111 
112     peer_group_name = "";
113 
114     /*
115      * Match against hostname regexp
116      */
117     if (hostname.size() > 0) {
118 
119         // Loop through all groups and their regular expressions
120         for (Config::match_peer_group_by_name_iter it = cfg->match_peer_group_by_name.begin();
121             it != cfg->match_peer_group_by_name.end(); ++it) {
122 
123             // loop through all regexps to see if there is a match
124             for (std::list<Config::match_type_regex>::iterator lit = it->second.begin();
125                     lit != it->second.end(); ++lit) {
126                 if (regex_search(hostname, lit->regexp)) {
127                     SELF_DEBUG("Regexp matched hostname %s to peer group '%s'",
128                                 hostname.c_str(), it->first.c_str());
129                     peer_group_name = it->first;
130                     return;
131                 }
132             }
133         }
134     }
135 
136     /*
137      * Match against prefix ranges
138      */
139     bool isIPv4 = ip_addr.find_first_of(':') == std::string::npos ? true : false;
140     uint8_t bits;
141 
142     uint32_t prefix[4]  __attribute__ ((aligned));
143     bzero(prefix,sizeof(prefix));
144 
145     inet_pton(isIPv4 ? AF_INET : AF_INET6, ip_addr.c_str(), prefix);
146 
147     // Loop through all groups and their regular expressions
148     for (Config::match_peer_group_by_ip_iter it = cfg->match_peer_group_by_ip.begin();
149          it != cfg->match_peer_group_by_ip.end(); ++it) {
150 
151         // loop through all prefix ranges to see if there is a match
152         for (std::list<Config::match_type_ip>::iterator lit = it->second.begin();
153              lit != it->second.end(); ++lit) {
154 
155             if (lit->isIPv4 == isIPv4) { // IPv4
156                 bits = 32 - lit->bits;
157 
158                 // Big endian
159                 prefix[0] <<= bits;
160                 prefix[0] >>= bits;
161 
162                 if (prefix[0] == lit->prefix[0]) {
163                     SELF_DEBUG("IP %s matched peer group %s", ip_addr.c_str(), it->first.c_str());
164                     peer_group_name = it->first;
165                     return;
166                 }
167             } else { // IPv6
168                 uint8_t end_idx = lit->bits / 32;
169                 bits = lit->bits - (32 * end_idx);
170 
171                 if (bits == 0)
172                     end_idx--;
173 
174                 if (end_idx < 4 and bits < 32) {    // end_idx should be less than 4 and bits less than 32
175 
176                     // Big endian
177                     prefix[end_idx] <<= bits;
178                     prefix[end_idx] >>= bits;
179                 }
180 
181                 if (prefix[0] == lit->prefix[0] and prefix[1] == lit->prefix[1]
182                         and prefix[2] == lit->prefix[2] and prefix[3] == lit->prefix[3]) {
183 
184                     SELF_DEBUG("IP %s matched peer group %s", ip_addr.c_str(), it->first.c_str());
185                     peer_group_name = it->first;
186                     return;
187                 }
188             }
189         }
190     }
191 
192     /*
193      * Match against asn list
194      */
195     // Loop through all groups and their regular expressions
196     for (Config::match_peer_group_by_asn_iter it = cfg->match_peer_group_by_asn.begin();
197          it != cfg->match_peer_group_by_asn.end(); ++it) {
198 
199         // loop through all prefix ranges to see if there is a match
200         for (std::list<uint32_t>::iterator lit = it->second.begin();
201              lit != it->second.end(); ++lit) {
202 
203             if (*lit == peer_asn) {
204                 SELF_DEBUG("Peer ASN %u matched peer group %s", peer_asn, it->first.c_str());
205                 peer_group_name = it->first;
206                 return;
207             }
208         }
209     }
210 
211 }
212 
213 /*********************************************************************//**
214  * Lookup router group
215  *
216  * \param [in]  hostname          hostname/fqdn of the router
217  * \param [in]  ip_addr           IP address of the peer (printed form)
218  * \param [out] router_group_name Reference to string where router group will be updated
219  *
220  * \return bool true if matched, false if no matched peer group
221  ***********************************************************************/
lookupRouterGroup(std::string hostname,std::string ip_addr,std::string & router_group_name)222 void KafkaTopicSelector::lookupRouterGroup(std::string hostname, std::string ip_addr,
223                                          std::string &router_group_name) {
224 
225     router_group_name = "";
226 
227     SELF_DEBUG("router lookup for hostname=%s and ip_addr=%s", hostname.c_str(), ip_addr.c_str());
228 
229     /*
230      * Match against hostname regexp
231      */
232     if (hostname.size() > 0) {
233 
234         // Loop through all groups and their regular expressions
235         for (Config::match_router_group_by_name_iter it = cfg->match_router_group_by_name.begin();
236              it != cfg->match_router_group_by_name.end(); ++it) {
237 
238             // loop through all regexps to see if there is a match
239             for (std::list<Config::match_type_regex>::iterator lit = it->second.begin();
240                  lit != it->second.end(); ++lit) {
241 
242                 if (regex_search(hostname, lit->regexp)) {
243                     SELF_DEBUG("Regexp matched hostname %s to router group '%s'",
244                                hostname.c_str(), it->first.c_str());
245                     router_group_name = it->first;
246                     return;
247                 }
248             }
249         }
250     }
251 
252     /*
253      * Match against prefix ranges
254      */
255     bool isIPv4 = ip_addr.find_first_of(':') == std::string::npos ? true : false;
256     uint8_t bits;
257 
258     uint32_t prefix[4]  __attribute__ ((aligned));
259     bzero(prefix,sizeof(prefix));
260 
261     inet_pton(isIPv4 ? AF_INET : AF_INET6, ip_addr.c_str(), prefix);
262 
263     // Loop through all groups and their regular expressions
264     for (Config::match_router_group_by_ip_iter it = cfg->match_router_group_by_ip.begin();
265          it != cfg->match_router_group_by_ip.end(); ++it) {
266 
267         // loop through all prefix ranges to see if there is a match
268         for (std::list<Config::match_type_ip>::iterator lit = it->second.begin();
269              lit != it->second.end(); ++lit) {
270 
271             if (lit->isIPv4 == isIPv4) { // IPv4
272 
273                 bits = 32 - lit->bits;
274 
275                 // Big endian
276                 prefix[0] <<= bits;
277                 prefix[0] >>= bits;
278 
279                 if (prefix[0] == lit->prefix[0]) {
280                     SELF_DEBUG("IP %s matched router group %s", ip_addr.c_str(), it->first.c_str());
281                     router_group_name = it->first;
282                     return;
283                 }
284             } else { // IPv6
285                 uint8_t end_idx = lit->bits / 32;
286                 bits = lit->bits - (32 * end_idx);
287 
288                 if (bits == 0)
289                     end_idx--;
290 
291                 if (end_idx < 4 and bits < 32) {    // end_idx should be less than 4 and bits less than 32
292 
293                     // Big endian
294                     prefix[end_idx] <<= bits;
295                     prefix[end_idx] >>= bits;
296                 }
297 
298                 if (prefix[0] == lit->prefix[0] and prefix[1] == lit->prefix[1]
299                     and prefix[2] == lit->prefix[2] and prefix[3] == lit->prefix[3]) {
300 
301                     SELF_DEBUG("IP %s matched router group %s", ip_addr.c_str(), it->first.c_str());
302                     router_group_name = it->first;
303                     return;
304                 }
305             }
306         }
307     }
308 }
309 
310 
311 
312 /**
313  * Initialize topic
314  *      Producer must be initialized and connected prior to calling this method.
315  *      Topic map will be updated.
316  *
317  * \param [in]  topic_var       MSGBUS_TOPIC_VAR_<name>
318  * \param [in]  router_group    Router group - empty/NULL means no router group
319  * \param [in]  peer_group      Peer group - empty/NULL means no peer group
320  * \param [in]  peer_asn        Peer asn (remote asn)
321  *
322  * \return  (RdKafka::Topic *) pointer or NULL if error
323  */
initTopic(const std::string & topic_var,const std::string * router_group,const std::string * peer_group,uint32_t peer_asn)324 RdKafka::Topic * KafkaTopicSelector::initTopic(const std::string &topic_var,
325                                                const std::string *router_group, const std::string *peer_group,
326                                                uint32_t peer_asn) {
327     std::string errstr;
328     char uint32_str[12];
329 
330     // Get the actual topic name based on var
331     std::string topic_name = this->cfg->topic_names_map[topic_var];
332 
333     /*
334      * topics that contain the peer asn need to have the key include the peer asn
335      */
336     if (topic_name.find("{peer_asn}") != std::string::npos) {
337         topic_flags_map[topic_var].include_peerAsn = true;
338         SELF_DEBUG("peer_asn found in topic %s, setting topic flag to include peer ASN", topic_name.c_str());
339     } else {
340         topic_flags_map[topic_var].include_peerAsn = false;
341     }
342 
343     // Update the topic key based on the peer_group/router_group
344     std::string topic_key = getTopicKey(topic_var, router_group, peer_group, peer_asn);
345 
346     // Update the topic name based on app variables
347     if (topic_var.compare(MSGBUS_TOPIC_VAR_COLLECTOR)) {   // if not collector topic
348         if (router_group != NULL and router_group->size() > 0) {
349             boost::replace_all(topic_name, "{router_group}", *router_group);
350         } else
351             boost::replace_all(topic_name, "{router_group}", "default");
352 
353         if (topic_var.compare(MSGBUS_TOPIC_VAR_ROUTER)) {    // if not router topic
354             if (peer_group != NULL and peer_group->size() > 0) {
355                 boost::replace_all(topic_name, "{peer_group}", *peer_group);
356             } else
357                 boost::replace_all(topic_name, "{peer_group}", "default");
358 
359             if (peer_asn > 0) {
360                 snprintf(uint32_str, sizeof(uint32_str), "%u", peer_asn);
361                 boost::replace_all(topic_name, "{peer_asn}", (const char *)uint32_str);
362             } else
363                 boost::replace_all(topic_name, "{peer_asn}", "default");
364         }
365     }
366 
367     SELF_DEBUG("Creating topic %s (map key=%s)" , topic_name.c_str(), topic_key.c_str());
368 
369     // Delete topic if it already exists
370     topic_map::iterator t_it;
371 
372     if ( (t_it=topic.find(topic_key)) != topic.end() and t_it->second != NULL) {
373         delete t_it->second;
374     }
375 
376     /*
377      * Topic configuration
378      */
379     if (tconf->set("partitioner_cb", peer_partitioner_callback, errstr) != RdKafka::Conf::CONF_OK) {
380         LOG_ERR("Failed to configure kafka partitioner callback: %s", errstr.c_str());
381         throw "ERROR: Failed to configure kafka partitioner callback";
382     }
383 
384     topic[topic_key] = RdKafka::Topic::create(producer, topic_name.c_str(), tconf, errstr);
385 
386     if (topic[topic_key] == NULL) {
387         LOG_ERR("Failed to create '%s' topic: %s", topic_name.c_str(), errstr.c_str());
388         throw "ERROR: Failed to create topic";
389 
390     } else {
391         return topic[topic_key];
392     }
393 
394     return NULL;
395 }
396 
397 /**
398  * Get the topic map key name
399  *
400  * \param [in]  topic_var       MSGBUS_TOPIC_VAR_<name>
401  * \param [in]  router_group    Router group - empty/NULL means no router group
402  * \param [in]  peer_group      Peer group - empty/NULL means no peer group
403  * \param [in]  peer_asn        Peer asn (remote asn)
404  *
405  * \return string value of the topic key to be used with the topic map
406  */
getTopicKey(const std::string & topic_var,const std::string * router_group,const std::string * peer_group,uint32_t peer_asn)407 std::string KafkaTopicSelector::getTopicKey(const std::string &topic_var,
408                                             const std::string *router_group, const std::string *peer_group,
409                                             uint32_t peer_asn) {
410 
411     std::string topic_key = topic_var;
412     char uint32_str[12];
413 
414     // Update the topic name based on app variables
415     if (topic_var.compare(MSGBUS_TOPIC_VAR_COLLECTOR)) {   // if not collector topic
416         topic_key += "_";
417 
418         if (router_group != NULL and router_group->size() > 0) {
419             topic_key += *router_group;
420         }
421 
422         if (topic_var.compare(MSGBUS_TOPIC_VAR_ROUTER)) {    // if not router topic
423             topic_key += "_";
424 
425             if (peer_group != NULL and peer_group->size() > 0) {
426                 topic_key += *peer_group;
427             }
428 
429             if (topic_flags_map[topic_var].include_peerAsn) {
430                 topic_key += "_";
431                 if (peer_asn > 0) {
432                     snprintf(uint32_str, sizeof(uint32_str), "%u", peer_asn);
433                     topic_key += uint32_str;
434                 }
435             }
436         }
437     }
438 
439     return topic_key;
440 }
441 
442 /**
443  * Free allocated topic map pointers
444  */
freeTopicMap()445 void KafkaTopicSelector::freeTopicMap() {
446     // Free topic pointers
447     for (topic_map::iterator it = topic.begin(); it != topic.end(); it++) {
448         if (it->second) {
449             delete it->second;
450             it->second = NULL;
451         }
452     }
453 }
454