1 /*
2 * Copyright (c) 2013-2016 Cisco Systems, Inc. and others. All rights reserved.
3 *
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
7 *
8 */
9
10 #include <arpa/inet.h>
11 #include <sys/socket.h>
12 #include <boost/algorithm/string/replace.hpp>
13
14 #include "KafkaTopicSelector.h"
15 #include "kafka/MsgBusImpl_kafka.h"
16
17 /*********************************************************************//**
18 * Constructor for class
19 *
20 * \param [in] logPtr Pointer to Logger instance
21 * \param [in] cfg Pointer to the config instance
22 * \param [in] producer Pointer to the kafka producer
23 ***********************************************************************/
KafkaTopicSelector(Logger * logPtr,Config * cfg,RdKafka::Producer * producer)24 KafkaTopicSelector::KafkaTopicSelector(Logger *logPtr, Config *cfg, RdKafka::Producer *producer) {
25 logger = logPtr;
26 this->cfg = cfg;
27
28 if (cfg->debug_msgbus)
29 debug = true;
30 else
31 debug = false;
32
33
34 this->producer = producer;
35
36 peer_partitioner_callback = new KafkaPeerPartitionerCallback();
37 tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
38
39 }
40
41 /*********************************************************************//**
42 * Destructor for class
43 ***********************************************************************/
~KafkaTopicSelector()44 KafkaTopicSelector::~KafkaTopicSelector() {
45 SELF_DEBUG("Destory KafkaTopicSeletor");
46
47 freeTopicMap();
48
49 if (peer_partitioner_callback != NULL)
50 delete peer_partitioner_callback;
51
52 delete tconf;
53
54 }
55
56 /*********************************************************************//**
57 * Gets topic pointer by topic var name, router and peer group.
58 * If the topic doesn't exist, a new entry will be initialized.
59 *
60 * \param [in] topic_var MSGBUS_TOPIC_VAR_<name>
61 * \param [in] router_group Router group - empty/NULL means no router group
62 * \param [in] peer_group Peer group - empty/NULL means no peer group
63 * \param [in] peer_asn Peer asn (remote asn)
64 *
65 * \return (RdKafka::Topic *) pointer or NULL if error
66 ***********************************************************************/
67
getTopic(const std::string & topic_var,const std::string * router_group,const std::string * peer_group,uint32_t peer_asn)68 RdKafka::Topic * KafkaTopicSelector::getTopic(const std::string &topic_var,
69 const std::string *router_group, const std::string *peer_group,
70 uint32_t peer_asn) {
71
72 // Update the topic key based on the peer_group/router_group
73 std::string topic_key = getTopicKey(topic_var, router_group, peer_group, peer_asn);
74
75 topic_map::iterator t_it;
76
77 if ( (t_it=topic.find(topic_key)) != topic.end()) {
78 return t_it->second; // Return the existing initialized topic
79 }
80 else {
81 SELF_DEBUG("Requesting to create topic for key=%s", topic_key.c_str());
82 return initTopic(topic_var, router_group, peer_group, peer_asn); // create and return newly created topic
83 }
84
85 return NULL;
86 }
87
88 /*********************************************************************//**
89 * Check if a topic is enabled
90 *
91 * \param [in] topic_var MSGBUS_TOPIC_VAR_<name>
92 *
93 * \return bool true if the topic is enabled, false otherwise
94 ***********************************************************************/
topicEnabled(const std::string & topic_var)95 bool KafkaTopicSelector::topicEnabled(const std::string &topic_var) {
96 return this->cfg->topic_names_map[topic_var].length() > 0;
97 }
98
99 /*********************************************************************//**
100 * Lookup peer group
101 *
102 * \param [in] hostname hostname/fqdn of the peer
103 * \param [in] ip_addr IP address of the peer (printed form)
104 * \param [in] peer_asn Peer ASN
105 * \param [out] peer_group_name Reference to string where peer group will be updated
106 *
107 * \return bool true if matched, false if no matched peer group
108 ***********************************************************************/
lookupPeerGroup(std::string hostname,std::string ip_addr,uint32_t peer_asn,std::string & peer_group_name)109 void KafkaTopicSelector::lookupPeerGroup(std::string hostname, std::string ip_addr, uint32_t peer_asn,
110 std::string &peer_group_name) {
111
112 peer_group_name = "";
113
114 /*
115 * Match against hostname regexp
116 */
117 if (hostname.size() > 0) {
118
119 // Loop through all groups and their regular expressions
120 for (Config::match_peer_group_by_name_iter it = cfg->match_peer_group_by_name.begin();
121 it != cfg->match_peer_group_by_name.end(); ++it) {
122
123 // loop through all regexps to see if there is a match
124 for (std::list<Config::match_type_regex>::iterator lit = it->second.begin();
125 lit != it->second.end(); ++lit) {
126 if (regex_search(hostname, lit->regexp)) {
127 SELF_DEBUG("Regexp matched hostname %s to peer group '%s'",
128 hostname.c_str(), it->first.c_str());
129 peer_group_name = it->first;
130 return;
131 }
132 }
133 }
134 }
135
136 /*
137 * Match against prefix ranges
138 */
139 bool isIPv4 = ip_addr.find_first_of(':') == std::string::npos ? true : false;
140 uint8_t bits;
141
142 uint32_t prefix[4] __attribute__ ((aligned));
143 bzero(prefix,sizeof(prefix));
144
145 inet_pton(isIPv4 ? AF_INET : AF_INET6, ip_addr.c_str(), prefix);
146
147 // Loop through all groups and their regular expressions
148 for (Config::match_peer_group_by_ip_iter it = cfg->match_peer_group_by_ip.begin();
149 it != cfg->match_peer_group_by_ip.end(); ++it) {
150
151 // loop through all prefix ranges to see if there is a match
152 for (std::list<Config::match_type_ip>::iterator lit = it->second.begin();
153 lit != it->second.end(); ++lit) {
154
155 if (lit->isIPv4 == isIPv4) { // IPv4
156 bits = 32 - lit->bits;
157
158 // Big endian
159 prefix[0] <<= bits;
160 prefix[0] >>= bits;
161
162 if (prefix[0] == lit->prefix[0]) {
163 SELF_DEBUG("IP %s matched peer group %s", ip_addr.c_str(), it->first.c_str());
164 peer_group_name = it->first;
165 return;
166 }
167 } else { // IPv6
168 uint8_t end_idx = lit->bits / 32;
169 bits = lit->bits - (32 * end_idx);
170
171 if (bits == 0)
172 end_idx--;
173
174 if (end_idx < 4 and bits < 32) { // end_idx should be less than 4 and bits less than 32
175
176 // Big endian
177 prefix[end_idx] <<= bits;
178 prefix[end_idx] >>= bits;
179 }
180
181 if (prefix[0] == lit->prefix[0] and prefix[1] == lit->prefix[1]
182 and prefix[2] == lit->prefix[2] and prefix[3] == lit->prefix[3]) {
183
184 SELF_DEBUG("IP %s matched peer group %s", ip_addr.c_str(), it->first.c_str());
185 peer_group_name = it->first;
186 return;
187 }
188 }
189 }
190 }
191
192 /*
193 * Match against asn list
194 */
195 // Loop through all groups and their regular expressions
196 for (Config::match_peer_group_by_asn_iter it = cfg->match_peer_group_by_asn.begin();
197 it != cfg->match_peer_group_by_asn.end(); ++it) {
198
199 // loop through all prefix ranges to see if there is a match
200 for (std::list<uint32_t>::iterator lit = it->second.begin();
201 lit != it->second.end(); ++lit) {
202
203 if (*lit == peer_asn) {
204 SELF_DEBUG("Peer ASN %u matched peer group %s", peer_asn, it->first.c_str());
205 peer_group_name = it->first;
206 return;
207 }
208 }
209 }
210
211 }
212
213 /*********************************************************************//**
214 * Lookup router group
215 *
216 * \param [in] hostname hostname/fqdn of the router
217 * \param [in] ip_addr IP address of the peer (printed form)
218 * \param [out] router_group_name Reference to string where router group will be updated
219 *
220 * \return bool true if matched, false if no matched peer group
221 ***********************************************************************/
lookupRouterGroup(std::string hostname,std::string ip_addr,std::string & router_group_name)222 void KafkaTopicSelector::lookupRouterGroup(std::string hostname, std::string ip_addr,
223 std::string &router_group_name) {
224
225 router_group_name = "";
226
227 SELF_DEBUG("router lookup for hostname=%s and ip_addr=%s", hostname.c_str(), ip_addr.c_str());
228
229 /*
230 * Match against hostname regexp
231 */
232 if (hostname.size() > 0) {
233
234 // Loop through all groups and their regular expressions
235 for (Config::match_router_group_by_name_iter it = cfg->match_router_group_by_name.begin();
236 it != cfg->match_router_group_by_name.end(); ++it) {
237
238 // loop through all regexps to see if there is a match
239 for (std::list<Config::match_type_regex>::iterator lit = it->second.begin();
240 lit != it->second.end(); ++lit) {
241
242 if (regex_search(hostname, lit->regexp)) {
243 SELF_DEBUG("Regexp matched hostname %s to router group '%s'",
244 hostname.c_str(), it->first.c_str());
245 router_group_name = it->first;
246 return;
247 }
248 }
249 }
250 }
251
252 /*
253 * Match against prefix ranges
254 */
255 bool isIPv4 = ip_addr.find_first_of(':') == std::string::npos ? true : false;
256 uint8_t bits;
257
258 uint32_t prefix[4] __attribute__ ((aligned));
259 bzero(prefix,sizeof(prefix));
260
261 inet_pton(isIPv4 ? AF_INET : AF_INET6, ip_addr.c_str(), prefix);
262
263 // Loop through all groups and their regular expressions
264 for (Config::match_router_group_by_ip_iter it = cfg->match_router_group_by_ip.begin();
265 it != cfg->match_router_group_by_ip.end(); ++it) {
266
267 // loop through all prefix ranges to see if there is a match
268 for (std::list<Config::match_type_ip>::iterator lit = it->second.begin();
269 lit != it->second.end(); ++lit) {
270
271 if (lit->isIPv4 == isIPv4) { // IPv4
272
273 bits = 32 - lit->bits;
274
275 // Big endian
276 prefix[0] <<= bits;
277 prefix[0] >>= bits;
278
279 if (prefix[0] == lit->prefix[0]) {
280 SELF_DEBUG("IP %s matched router group %s", ip_addr.c_str(), it->first.c_str());
281 router_group_name = it->first;
282 return;
283 }
284 } else { // IPv6
285 uint8_t end_idx = lit->bits / 32;
286 bits = lit->bits - (32 * end_idx);
287
288 if (bits == 0)
289 end_idx--;
290
291 if (end_idx < 4 and bits < 32) { // end_idx should be less than 4 and bits less than 32
292
293 // Big endian
294 prefix[end_idx] <<= bits;
295 prefix[end_idx] >>= bits;
296 }
297
298 if (prefix[0] == lit->prefix[0] and prefix[1] == lit->prefix[1]
299 and prefix[2] == lit->prefix[2] and prefix[3] == lit->prefix[3]) {
300
301 SELF_DEBUG("IP %s matched router group %s", ip_addr.c_str(), it->first.c_str());
302 router_group_name = it->first;
303 return;
304 }
305 }
306 }
307 }
308 }
309
310
311
312 /**
313 * Initialize topic
314 * Producer must be initialized and connected prior to calling this method.
315 * Topic map will be updated.
316 *
317 * \param [in] topic_var MSGBUS_TOPIC_VAR_<name>
318 * \param [in] router_group Router group - empty/NULL means no router group
319 * \param [in] peer_group Peer group - empty/NULL means no peer group
320 * \param [in] peer_asn Peer asn (remote asn)
321 *
322 * \return (RdKafka::Topic *) pointer or NULL if error
323 */
initTopic(const std::string & topic_var,const std::string * router_group,const std::string * peer_group,uint32_t peer_asn)324 RdKafka::Topic * KafkaTopicSelector::initTopic(const std::string &topic_var,
325 const std::string *router_group, const std::string *peer_group,
326 uint32_t peer_asn) {
327 std::string errstr;
328 char uint32_str[12];
329
330 // Get the actual topic name based on var
331 std::string topic_name = this->cfg->topic_names_map[topic_var];
332
333 /*
334 * topics that contain the peer asn need to have the key include the peer asn
335 */
336 if (topic_name.find("{peer_asn}") != std::string::npos) {
337 topic_flags_map[topic_var].include_peerAsn = true;
338 SELF_DEBUG("peer_asn found in topic %s, setting topic flag to include peer ASN", topic_name.c_str());
339 } else {
340 topic_flags_map[topic_var].include_peerAsn = false;
341 }
342
343 // Update the topic key based on the peer_group/router_group
344 std::string topic_key = getTopicKey(topic_var, router_group, peer_group, peer_asn);
345
346 // Update the topic name based on app variables
347 if (topic_var.compare(MSGBUS_TOPIC_VAR_COLLECTOR)) { // if not collector topic
348 if (router_group != NULL and router_group->size() > 0) {
349 boost::replace_all(topic_name, "{router_group}", *router_group);
350 } else
351 boost::replace_all(topic_name, "{router_group}", "default");
352
353 if (topic_var.compare(MSGBUS_TOPIC_VAR_ROUTER)) { // if not router topic
354 if (peer_group != NULL and peer_group->size() > 0) {
355 boost::replace_all(topic_name, "{peer_group}", *peer_group);
356 } else
357 boost::replace_all(topic_name, "{peer_group}", "default");
358
359 if (peer_asn > 0) {
360 snprintf(uint32_str, sizeof(uint32_str), "%u", peer_asn);
361 boost::replace_all(topic_name, "{peer_asn}", (const char *)uint32_str);
362 } else
363 boost::replace_all(topic_name, "{peer_asn}", "default");
364 }
365 }
366
367 SELF_DEBUG("Creating topic %s (map key=%s)" , topic_name.c_str(), topic_key.c_str());
368
369 // Delete topic if it already exists
370 topic_map::iterator t_it;
371
372 if ( (t_it=topic.find(topic_key)) != topic.end() and t_it->second != NULL) {
373 delete t_it->second;
374 }
375
376 /*
377 * Topic configuration
378 */
379 if (tconf->set("partitioner_cb", peer_partitioner_callback, errstr) != RdKafka::Conf::CONF_OK) {
380 LOG_ERR("Failed to configure kafka partitioner callback: %s", errstr.c_str());
381 throw "ERROR: Failed to configure kafka partitioner callback";
382 }
383
384 topic[topic_key] = RdKafka::Topic::create(producer, topic_name.c_str(), tconf, errstr);
385
386 if (topic[topic_key] == NULL) {
387 LOG_ERR("Failed to create '%s' topic: %s", topic_name.c_str(), errstr.c_str());
388 throw "ERROR: Failed to create topic";
389
390 } else {
391 return topic[topic_key];
392 }
393
394 return NULL;
395 }
396
397 /**
398 * Get the topic map key name
399 *
400 * \param [in] topic_var MSGBUS_TOPIC_VAR_<name>
401 * \param [in] router_group Router group - empty/NULL means no router group
402 * \param [in] peer_group Peer group - empty/NULL means no peer group
403 * \param [in] peer_asn Peer asn (remote asn)
404 *
405 * \return string value of the topic key to be used with the topic map
406 */
getTopicKey(const std::string & topic_var,const std::string * router_group,const std::string * peer_group,uint32_t peer_asn)407 std::string KafkaTopicSelector::getTopicKey(const std::string &topic_var,
408 const std::string *router_group, const std::string *peer_group,
409 uint32_t peer_asn) {
410
411 std::string topic_key = topic_var;
412 char uint32_str[12];
413
414 // Update the topic name based on app variables
415 if (topic_var.compare(MSGBUS_TOPIC_VAR_COLLECTOR)) { // if not collector topic
416 topic_key += "_";
417
418 if (router_group != NULL and router_group->size() > 0) {
419 topic_key += *router_group;
420 }
421
422 if (topic_var.compare(MSGBUS_TOPIC_VAR_ROUTER)) { // if not router topic
423 topic_key += "_";
424
425 if (peer_group != NULL and peer_group->size() > 0) {
426 topic_key += *peer_group;
427 }
428
429 if (topic_flags_map[topic_var].include_peerAsn) {
430 topic_key += "_";
431 if (peer_asn > 0) {
432 snprintf(uint32_str, sizeof(uint32_str), "%u", peer_asn);
433 topic_key += uint32_str;
434 }
435 }
436 }
437 }
438
439 return topic_key;
440 }
441
442 /**
443 * Free allocated topic map pointers
444 */
freeTopicMap()445 void KafkaTopicSelector::freeTopicMap() {
446 // Free topic pointers
447 for (topic_map::iterator it = topic.begin(); it != topic.end(); it++) {
448 if (it->second) {
449 delete it->second;
450 it->second = NULL;
451 }
452 }
453 }
454