1 /*
2 * Copyright (c) 2013-2016 Cisco Systems, Inc. and others. All rights reserved.
3 *
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
7 *
8 */
9 #include <cstdlib>
10 #include <cstring>
11 #include <iostream>
12
13 #include <cinttypes>
14
15 #include <librdkafka/rdkafkacpp.h>
16 #include <netdb.h>
17 #include <unistd.h>
18
19 #include <thread>
20 #include <arpa/inet.h>
21
22 #include "MsgBusImpl_kafka.h"
23 #include "KafkaEventCallback.h"
24 #include "KafkaDeliveryReportCallback.h"
25 #include "KafkaTopicSelector.h"
26
27 #include <boost/algorithm/string/replace.hpp>
28
29 #include <librdkafka/rdkafka.h>
30
31
32 #include "md5.h"
33
34 using namespace std;
35
36 /******************************************************************//**
37 * \brief This function will initialize and connect to Kafka.
38 *
39 * \details It is expected that this class will start off with a new connection.
40 *
41 * \param [in] logPtr Pointer to Logger instance
42 * \param [in] cfg Pointer to the config instance
43 * \param [in] c_hash_id Collector Hash ID
44 ********************************************************************/
msgBus_kafka(Logger * logPtr,Config * cfg,u_char * c_hash_id)45 msgBus_kafka::msgBus_kafka(Logger *logPtr, Config *cfg, u_char *c_hash_id) {
46 logger = logPtr;
47
48 producer_buf = new unsigned char[MSGBUS_WORKING_BUF_SIZE];
49 prep_buf = new char[MSGBUS_WORKING_BUF_SIZE];
50
51 hash_toStr(c_hash_id, collector_hash);
52
53 isConnected = false;
54 conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
55
56 disableDebug();
57
58 // TODO: Init the topic selector class
59
60 router_seq = 0L;
61 collector_seq = 0L;
62 peer_seq = 0L;
63 base_attr_seq = 0L;
64 unicast_prefix_seq = 0L;
65 l3vpn_seq = 0L;
66 evpn_seq = 0L;
67 ls_node_seq = 0L;
68 ls_link_seq = 0L;
69 ls_prefix_seq = 0L;
70 bmp_stat_seq = 0L;
71
72 this->cfg = cfg;
73
74 // Make the connection to the server
75 event_callback = NULL;
76 delivery_callback = NULL;
77 producer = NULL;
78 topicSel = NULL;
79
80 router_ip.assign("");
81 bzero(router_hash, sizeof(router_hash));
82
83 connect();
84 }
85
86 /**
87 * Destructor
88 */
~msgBus_kafka()89 msgBus_kafka::~msgBus_kafka() {
90
91 SELF_DEBUG("Destory msgBus Kafka instance");
92
93 // Disconnect/term the router if not already done
94 MsgBusInterface::obj_router r_object;
95 bool router_defined = false;
96 for (int i=0; i < sizeof(router_hash); i++) {
97 if (router_hash[i] != 0) {
98 router_defined = true;
99 break;
100 }
101 }
102
103 if (router_defined) {
104 bzero(&r_object, sizeof(r_object));
105 memcpy(r_object.hash_id, router_hash, sizeof(r_object.hash_id));
106 snprintf((char *)r_object.ip_addr, sizeof(r_object.ip_addr), "%s", router_ip.c_str());
107 r_object.term_reason_code = 65533;
108 snprintf(r_object.term_reason_text, sizeof(r_object.term_reason_text),
109 "Connection closed");
110
111 update_Router(r_object, msgBus_kafka::ROUTER_ACTION_TERM);
112 }
113
114 sleep(2);
115
116 delete [] producer_buf;
117 delete [] prep_buf;
118
119 peer_list.clear();
120
121 disconnect(500);
122
123 delete conf;
124 }
125
126 /**
127 * Disconnect from Kafka
128 */
disconnect(int wait_ms)129 void msgBus_kafka::disconnect(int wait_ms) {
130
131 if (isConnected) {
132 int i = 0;
133 while (producer->outq_len() > 0 and i < 8) {
134 LOG_INFO("Waiting for producer to finish before disconnecting: outq=%d", producer->outq_len());
135 producer->poll(500);
136 i++;
137 }
138 }
139
140 if (topicSel != NULL) delete topicSel;
141
142 topicSel = NULL;
143
144 if (producer != NULL) delete producer;
145 producer = NULL;
146
147 // suggested by librdkafka to free memory
148 RdKafka::wait_destroyed(wait_ms);
149
150 if (event_callback != NULL) delete event_callback;
151 event_callback = NULL;
152
153 if (delivery_callback != NULL) delete delivery_callback;
154 delivery_callback = NULL;
155
156 isConnected = false;
157 }
158
159 /**
160 * Connects to Kafka broker
161 */
connect()162 void msgBus_kafka::connect() {
163 string errstr;
164 string value;
165 std::ostringstream rx_bytes, tx_bytes, sess_timeout, socket_timeout;
166 std::ostringstream q_buf_max_msgs, q_buf_max_kbytes, q_buf_max_ms,
167 msg_send_max_retry, retry_backoff_ms;
168
169 disconnect();
170
171 /*
172 * Configure Kafka Producer (https://kafka.apache.org/08/configuration.html)
173 */
174 //TODO: Add config options to change these settings
175
176 // Disable logging of connection close/idle timeouts caused by Kafka 0.9.x (connections.max.idle.ms)
177 // See https://github.com/edenhill/librdkafka/issues/437 for more details.
178 // TODO: change this when librdkafka has better handling of the idle disconnects
179 value = "false";
180 if (conf->set("log.connection.close", value, errstr) != RdKafka::Conf::CONF_OK) {
181 LOG_ERR("Failed to configure log.connection.close=false: %s.", errstr.c_str());
182 }
183
184 value = "true";
185 if (conf->set("api.version.request", value, errstr) != RdKafka::Conf::CONF_OK) {
186 LOG_ERR("Failed to configure api.version.request=true: %s.", errstr.c_str());
187 }
188
189 // TODO: Add config for address family - default is any
190 /*value = "v4";
191 if (conf->set("broker.address.family", value, errstr) != RdKafka::Conf::CONF_OK) {
192 LOG_ERR("Failed to configure broker.address.family: %s.", errstr.c_str());
193 }*/
194
195
196 // Batch message number
197 value = "100";
198 if (conf->set("batch.num.messages", value, errstr) != RdKafka::Conf::CONF_OK) {
199 LOG_ERR("Failed to configure batch.num.messages for kafka: %s.", errstr.c_str());
200 throw "ERROR: Failed to configure kafka batch.num.messages";
201 }
202
203 // Batch message max wait time (in ms)
204 q_buf_max_ms << cfg->q_buf_max_ms;
205 if (conf->set("queue.buffering.max.ms", q_buf_max_ms.str(), errstr) != RdKafka::Conf::CONF_OK) {
206 LOG_ERR("Failed to configure queue.buffering.max.ms for kafka: %s.", errstr.c_str());
207 throw "ERROR: Failed to configure kafka queue.buffer.max.ms";
208 }
209
210
211 // compression
212 value = cfg->compression;
213 if (conf->set("compression.codec", value, errstr) != RdKafka::Conf::CONF_OK) {
214 LOG_ERR("Failed to configure %s compression for kafka: %s.", value.c_str(), errstr.c_str());
215 throw "ERROR: Failed to configure kafka compression";
216 }
217
218 // broker list
219 if (conf->set("metadata.broker.list", cfg->kafka_brokers, errstr) != RdKafka::Conf::CONF_OK) {
220 LOG_ERR("Failed to configure broker list for kafka: %s", errstr.c_str());
221 throw "ERROR: Failed to configure kafka broker list";
222 }
223
224 // Maximum transmit byte size
225 tx_bytes << cfg->tx_max_bytes;
226 if (conf->set("message.max.bytes", tx_bytes.str(),
227 errstr) != RdKafka::Conf::CONF_OK)
228 {
229 LOG_ERR("Failed to configure transmit max message size for kafka: %s",
230 errstr.c_str());
231 throw "ERROR: Failed to configure transmit max message size";
232 }
233
234 // Maximum receive byte size
235 rx_bytes << cfg->rx_max_bytes;
236 if (conf->set("receive.message.max.bytes", rx_bytes.str(),
237 errstr) != RdKafka::Conf::CONF_OK)
238 {
239 LOG_ERR("Failed to configure receive max message size for kafka: %s",
240 errstr.c_str());
241 throw "ERROR: Failed to configure receive max message size";
242 }
243
244 // Client group session and failure detection timeout
245 sess_timeout << cfg->session_timeout;
246 if (conf->set("session.timeout.ms", sess_timeout.str(),
247 errstr) != RdKafka::Conf::CONF_OK)
248 {
249 LOG_ERR("Failed to configure session timeout for kafka: %s",
250 errstr.c_str());
251 throw "ERROR: Failed to configure session timeout ";
252 }
253
254 // Timeout for network requests
255 socket_timeout << cfg->socket_timeout;
256 if (conf->set("socket.timeout.ms", socket_timeout.str(),
257 errstr) != RdKafka::Conf::CONF_OK)
258 {
259 LOG_ERR("Failed to configure socket timeout for kafka: %s",
260 errstr.c_str());
261 throw "ERROR: Failed to configure socket timeout ";
262 }
263
264 // Maximum number of messages allowed on the producer queue
265 q_buf_max_msgs << cfg->q_buf_max_msgs;
266 if (conf->set("queue.buffering.max.messages", q_buf_max_msgs.str(),
267 errstr) != RdKafka::Conf::CONF_OK)
268 {
269 LOG_ERR("Failed to configure max messages in buffer for kafka: %s",
270 errstr.c_str());
271 throw "ERROR: Failed to configure max messages in buffer ";
272 }
273
274 // Maximum number of messages allowed on the producer queue
275 q_buf_max_kbytes << cfg->q_buf_max_kbytes;
276 if (conf->set("queue.buffering.max.kbytes", q_buf_max_kbytes.str(),
277 errstr) != RdKafka::Conf::CONF_OK)
278 {
279 LOG_ERR("Failed to configure max kbytes in buffer for kafka: %s",
280 errstr.c_str());
281 throw "ERROR: Failed to configure max kbytes in buffer ";
282 }
283
284
285 // How many times to retry sending a failing MessageSet
286 msg_send_max_retry << cfg->msg_send_max_retry;
287 if (conf->set("message.send.max.retries", msg_send_max_retry.str(),
288 errstr) != RdKafka::Conf::CONF_OK)
289 {
290 LOG_ERR("Failed to configure max retries for sending "
291 "failed message for kafka: %s",
292 errstr.c_str());
293 throw "ERROR: Failed to configure max retries for sending failed message";
294 }
295
296 // Backoff time in ms before retrying a message send
297 retry_backoff_ms << cfg->retry_backoff_ms;
298 if (conf->set("retry.backoff.ms", retry_backoff_ms.str(),
299 errstr) != RdKafka::Conf::CONF_OK)
300 {
301 LOG_ERR("Failed to configure backoff time before retrying to send"
302 "failed message for kafka: %s",
303 errstr.c_str());
304 throw "ERROR: Failed to configure backoff time before resending"
305 " failed messages ";
306 }
307
308 // Register event callback
309 event_callback = new KafkaEventCallback(&isConnected, logger);
310 if (conf->set("event_cb", event_callback, errstr) != RdKafka::Conf::CONF_OK) {
311 LOG_ERR("Failed to configure kafka event callback: %s", errstr.c_str());
312 throw "ERROR: Failed to configure kafka event callback";
313 }
314
315 // Register delivery report callback
316 /*
317 delivery_callback = new KafkaDeliveryReportCallback();
318
319 if (conf->set("dr_cb", delivery_callback, errstr) != RdKafka::Conf::CONF_OK) {
320 LOG_ERR("Failed to configure kafka delivery report callback: %s", errstr.c_str());
321 throw "ERROR: Failed to configure kafka delivery report callback";
322 }
323 */
324
325
326 // Create producer and connect
327 producer = RdKafka::Producer::create(conf, errstr);
328 if (producer == NULL) {
329 LOG_ERR("rtr=%s: Failed to create producer: %s", router_ip.c_str(), errstr.c_str());
330 throw "ERROR: Failed to create producer";
331 }
332
333 isConnected = true;
334
335 producer->poll(1000);
336
337 if (not isConnected) {
338 LOG_ERR("rtr=%s: Failed to connect to Kafka, will try again in a few", router_ip.c_str());
339 return;
340
341 }
342
343 /*
344 * Initialize the topic selector/handler
345 */
346 try {
347 topicSel = new KafkaTopicSelector(logger, cfg, producer);
348
349 } catch (char const *str) {
350 LOG_ERR("rtr=%s: Failed to create one or more topics, will try again in a few: err=%s", router_ip.c_str(), str);
351 isConnected = false;
352 return;
353 }
354
355 producer->poll(100);
356 }
357
358 /**
359 * produce message to Kafka
360 *
361 * \param [in] topic_var Topic var to use in KafkaTopicSelector::getTopic() MSGBUS_TOPIC_VAR_*
362 * \param [in] msg message to produce
363 * \param [in] msg_size Length in bytes of the message
364 * \param [in] rows Number of rows
365 * \param [in] key Hash key
366 * \param [in] peer_group Peer group name - empty/NULL if not set or used
367 * \param [in] peer_asn Peer ASN
368 */
produce(const char * topic_var,char * msg,size_t msg_size,int rows,string key,const string * peer_group,uint32_t peer_asn)369 void msgBus_kafka::produce(const char *topic_var, char *msg, size_t msg_size, int rows, string key,
370 const string *peer_group, uint32_t peer_asn) {
371 size_t len;
372 RdKafka::Topic *topic = NULL;
373
374 while (isConnected == false or topicSel == NULL) {
375 // Do not attempt to reconnect if this is the main process (router ip is null)
376 // Changed on 10/29/15 to support docker startup delay with kafka
377 /*
378 if (router_ip.size() <= 0) {
379 return;
380 }*/
381
382 LOG_WARN("rtr=%s: Not connected to Kafka, attempting to reconnect", router_ip.c_str());
383 connect();
384
385 sleep(1);
386 }
387
388 // if topic is disabled, don't bother producing the message
389 // TODO: it would be more efficient to move this check to the top of the various update_* methods, but I'm not sure which parts of these methods have side-effects that need to be preserved.
390 if (!topicSel->topicEnabled(topic_var))
391 return;
392
393 char headers[256];
394 len = snprintf(headers, sizeof(headers), "V: %s\nC_HASH_ID: %s\nT: %s\nL: %lu\nR: %d\n\n",
395 MSGBUS_API_VERSION, collector_hash.c_str(), topic_var, msg_size, rows);
396
397 memcpy(producer_buf, headers, len);
398 memcpy(producer_buf+len, msg, msg_size);
399
400
401 topic = topicSel->getTopic(topic_var, &router_group_name, peer_group, peer_asn);
402 if (topic != NULL) {
403 SELF_DEBUG("rtr=%s: Producing message: topic=%s key=%s, msg size = %lu", router_ip.c_str(),
404 topic->name().c_str(), key.c_str(), msg_size);
405
406 RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
407 RdKafka::Producer::RK_MSG_COPY,
408 producer_buf, msg_size + len,
409 (const std::string *) &key, NULL);
410 if (resp != RdKafka::ERR_NO_ERROR) {
411 LOG_ERR("rtr=%s: Failed to produce message: %s", router_ip.c_str(), RdKafka::err2str(resp).c_str());
412 producer->poll(100);
413 }
414 } else {
415 LOG_NOTICE("rtr=%s: failed to produce message because topic couldn't be found: topic=%s key=%s, msg size = %lu", router_ip.c_str(),
416 topic_var, key.c_str(), msg_size);
417 }
418
419 producer->poll(0);
420 }
421
422 /**
423 * Abstract method Implementation - See MsgBusInterface.hpp for details
424 */
update_Collector(obj_collector & c_object,collector_action_code action_code)425 void msgBus_kafka::update_Collector(obj_collector &c_object, collector_action_code action_code) {
426 char buf[4096]; // Misc working buffer
427
428 string ts;
429 getTimestamp(c_object.timestamp_secs, c_object.timestamp_us, ts);
430
431 char *action = const_cast<char *>("change");
432
433 switch (action_code) {
434 case COLLECTOR_ACTION_STARTED:
435 action = const_cast<char *>("started");
436 break;
437 case COLLECTOR_ACTION_CHANGE:
438 action = const_cast<char *>("change");
439 break;
440 case COLLECTOR_ACTION_HEARTBEAT:
441 action = const_cast<char *>("heartbeat");
442 break;
443 case COLLECTOR_ACTION_STOPPED:
444 action = const_cast<char *>("stopped");
445 break;
446 }
447
448 snprintf(buf, sizeof(buf),
449 "%s\t%" PRIu64 "\t%s\t%s\t%s\t%u\t%s\n",
450 action, collector_seq, c_object.admin_id, collector_hash.c_str(),
451 c_object.routers, c_object.router_count, ts.c_str());
452
453 produce(MSGBUS_TOPIC_VAR_COLLECTOR, buf, strlen(buf), 1, collector_hash, NULL, 0);
454
455 collector_seq++;
456 }
457
458 /**
459 * Abstract method Implementation - See MsgBusInterface.hpp for details
460 */
update_Router(obj_router & r_object,router_action_code code)461 void msgBus_kafka::update_Router(obj_router &r_object, router_action_code code) {
462 char buf[4096]; // Misc working buffer
463
464 // Convert binary hash to string
465 string r_hash_str;
466 hash_toStr(r_object.hash_id, r_hash_str);
467
468 bool skip_if_defined = true;
469
470 string action = "first";
471
472 switch (code) {
473 case ROUTER_ACTION_FIRST :
474 action.assign("first");
475 break;
476
477 case ROUTER_ACTION_INIT :
478 skip_if_defined = false;
479 action.assign("init");
480 break;
481
482 case ROUTER_ACTION_TERM:
483 skip_if_defined = false;
484 action.assign("term");
485 bzero(router_hash, sizeof(router_hash));
486 break;
487 }
488
489
490 // Check if we have already processed this entry, if so return
491 if (skip_if_defined) {
492 for (int i=0; i < sizeof(router_hash); i++) {
493 if (router_hash[i] != 0)
494 return;
495 }
496 }
497
498 if (code != ROUTER_ACTION_TERM)
499 memcpy(router_hash, r_object.hash_id, sizeof(router_hash));
500
501 router_ip.assign((char *)r_object.ip_addr); // Update router IP for logging
502
503 string descr((char *)r_object.descr);
504 boost::replace_all(descr, "\n", "\\n");
505 boost::replace_all(descr, "\t", " ");
506
507 string initData(r_object.initiate_data);
508 boost::replace_all(initData, "\n", "\\n");
509 boost::replace_all(initData, "\t", " ");
510
511 string termData(r_object.term_data);
512 boost::replace_all(termData, "\n", "\\n");
513 boost::replace_all(termData, "\t", " ");
514
515 string ts;
516 getTimestamp(r_object.timestamp_secs, r_object.timestamp_us, ts);
517
518 // Get the hostname
519 string hostname = "";
520 if (strlen((char *)r_object.name) <= 0) {
521 resolveIp((char *) r_object.ip_addr, hostname);
522 snprintf((char *)r_object.name, sizeof(r_object.name)-1, "%s", hostname.c_str());
523 }
524
525 if (topicSel != NULL)
526 topicSel->lookupRouterGroup((char *)r_object.name, (char *)r_object.ip_addr, router_group_name);
527
528 size_t size = snprintf(buf, sizeof(buf),
529 "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%" PRIu16 "\t%s\t%s\t%s\t%s\t%s\n", action.c_str(),
530 router_seq, r_object.name, r_hash_str.c_str(), r_object.ip_addr, descr.c_str(),
531 r_object.term_reason_code, r_object.term_reason_text,
532 initData.c_str(), termData.c_str(), ts.c_str(), r_object.bgp_id);
533
534 produce(MSGBUS_TOPIC_VAR_ROUTER, buf, size, 1, r_hash_str, NULL, 0);
535
536 router_seq++;
537 }
538
539 /**
540 * Abstract method Implementation - See MsgBusInterface.hpp for details
541 */
update_Peer(obj_bgp_peer & peer,obj_peer_up_event * up,obj_peer_down_event * down,peer_action_code code)542 void msgBus_kafka::update_Peer(obj_bgp_peer &peer, obj_peer_up_event *up, obj_peer_down_event *down, peer_action_code code) {
543
544 char buf[4096]; // Misc working buffer
545
546 string r_hash_str;
547 hash_toStr(peer.router_hash_id, r_hash_str);
548
549 // Generate the hash
550 MD5 hash;
551
552 hash.update((unsigned char *) peer.peer_addr,
553 strlen(peer.peer_addr));
554 hash.update((unsigned char *) peer.peer_rd, strlen(peer.peer_rd));
555 hash.update((unsigned char *)r_hash_str.c_str(), r_hash_str.length());
556
557 /* TODO: Uncomment once this is fixed in XR
558 * Disable hashing the bgp peer ID since XR has an issue where it sends 0.0.0.0 on subsequent PEER_UP's
559 * This will be fixed in XR, but for now we can disable hashing on it.
560 *
561 hash.update((unsigned char *) p_object.peer_bgp_id,
562 strlen(p_object.peer_bgp_id));
563 */
564
565 hash.finalize();
566
567 // Save the hash
568 unsigned char *hash_raw = hash.raw_digest();
569 memcpy(peer.hash_id, hash_raw, 16);
570 delete[] hash_raw;
571
572 // Convert binary hash to string
573 string p_hash_str;
574 hash_toStr(peer.hash_id, p_hash_str);
575
576 bool skip_if_in_cache = true;
577 bool add_to_cache = true;
578
579 string action = "first";
580
581 // Determine the action and if cache should be used or not - don't want to do too much in this switch block
582 switch (code) {
583 case PEER_ACTION_FIRST :
584 action.assign("first");
585 break;
586
587 case PEER_ACTION_UP :
588 skip_if_in_cache = false;
589 action.assign("up");
590 break;
591
592 case PEER_ACTION_DOWN:
593 skip_if_in_cache = false;
594 action.assign("down");
595 add_to_cache = false;
596
597 if (peer_list.find(p_hash_str) != peer_list.end())
598 peer_list.erase(p_hash_str);
599
600 break;
601 }
602
603 // Check if we have already processed this entry, if so return
604 if (skip_if_in_cache and peer_list.find(p_hash_str) != peer_list.end()) {
605 return;
606 }
607
608 // Get the hostname using DNS
609 string hostname;
610 resolveIp(peer.peer_addr, hostname);
611
612 string ts;
613 getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
614
615 // Insert/Update map entry
616 if (add_to_cache) {
617 if (topicSel != NULL)
618 topicSel->lookupPeerGroup(hostname, peer.peer_addr, peer.peer_as, peer_list[p_hash_str]);
619 }
620
621 switch (code) {
622 case PEER_ACTION_FIRST :
623 snprintf(buf, sizeof(buf),
624 "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t%d\t%d\t%d\t%d\t%d\t%s\n",
625 action.c_str(), peer_seq, p_hash_str.c_str(), r_hash_str.c_str(), hostname.c_str(),
626 peer.peer_bgp_id,router_ip.c_str(), ts.c_str(), peer.peer_as, peer.peer_addr,peer.peer_rd,
627 peer.isL3VPN, peer.isPrePolicy, peer.isIPv4, peer.isLocRib, peer.isLocRibFiltered, peer.table_name);
628 action.assign("first");
629 break;
630
631 case PEER_ACTION_UP : {
632 if (up == NULL)
633 return;
634
635 string infoData(up->info_data);
636 if (up->info_data[0] != 0) {
637 boost::replace_all(infoData, "\n", "\\n");
638 boost::replace_all(infoData, "\t", " ");
639 }
640
641 snprintf(buf, sizeof(buf),
642 "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%" PRIu16 "\t%" PRIu32 "\t%s\t%" PRIu16
643 "\t%s\t%s\t%s\t%s\t%" PRIu16 "\t%" PRIu16 "\t\t\t\t\t%d\t%d\t%d\t%d\t%d\t%s\n",
644 action.c_str(), peer_seq, p_hash_str.c_str(), r_hash_str.c_str(), hostname.c_str(),
645 peer.peer_bgp_id, router_ip.c_str(), ts.c_str(), peer.peer_as, peer.peer_addr, peer.peer_rd,
646
647 /* Peer UP specific fields */
648 up->remote_port, up->local_asn, up->local_ip, up->local_port, up->local_bgp_id, infoData.c_str(), up->sent_cap,
649 up->recv_cap, up->remote_hold_time, up->local_hold_time,
650 peer.isL3VPN, peer.isPrePolicy, peer.isIPv4, peer.isLocRib, peer.isLocRibFiltered, peer.table_name);
651
652 skip_if_in_cache = false;
653 action.assign("up");
654 break;
655 }
656 case PEER_ACTION_DOWN: {
657 if (down == NULL)
658 return;
659
660 snprintf(buf, sizeof(buf),
661 "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t\t\t\t\t\t\t\t\t\t\t%d\t%d\t%d\t%s\t%d\t%d\t%d\t%d\t%d\t\n",
662 action.c_str(), peer_seq, p_hash_str.c_str(), r_hash_str.c_str(), hostname.c_str(),
663 peer.peer_bgp_id, router_ip.c_str(), ts.c_str(), peer.peer_as, peer.peer_addr, peer.peer_rd,
664
665 /* Peer DOWN specific fields */
666 down->bmp_reason, down->bgp_err_code, down->bgp_err_subcode, down->error_text,
667
668 peer.isL3VPN, peer.isPrePolicy, peer.isIPv4, peer.isLocRib, peer.isLocRibFiltered);
669
670 skip_if_in_cache = false;
671 action.assign("down");
672 add_to_cache = false;
673
674 if (peer_list.find(p_hash_str) != peer_list.end())
675 peer_list.erase(p_hash_str);
676
677 break;
678 }
679 }
680
681 produce(MSGBUS_TOPIC_VAR_PEER, buf, strlen(buf), 1, p_hash_str, &peer_list[p_hash_str], peer.peer_as);
682
683 peer_seq++;
684 }
685
686 /**
687 * Abstract method Implementation - See MsgBusInterface.hpp for details
688 */
update_baseAttribute(obj_bgp_peer & peer,obj_path_attr & attr,base_attr_action_code code)689 void msgBus_kafka::update_baseAttribute(obj_bgp_peer &peer, obj_path_attr &attr, base_attr_action_code code) {
690
691 prep_buf[0] = 0;
692 size_t buf_len; // size of the message in buf
693
694 string path_hash_str;
695 string p_hash_str;
696 string r_hash_str;
697 hash_toStr(peer.hash_id, p_hash_str);
698 hash_toStr(peer.router_hash_id, r_hash_str);
699
700
701 // Generate the hash
702 MD5 hash;
703
704 //hash.update(path_object.peer_hash_id, HASH_SIZE);
705 hash.update((unsigned char *) attr.as_path.c_str(), attr.as_path.length());
706 hash.update((unsigned char *) attr.next_hop,
707 strlen(attr.next_hop));
708 hash.update((unsigned char *) attr.aggregator,
709 strlen(attr.aggregator));
710 hash.update((unsigned char *) attr.origin,
711 strlen(attr.origin));
712 hash.update((unsigned char *) &attr.med, sizeof(attr.med));
713 hash.update((unsigned char *) &attr.local_pref,
714 sizeof(attr.local_pref));
715
716 hash.update((unsigned char *) attr.community_list.c_str(), attr.community_list.length());
717 hash.update((unsigned char *) attr.ext_community_list.c_str(), attr.ext_community_list.length());
718 hash.update((unsigned char *) p_hash_str.c_str(), p_hash_str.length());
719
720 hash.finalize();
721
722 // Save the hash
723 unsigned char *hash_raw = hash.raw_digest();
724 memcpy(attr.hash_id, hash_raw, 16);
725 delete[] hash_raw;
726
727 hash_toStr(attr.hash_id, path_hash_str);
728
729 string ts;
730 getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
731
732 buf_len =
733 snprintf(prep_buf, MSGBUS_WORKING_BUF_SIZE,
734 "add\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%s\t%" PRIu16 "\t%" PRIu32
735 "\t%s\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%s\t%s\t%d\t%d\t%s\t%s\n",
736 base_attr_seq, path_hash_str.c_str(), r_hash_str.c_str(), router_ip.c_str(), p_hash_str.c_str(),
737 peer.peer_addr,peer.peer_as, ts.c_str(),
738 attr.origin, attr.as_path.c_str(), attr.as_path_count, attr.origin_as, attr.next_hop, attr.med,
739 attr.local_pref, attr.aggregator, attr.community_list.c_str(), attr.ext_community_list.c_str(), attr.cluster_list.c_str(),
740 attr.atomic_agg, attr.nexthop_isIPv4, attr.originator_id,attr.large_community_list.c_str());
741
742 produce(MSGBUS_TOPIC_VAR_BASE_ATTRIBUTE, prep_buf, buf_len, 1, p_hash_str, &peer_list[p_hash_str], peer.peer_as);
743
744 ++base_attr_seq;
745 }
746
747 /**
748 * Abstract method Implementation - See MsgBusInterface.hpp for details
749 */
update_L3Vpn(obj_bgp_peer & peer,std::vector<obj_vpn> & vpn,obj_path_attr * attr,vpn_action_code code)750 void msgBus_kafka::update_L3Vpn(obj_bgp_peer &peer, std::vector<obj_vpn> &vpn,
751 obj_path_attr *attr, vpn_action_code code) {
752
753 prep_buf[0] = 0;
754
755 char buf2[80000]; // Second working buffer
756 size_t buf_len = 0; // query buffer length
757
758 string vpn_hash_str;
759 string path_hash_str;
760 string p_hash_str;
761 string r_hash_str;
762
763 hash_toStr(peer.router_hash_id, r_hash_str);
764
765 if (attr != NULL)
766 hash_toStr(attr->hash_id, path_hash_str);
767
768 hash_toStr(peer.hash_id, p_hash_str);
769
770 string ts;
771 getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
772
773 // Loop through the vector array of vpn entries
774 for (size_t i = 0; i < vpn.size(); i++) {
775
776 // Generate the hash
777 MD5 hash;
778
779 hash.update((unsigned char *) vpn[i].prefix, strlen(vpn[i].prefix));
780 hash.update(&vpn[i].prefix_len, sizeof(vpn[i].prefix_len));
781 hash.update((unsigned char *) vpn[i].rd_administrator_subfield.c_str(),
782 vpn[i].rd_administrator_subfield.length());
783 hash.update((unsigned char *) vpn[i].rd_assigned_number.c_str(),
784 vpn[i].rd_assigned_number.length());
785
786 hash.update((unsigned char *) p_hash_str.c_str(), p_hash_str.length());
787
788 // Add path ID to hash only if exists
789 if (vpn[i].path_id > 0)
790 hash.update((unsigned char *)&vpn[i].path_id, sizeof(vpn[i].path_id));
791
792 /*
793 * Add constant "1" to hash if labels are present
794 * Withdrawn and updated NLRI's do not carry the original label, therefore we cannot
795 * hash on the label string. Instead, we has on a constant value of 1.
796 */
797 if (vpn[i].labels[0] != 0) {
798 buf2[0] = 1;
799 hash.update((unsigned char *) buf2, 1);
800 buf2[0] = 0;
801 }
802
803 hash.finalize();
804
805 // Save the hash
806 unsigned char *hash_raw = hash.raw_digest();
807 memcpy(vpn[i].hash_id, hash_raw, 16);
808 delete[] hash_raw;
809
810 // Build the query
811 hash_toStr(vpn[i].hash_id, vpn_hash_str);
812
813 switch (code) {
814
815 case VPN_ACTION_ADD:
816 if (attr == NULL)
817 return;
818
819 buf_len += snprintf(buf2, sizeof(buf2),
820 "add\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%d\t%d\t%s\t%s\t%" PRIu16
821 "\t%" PRIu32 "\t%s\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%s\t%s\t%d\t%d\t%s\t%" PRIu32
822 "\t%s\t%d\t%d\t%s:%s\t%d\t%s\n",
823 l3vpn_seq, vpn_hash_str.c_str(), r_hash_str.c_str(),
824 router_ip.c_str(),path_hash_str.c_str(), p_hash_str.c_str(),
825 peer.peer_addr, peer.peer_as, ts.c_str(), vpn[i].prefix, vpn[i].prefix_len,
826 vpn[i].isIPv4, attr->origin,
827 attr->as_path.c_str(), attr->as_path_count, attr->origin_as, attr->next_hop, attr->med, attr->local_pref,
828 attr->aggregator,
829 attr->community_list.c_str(), attr->ext_community_list.c_str(), attr->cluster_list.c_str(),
830 attr->atomic_agg, attr->nexthop_isIPv4,
831 attr->originator_id, vpn[i].path_id, vpn[i].labels, peer.isPrePolicy, peer.isAdjIn,
832 vpn[i].rd_administrator_subfield.c_str(), vpn[i].rd_assigned_number.c_str(), vpn[i].rd_type,
833 attr->large_community_list.c_str());
834
835 break;
836
837 case VPN_ACTION_DEL:
838 buf_len += snprintf(buf2, sizeof(buf2),
839 "del\t%" PRIu64 "\t%s\t%s\t%s\t\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%d\t%d\t\t\t"
840 "\t\t\t\t\t\t\t\t\t\t\t\t%" PRIu32
841 "\t%s\t%d\t%d\t%s:%s\t%d\t\n",
842 l3vpn_seq, vpn_hash_str.c_str(), r_hash_str.c_str(),
843 router_ip.c_str(), p_hash_str.c_str(),
844 peer.peer_addr, peer.peer_as, ts.c_str(), vpn[i].prefix, vpn[i].prefix_len,
845 vpn[i].isIPv4, vpn[i].path_id, vpn[i].labels, peer.isPrePolicy, peer.isAdjIn,
846 vpn[i].rd_administrator_subfield.c_str(), vpn[i].rd_assigned_number.c_str(),
847 vpn[i].rd_type);
848 break;
849
850 }
851
852 // Cat the entry to the query buff
853 if (buf_len < MSGBUS_WORKING_BUF_SIZE /* size of buf */)
854 strcat(prep_buf, buf2);
855
856 ++l3vpn_seq;
857 }
858
859 produce(MSGBUS_TOPIC_VAR_L3VPN, prep_buf, strlen(prep_buf), vpn.size(), p_hash_str,
860 &peer_list[p_hash_str], peer.peer_as);
861 }
862
863
864 /**
865 * Abstract method Implementation - See MsgBusInterface.hpp for details
866 */
update_eVPN(obj_bgp_peer & peer,std::vector<obj_evpn> & vpn,obj_path_attr * attr,vpn_action_code code)867 void msgBus_kafka::update_eVPN(obj_bgp_peer &peer, std::vector<obj_evpn> &vpn,
868 obj_path_attr *attr, vpn_action_code code) {
869
870 prep_buf[0] = 0;
871
872 char buf2[80000]; // Second working buffer
873 size_t buf_len = 0; // query buffer length
874
875 string vpn_hash_str;
876 string path_hash_str;
877 string p_hash_str;
878 string r_hash_str;
879
880 hash_toStr(peer.router_hash_id, r_hash_str);
881
882 if (attr != NULL)
883 hash_toStr(attr->hash_id, path_hash_str);
884
885 hash_toStr(peer.hash_id, p_hash_str);
886
887 string ts;
888 getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
889
890 // Loop through the vector array of vpn entries
891 for (size_t i = 0; i < vpn.size(); i++) {
892
893 // Generate the hash
894 MD5 hash;
895
896 hash.update((unsigned char *) p_hash_str.c_str(), p_hash_str.length());
897
898 hash.update((unsigned char *) vpn[i].mac, strlen(vpn[i].mac));
899 hash.update((unsigned char *) vpn[i].ip, strlen(vpn[i].ip));
900 hash.update(&vpn[i].ip_len, sizeof(vpn[i].ip_len));
901 hash.update((unsigned char *) vpn[i].ethernet_segment_identifier, strlen(vpn[i].ethernet_segment_identifier));
902 hash.update((unsigned char *) vpn[i].rd_administrator_subfield.c_str(),
903 vpn[i].rd_administrator_subfield.length());
904 hash.update((unsigned char *) vpn[i].rd_assigned_number.c_str(),
905 vpn[i].rd_assigned_number.length());
906
907 // Add path ID to hash only if exists
908 if (vpn[i].path_id > 0)
909 hash.update((unsigned char *)&vpn[i].path_id, sizeof(vpn[i].path_id));
910
911 hash.finalize();
912
913 // Save the hash
914 unsigned char *hash_raw = hash.raw_digest();
915 memcpy(vpn[i].hash_id, hash_raw, 16);
916 delete[] hash_raw;
917
918 // Build the query
919 hash_toStr(vpn[i].hash_id, vpn_hash_str);
920
921 switch (code) {
922
923 case VPN_ACTION_ADD:
924 if (attr == NULL)
925 return;
926
927 buf_len += snprintf(buf2, sizeof(buf2),
928 "add\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%s\t%" PRIu16
929 "\t%" PRIu32 "\t%s\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%s\t%s\t%d\t%d\t%s\t%" PRIu32
930 "\t%d\t%d\t%s:%s\t%d\t%d\t%s\t%s\t%s\t%d\t%s\t%d\t%s\t%" PRIu32 "\t%" PRIu32 "\n",
931 evpn_seq, vpn_hash_str.c_str(), r_hash_str.c_str(),
932 router_ip.c_str(),path_hash_str.c_str(), p_hash_str.c_str(),
933 peer.peer_addr, peer.peer_as, ts.c_str(),
934 attr->origin,
935 attr->as_path.c_str(), attr->as_path_count, attr->origin_as, attr->next_hop, attr->med, attr->local_pref,
936 attr->aggregator,
937 attr->community_list.c_str(), attr->ext_community_list.c_str(), attr->cluster_list.c_str(),
938 attr->atomic_agg, attr->nexthop_isIPv4,
939 attr->originator_id, vpn[i].path_id, peer.isPrePolicy, peer.isAdjIn,
940 vpn[i].rd_administrator_subfield.c_str(), vpn[i].rd_assigned_number.c_str(), vpn[i].rd_type,
941 vpn[i].originating_router_ip_len, vpn[i].originating_router_ip, vpn[i].ethernet_tag_id_hex,
942 vpn[i].ethernet_segment_identifier, vpn[i].mac_len,
943 vpn[i].mac, vpn[i].ip_len, vpn[i].ip, vpn[i].mpls_label_1, vpn[i].mpls_label_2);
944
945 break;
946
947 case VPN_ACTION_DEL:
948 buf_len += snprintf(buf2, sizeof(buf2),
949 "del\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t\t\t"
950 "\t\t\t\t\t\t\t\t\t\t\t\t%" PRIu32
951 "\t%d\t%d\t%s:%s\t%d\t%d\t%s\t%s\t%s\t%d\t%s\t%d\t%s\t%" PRIu32 "\t%" PRIu32 "\n",
952 evpn_seq, vpn_hash_str.c_str(), r_hash_str.c_str(),
953 router_ip.c_str(),path_hash_str.c_str(), p_hash_str.c_str(),
954 peer.peer_addr, peer.peer_as, ts.c_str(),
955 vpn[i].path_id, peer.isPrePolicy, peer.isAdjIn,
956 vpn[i].rd_administrator_subfield.c_str(), vpn[i].rd_assigned_number.c_str(), vpn[i].rd_type,
957 vpn[i].originating_router_ip_len, vpn[i].originating_router_ip, vpn[i].ethernet_tag_id_hex,
958 vpn[i].ethernet_segment_identifier, vpn[i].mac_len,
959 vpn[i].mac, vpn[i].ip_len, vpn[i].ip, vpn[i].mpls_label_1, vpn[i].mpls_label_2);
960
961 break;
962
963 }
964
965 // Cat the entry to the query buff
966 if (buf_len < MSGBUS_WORKING_BUF_SIZE /* size of buf */)
967 strcat(prep_buf, buf2);
968
969 ++evpn_seq;
970 }
971
972 produce(MSGBUS_TOPIC_VAR_EVPN, prep_buf, strlen(prep_buf), vpn.size(), p_hash_str,
973 &peer_list[p_hash_str], peer.peer_as);
974 }
975
976
977 /**
978 * Abstract method Implementation - See MsgBusInterface.hpp for details
979 */
update_unicastPrefix(obj_bgp_peer & peer,std::vector<obj_rib> & rib,obj_path_attr * attr,unicast_prefix_action_code code)980 void msgBus_kafka::update_unicastPrefix(obj_bgp_peer &peer, std::vector<obj_rib> &rib,
981 obj_path_attr *attr, unicast_prefix_action_code code) {
982 //bzero(prep_buf, MSGBUS_WORKING_BUF_SIZE);
983 prep_buf[0] = 0;
984
985 char buf2[80000]; // Second working buffer
986 size_t buf_len = 0; // query buffer length
987
988 string rib_hash_str;
989 string path_hash_str;
990 string p_hash_str;
991 string r_hash_str;
992
993 hash_toStr(peer.router_hash_id, r_hash_str);
994
995 if (attr != NULL)
996 hash_toStr(attr->hash_id, path_hash_str);
997
998 hash_toStr(peer.hash_id, p_hash_str);
999
1000 string action = "add";
1001 switch (code) {
1002 case UNICAST_PREFIX_ACTION_ADD:
1003 action = "add";
1004 break;
1005 case UNICAST_PREFIX_ACTION_DEL:
1006 action = "del";
1007 break;
1008 }
1009
1010 string ts;
1011 getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
1012
1013 // Loop through the vector array of rib entries
1014 for (size_t i = 0; i < rib.size(); i++) {
1015
1016 // Generate the hash
1017 MD5 hash;
1018
1019 hash.update((unsigned char *) rib[i].prefix, strlen(rib[i].prefix));
1020 hash.update(&rib[i].prefix_len, sizeof(rib[i].prefix_len));
1021 hash.update((unsigned char *) p_hash_str.c_str(), p_hash_str.length());
1022
1023 // Add path ID to hash only if exists
1024 if (rib[i].path_id > 0)
1025 hash.update((unsigned char *)&rib[i].path_id, sizeof(rib[i].path_id));
1026
1027 /*
1028 * Add constant "1" to hash if labels are present
1029 * Withdrawn and updated NLRI's do not carry the original label, therefore we cannot
1030 * hash on the label string. Instead, we has on a constant value of 1.
1031 */
1032 if (rib[i].labels[0] != 0) {
1033 buf2[0] = 1;
1034 hash.update((unsigned char *) buf2, 1);
1035 buf2[0] = 0;
1036 }
1037
1038 hash.finalize();
1039
1040 // Save the hash
1041 unsigned char *hash_raw = hash.raw_digest();
1042 memcpy(rib[i].hash_id, hash_raw, 16);
1043 delete[] hash_raw;
1044
1045 // Build the query
1046 hash_toStr(rib[i].hash_id, rib_hash_str);
1047
1048 switch (code) {
1049
1050 case UNICAST_PREFIX_ACTION_ADD:
1051 if (attr == NULL)
1052 return;
1053
1054 buf_len += snprintf(buf2, sizeof(buf2),
1055 "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%d\t%d\t%s\t%s\t%" PRIu16
1056 "\t%" PRIu32 "\t%s\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%s\t%s\t%d\t%d\t%s\t%" PRIu32
1057 "\t%s\t%d\t%d\t%s\n",
1058 action.c_str(), unicast_prefix_seq, rib_hash_str.c_str(), r_hash_str.c_str(),
1059 router_ip.c_str(),path_hash_str.c_str(), p_hash_str.c_str(),
1060 peer.peer_addr, peer.peer_as, ts.c_str(), rib[i].prefix, rib[i].prefix_len,
1061 rib[i].isIPv4, attr->origin,
1062 attr->as_path.c_str(), attr->as_path_count, attr->origin_as, attr->next_hop, attr->med, attr->local_pref,
1063 attr->aggregator,
1064 attr->community_list.c_str(), attr->ext_community_list.c_str(), attr->cluster_list.c_str(),
1065 attr->atomic_agg, attr->nexthop_isIPv4,
1066 attr->originator_id, rib[i].path_id, rib[i].labels, peer.isPrePolicy, peer.isAdjIn,
1067 attr->large_community_list.c_str());
1068 break;
1069
1070 case UNICAST_PREFIX_ACTION_DEL:
1071 buf_len += snprintf(buf2, sizeof(buf2),
1072 "%s\t%" PRIu64 "\t%s\t%s\t%s\t\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%d\t%d\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t%" PRIu32
1073 "\t%s\t%d\t%d\t\n",
1074 action.c_str(), unicast_prefix_seq, rib_hash_str.c_str(), r_hash_str.c_str(),
1075 router_ip.c_str(), p_hash_str.c_str(),
1076 peer.peer_addr, peer.peer_as, ts.c_str(), rib[i].prefix, rib[i].prefix_len,
1077 rib[i].isIPv4, rib[i].path_id, rib[i].labels, peer.isPrePolicy, peer.isAdjIn);
1078 break;
1079 }
1080
1081 // Cat the entry to the query buff
1082 if (buf_len < MSGBUS_WORKING_BUF_SIZE /* size of buf */)
1083 strcat(prep_buf, buf2);
1084
1085 ++unicast_prefix_seq;
1086 ++ribSeq;
1087 }
1088
1089
1090 produce(MSGBUS_TOPIC_VAR_UNICAST_PREFIX, prep_buf, strlen(prep_buf), rib.size(), p_hash_str,
1091 &peer_list[p_hash_str], peer.peer_as);
1092 }
1093
1094 /**
1095 * Abstract method Implementation - See MsgBusInterface.hpp for details
1096 */
add_StatReport(obj_bgp_peer & peer,obj_stats_report & stats)1097 void msgBus_kafka::add_StatReport(obj_bgp_peer &peer, obj_stats_report &stats) {
1098 char buf[4096]; // Misc working buffer
1099
1100 // Build the query
1101 string p_hash_str;
1102 string r_hash_str;
1103 hash_toStr(peer.hash_id, p_hash_str);
1104 hash_toStr(peer.router_hash_id, r_hash_str);
1105
1106 string ts;
1107 getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
1108
1109 snprintf(buf, sizeof(buf),
1110 "add\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%" PRIu32 "\t%" PRIu32 "\t%" PRIu32 "\t%" PRIu32 "\t%" PRIu32
1111 "\t%" PRIu32 "\t%" PRIu32 "\t%" PRIu64 "\t%" PRIu64 "\n",
1112 bmp_stat_seq, r_hash_str.c_str(), router_ip.c_str(),p_hash_str.c_str(), peer.peer_addr, peer.peer_as, ts.c_str(),
1113 stats.prefixes_rej,stats.known_dup_prefixes, stats.known_dup_withdraws, stats.invalid_cluster_list,
1114 stats.invalid_as_path_loop, stats.invalid_originator_id, stats.invalid_as_confed_loop,
1115 stats.routes_adj_rib_in, stats.routes_loc_rib);
1116
1117
1118 produce(MSGBUS_TOPIC_VAR_BMP_STAT, buf, strlen(buf), 1, p_hash_str, &peer_list[p_hash_str], peer.peer_as);
1119 ++bmp_stat_seq;
1120 }
1121
1122 /**
1123 * Abstract method Implementation - See MsgBusInterface.hpp for details
1124 */
update_LsNode(obj_bgp_peer & peer,obj_path_attr & attr,std::list<MsgBusInterface::obj_ls_node> & nodes,ls_action_code code)1125 void msgBus_kafka::update_LsNode(obj_bgp_peer &peer, obj_path_attr &attr, std::list<MsgBusInterface::obj_ls_node> &nodes,
1126 ls_action_code code) {
1127 bzero(prep_buf, MSGBUS_WORKING_BUF_SIZE);
1128
1129 char buf2[8192]; // Second working buffer
1130 int buf_len = 0; // query buffer length
1131 int i;
1132
1133 string hash_str;
1134 string r_hash_str;
1135 string path_hash_str;
1136 string peer_hash_str;
1137
1138 hash_toStr(peer.router_hash_id, r_hash_str);
1139 hash_toStr(attr.hash_id, path_hash_str);
1140 hash_toStr(peer.hash_id, peer_hash_str);
1141
1142 string action = "add";
1143 switch (code) {
1144 case LS_ACTION_ADD:
1145 action = "add";
1146 break;
1147 case LS_ACTION_DEL:
1148 action = "del";
1149 break;
1150 }
1151
1152 string ts;
1153 getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
1154
1155 char igp_router_id[46];
1156 char router_id[46];
1157 char ospf_area_id[16] = {0};
1158 char isis_area_id[32] = {0};
1159 char dr[16];
1160
1161 // Loop through the vector array of entries
1162 int rows = 0;
1163 for (std::list<MsgBusInterface::obj_ls_node>::iterator it = nodes.begin();
1164 it != nodes.end(); it++) {
1165 ++rows;
1166 MsgBusInterface::obj_ls_node &node = (*it);
1167
1168 hash_toStr(node.hash_id, hash_str);
1169
1170 if (node.isIPv4) {
1171 inet_ntop(PF_INET, node.router_id, router_id, sizeof(router_id));
1172 } else {
1173 inet_ntop(PF_INET6, node.router_id, router_id, sizeof(router_id));
1174 }
1175
1176 if (!strcmp(node.protocol, "OSPFv3") or !strcmp(node.protocol, "OSPFv2") ) {
1177 bzero(isis_area_id, sizeof(isis_area_id));
1178 bzero(igp_router_id, sizeof(igp_router_id));
1179
1180 // The first 4 octets are the router ID and the second 4 are the DR or ZERO if no DR
1181 inet_ntop(PF_INET, node.igp_router_id, igp_router_id, sizeof(igp_router_id));
1182
1183 string hostname;
1184 resolveIp(igp_router_id, hostname);
1185 strncpy(node.name, hostname.c_str(), sizeof(node.name));
1186
1187 if ((uint32_t) *(node.igp_router_id+4) != 0) {
1188 inet_ntop(PF_INET, node.igp_router_id+4, dr, sizeof(dr));
1189 strncat(igp_router_id, "[", sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1190 strncat(igp_router_id, dr, sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1191 strncat(igp_router_id, "]", sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1192 LOG_INFO("igp router id includes DR: %s %s", igp_router_id, dr);
1193 }
1194
1195 inet_ntop(PF_INET, node.ospf_area_Id, ospf_area_id, sizeof(ospf_area_id));
1196
1197 } else {
1198 bzero(ospf_area_id, sizeof(ospf_area_id));
1199
1200 snprintf(igp_router_id, sizeof(igp_router_id),
1201 "%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX",
1202 node.igp_router_id[0], node.igp_router_id[1], node.igp_router_id[2], node.igp_router_id[3],
1203 node.igp_router_id[4], node.igp_router_id[5], node.igp_router_id[6], node.igp_router_id[7]);
1204
1205 if (node.isis_area_id[8] <= sizeof(node.isis_area_id))
1206 for (i=0; i < node.isis_area_id[8]; i++) {
1207 snprintf(buf2, sizeof(buf2), "%02hhX", node.isis_area_id[i]);
1208 strcat(isis_area_id, buf2);
1209
1210 if (i == 0)
1211 strcat(isis_area_id, ".");
1212 }
1213 }
1214
1215 buf_len += snprintf(buf2, sizeof(buf2),
1216 "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%s\t%" PRIx64 "\t%" PRIx32 "\t%s"
1217 "\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%d\t%d\t%s\n",
1218 action.c_str(),ls_node_seq, hash_str.c_str(),path_hash_str.c_str(), r_hash_str.c_str(),
1219 router_ip.c_str(), peer_hash_str.c_str(), peer.peer_addr, peer.peer_as, ts.c_str(),
1220 igp_router_id, router_id, node.id, node.bgp_ls_id,node.mt_id, ospf_area_id, isis_area_id,
1221 node.protocol, node.flags, attr.as_path.c_str(), attr.local_pref, attr.med, attr.next_hop, node.name,
1222 peer.isPrePolicy, peer.isAdjIn, node.sr_capabilities_tlv);
1223
1224 // Cat the entry to the query buff
1225 if (buf_len < MSGBUS_WORKING_BUF_SIZE /* size of buf */)
1226 strcat(prep_buf, buf2);
1227
1228 ++ls_node_seq;
1229 }
1230
1231
1232 produce(MSGBUS_TOPIC_VAR_LS_NODE, prep_buf, buf_len, rows, peer_hash_str, &peer_list[peer_hash_str], peer.peer_as);
1233 }
1234
1235 /**
1236 * Abstract method Implementation - See MsgBusInterface.hpp for details
1237 */
update_LsLink(obj_bgp_peer & peer,obj_path_attr & attr,std::list<MsgBusInterface::obj_ls_link> & links,ls_action_code code)1238 void msgBus_kafka::update_LsLink(obj_bgp_peer &peer, obj_path_attr &attr, std::list<MsgBusInterface::obj_ls_link> &links,
1239 ls_action_code code) {
1240 bzero(prep_buf, MSGBUS_WORKING_BUF_SIZE);
1241
1242 char buf2[8192]; // Second working buffer
1243 int buf_len = 0; // query buffer length
1244 int i;
1245
1246 string hash_str;
1247 string r_hash_str;
1248 string path_hash_str;
1249 string peer_hash_str;
1250
1251 hash_toStr(peer.router_hash_id, r_hash_str);
1252 hash_toStr(attr.hash_id, path_hash_str);
1253 hash_toStr(peer.hash_id, peer_hash_str);
1254
1255 string action = "add";
1256 switch (code) {
1257 case LS_ACTION_ADD:
1258 action = "add";
1259 break;
1260 case LS_ACTION_DEL:
1261 action = "del";
1262 break;
1263 }
1264
1265 string ts;
1266 getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
1267
1268 string local_node_hash_id;
1269 string remote_node_hash_id;
1270
1271 char intf_ip[46];
1272 char nei_ip[46];
1273 char igp_router_id[46];
1274 char remote_igp_router_id[46];
1275 char router_id[46];
1276 char remote_router_id[46];
1277 char ospf_area_id[17] = {0};
1278 char isis_area_id[33] = {0};
1279 char dr[16];
1280
1281 // Loop through the vector array of entries
1282 int rows = 0;
1283 for (std::list<MsgBusInterface::obj_ls_link>::iterator it = links.begin();
1284 it != links.end(); it++) {
1285
1286 ++rows;
1287 MsgBusInterface::obj_ls_link &link = (*it);
1288
1289 MD5 hash;
1290
1291 hash.update(link.intf_addr, sizeof(link.intf_addr));
1292 hash.update(link.nei_addr, sizeof(link.nei_addr));
1293 hash.update((unsigned char *)&link.id, sizeof(link.id));
1294 hash.update(link.local_node_hash_id, sizeof(link.local_node_hash_id));
1295 hash.update(link.remote_node_hash_id, sizeof(link.remote_node_hash_id));
1296 hash.update((unsigned char *)&link.local_link_id, sizeof(link.local_link_id));
1297 hash.update((unsigned char *)&link.remote_link_id, sizeof(link.remote_link_id));
1298 hash.update((unsigned char *)peer_hash_str.c_str(), peer_hash_str.length());
1299 hash.update((unsigned char *)&link.mt_id, sizeof(link.mt_id));
1300 hash.finalize();
1301
1302 // Save the hash
1303 unsigned char *hash_bin = hash.raw_digest();
1304 memcpy(link.hash_id, hash_bin, 16);
1305 delete[] hash_bin;
1306
1307 hash_toStr(link.hash_id, hash_str);
1308 hash_toStr(link.local_node_hash_id, local_node_hash_id);
1309 hash_toStr(link.remote_node_hash_id, remote_node_hash_id);
1310
1311 int afi = link.isIPv4 ? PF_INET : PF_INET6;
1312
1313 inet_ntop(afi, link.intf_addr, intf_ip, sizeof(intf_ip));
1314 inet_ntop(afi, link.nei_addr, nei_ip, sizeof(nei_ip));
1315 inet_ntop(afi, link.router_id, router_id, sizeof(router_id));
1316 inet_ntop(afi, link.remote_router_id, remote_router_id, sizeof(remote_router_id));
1317
1318 if (!strcmp(link.protocol, "OSPFv3") or !strcmp(link.protocol, "OSPFv2") ) {
1319 bzero(isis_area_id, sizeof(isis_area_id));
1320
1321 inet_ntop(PF_INET, link.igp_router_id, igp_router_id, sizeof(igp_router_id));
1322
1323 if ((uint32_t) *(link.igp_router_id+4) != 0) {
1324 inet_ntop(PF_INET, link.igp_router_id+4, dr, sizeof(dr));
1325 strncat(igp_router_id, "[", sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1326 strncat(igp_router_id, dr, sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1327 strncat(igp_router_id, "]", sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1328 }
1329
1330 inet_ntop(PF_INET, link.remote_igp_router_id, remote_igp_router_id, sizeof(remote_igp_router_id));
1331
1332 if ((uint32_t) *(link.remote_igp_router_id+4) != 0) {
1333 bzero(dr, sizeof(dr));
1334 inet_ntop(PF_INET, link.remote_igp_router_id+4, dr, sizeof(dr));
1335 strncat(remote_igp_router_id, "[", sizeof(remote_igp_router_id) - strlen(remote_igp_router_id) - 1);
1336 strncat(remote_igp_router_id, dr, sizeof(remote_igp_router_id) - strlen(remote_igp_router_id) - 1);
1337 strncat(remote_igp_router_id, "]", sizeof(remote_igp_router_id) - strlen(remote_igp_router_id) - 1);
1338 }
1339
1340 inet_ntop(PF_INET, link.ospf_area_Id, ospf_area_id, sizeof(ospf_area_id));
1341
1342 } else if (!strcmp(link.protocol, "IS-IS_L1") or !strcmp(link.protocol, "IS-IS_L2")) {
1343 bzero(ospf_area_id, sizeof(ospf_area_id));
1344
1345 snprintf(igp_router_id, sizeof(igp_router_id),
1346 "%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX",
1347 link.igp_router_id[0], link.igp_router_id[1], link.igp_router_id[2], link.igp_router_id[3],
1348 link.igp_router_id[4], link.igp_router_id[5], link.igp_router_id[6], link.igp_router_id[7]);
1349
1350 if (link.isis_area_id[8] <= sizeof(link.isis_area_id)) {
1351 for (i = 0; i < link.isis_area_id[8]; i++) {
1352 snprintf(buf2, sizeof(buf2), "%02hhX", link.isis_area_id[i]);
1353 strcat(isis_area_id, buf2);
1354
1355 if (i == 0)
1356 strcat(isis_area_id, ".");
1357 }
1358 }
1359
1360 snprintf(remote_igp_router_id, sizeof(remote_igp_router_id),
1361 "%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX",
1362 link.remote_igp_router_id[0], link.remote_igp_router_id[1], link.remote_igp_router_id[2], link.remote_igp_router_id[3],
1363 link.remote_igp_router_id[4], link.remote_igp_router_id[5], link.remote_igp_router_id[6], link.remote_igp_router_id[7]);
1364
1365
1366 } else /* static, direct, epe, ... */ {
1367 ospf_area_id[0] = 0;
1368 isis_area_id[0] = 0;
1369 igp_router_id[0] = 0;
1370 remote_igp_router_id[0] = 0;
1371
1372 inet_ntop(PF_INET, &link.local_bgp_router_id, router_id, sizeof(router_id));
1373 inet_ntop(PF_INET, &link.remote_bgp_router_id, remote_router_id, sizeof(remote_router_id));
1374 }
1375
1376
1377 buf_len += snprintf(buf2, sizeof(buf2),
1378 "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%s\t%" PRIx64 "\t%" PRIx32 "\t%s\t%s\t%s\t%s\t%"
1379 PRIu32 "\t%" PRIu32 "\t%s\t%" PRIx32 "\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%" PRIu32 "\t%" PRIu32
1380 "\t%" PRIu32 "\t%" PRIu32 "\t%s\t%" PRIu32 "\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 ""
1381 "\t%" PRIu32 "\t%s\t%d\t%d\t%s\n",
1382 action.c_str(), ls_link_seq, hash_str.c_str(), path_hash_str.c_str(),r_hash_str.c_str(),
1383 router_ip.c_str(), peer_hash_str.c_str(), peer.peer_addr, peer.peer_as, ts.c_str(),
1384 igp_router_id, router_id, link.id, link.bgp_ls_id, ospf_area_id,
1385 isis_area_id, link.protocol, attr.as_path.c_str(), attr.local_pref, attr.med, attr.next_hop,
1386 link.mt_id, link.local_link_id, link.remote_link_id, intf_ip, nei_ip, link.igp_metric,
1387 link.admin_group, link.max_link_bw, link.max_resv_bw, link.unreserved_bw, link.te_def_metric,
1388 link.protection_type, link.mpls_proto_mask, link.srlg, link.name, remote_node_hash_id.c_str(),
1389 local_node_hash_id.c_str(),remote_igp_router_id, remote_router_id,
1390 link.local_node_asn,link.remote_node_asn, link.peer_node_sid, peer.isPrePolicy, peer.isAdjIn,
1391 link.peer_adj_sid);
1392
1393 // Cat the entry to the query buff
1394 if (buf_len < MSGBUS_WORKING_BUF_SIZE /* size of buf */)
1395 strcat(prep_buf, buf2);
1396
1397 ++ls_link_seq;
1398 }
1399
1400 produce(MSGBUS_TOPIC_VAR_LS_LINK, prep_buf, strlen(prep_buf), rows, peer_hash_str,
1401 &peer_list[peer_hash_str], peer.peer_as);
1402 }
1403
1404 /**
1405 * Abstract method Implementation - See MsgBusInterface.hpp for details
1406 */
update_LsPrefix(obj_bgp_peer & peer,obj_path_attr & attr,std::list<MsgBusInterface::obj_ls_prefix> & prefixes,ls_action_code code)1407 void msgBus_kafka::update_LsPrefix(obj_bgp_peer &peer, obj_path_attr &attr, std::list<MsgBusInterface::obj_ls_prefix> &prefixes,
1408 ls_action_code code) {
1409 bzero(prep_buf, MSGBUS_WORKING_BUF_SIZE);
1410
1411 char buf2[8192]; // Second working buffer
1412 int buf_len = 0; // query buffer length
1413 int i;
1414
1415 string hash_str;
1416 string r_hash_str;
1417 string path_hash_str;
1418 string peer_hash_str;
1419
1420 hash_toStr(peer.router_hash_id, r_hash_str);
1421 hash_toStr(attr.hash_id, path_hash_str);
1422 hash_toStr(peer.hash_id, peer_hash_str);
1423
1424 string action = "add";
1425 switch (code) {
1426 case LS_ACTION_ADD:
1427 action = "add";
1428 break;
1429 case LS_ACTION_DEL:
1430 action = "del";
1431 break;
1432 }
1433
1434 string ts;
1435 getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
1436
1437 string local_node_hash_id;
1438
1439 char intf_ip[46];
1440 char nei_ip[46];
1441 char igp_router_id[46];
1442 char router_id[46];
1443 char ospf_fwd_addr[46];
1444 char prefix_ip[46];
1445 char ospf_area_id[16] = {0};
1446 char isis_area_id[32] = {0};
1447 char dr[16];
1448
1449 // Loop through the vector array of entries
1450 int rows = 0;
1451 for (std::list<MsgBusInterface::obj_ls_prefix>::iterator it = prefixes.begin();
1452 it != prefixes.end(); it++) {
1453
1454 ++rows;
1455 MsgBusInterface::obj_ls_prefix &prefix = (*it);
1456
1457 MD5 hash;
1458
1459 hash.update(prefix.prefix_bin, sizeof(prefix.prefix_bin));
1460 hash.update(&prefix.prefix_len, 1);
1461 hash.update((unsigned char *)&prefix.id, sizeof(prefix.id));
1462 hash.update(prefix.local_node_hash_id, sizeof(prefix.local_node_hash_id));
1463 hash.update((unsigned char *)prefix.ospf_route_type, sizeof(prefix.ospf_route_type));
1464 hash.update((unsigned char *)&prefix.mt_id, sizeof(prefix.mt_id));
1465 hash.finalize();
1466
1467 // Save the hash
1468 unsigned char *hash_bin = hash.raw_digest();
1469 memcpy(prefix.hash_id, hash_bin, 16);
1470 delete[] hash_bin;
1471
1472 // Build the query
1473 hash_toStr(prefix.hash_id, hash_str);
1474 hash_toStr(prefix.local_node_hash_id, local_node_hash_id);
1475
1476 if (prefix.isIPv4) {
1477 inet_ntop(PF_INET, prefix.intf_addr, intf_ip, sizeof(intf_ip));
1478 inet_ntop(PF_INET, prefix.nei_addr, nei_ip, sizeof(nei_ip));
1479 inet_ntop(PF_INET, prefix.ospf_fwd_addr, ospf_fwd_addr, sizeof(ospf_fwd_addr));
1480 inet_ntop(PF_INET, prefix.prefix_bin, prefix_ip, sizeof(prefix_ip));
1481 inet_ntop(PF_INET, prefix.router_id, router_id, sizeof(router_id));
1482 } else {
1483 inet_ntop(PF_INET6, prefix.intf_addr, intf_ip, sizeof(intf_ip));
1484 inet_ntop(PF_INET6, prefix.nei_addr, nei_ip, sizeof(nei_ip));
1485 inet_ntop(PF_INET6, prefix.router_id, router_id, sizeof(router_id));
1486 inet_ntop(PF_INET6, prefix.ospf_fwd_addr, ospf_fwd_addr, sizeof(ospf_fwd_addr));
1487 inet_ntop(PF_INET6, prefix.prefix_bin, prefix_ip, sizeof(prefix_ip));
1488 }
1489
1490 if (!strcmp(prefix.protocol, "OSPFv3") or !strcmp(prefix.protocol, "OSPFv2") ) {
1491 bzero(isis_area_id, sizeof(isis_area_id));
1492
1493 inet_ntop(PF_INET, prefix.igp_router_id, igp_router_id, sizeof(igp_router_id));
1494
1495 if ((uint32_t) *(prefix.igp_router_id+4) != 0) {
1496 inet_ntop(PF_INET, prefix.igp_router_id+4, dr, sizeof(dr));
1497 strncat(igp_router_id, "[", sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1498 strncat(igp_router_id, dr, sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1499 strncat(igp_router_id, "]", sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1500 }
1501
1502
1503 inet_ntop(PF_INET, prefix.ospf_area_Id, ospf_area_id, sizeof(ospf_area_id));
1504 } else {
1505 bzero(ospf_area_id, sizeof(ospf_area_id));
1506
1507 snprintf(igp_router_id, sizeof(igp_router_id),
1508 "%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX",
1509 prefix.igp_router_id[0], prefix.igp_router_id[1], prefix.igp_router_id[2], prefix.igp_router_id[3],
1510 prefix.igp_router_id[4], prefix.igp_router_id[5], prefix.igp_router_id[6], prefix.igp_router_id[7]);
1511
1512 if (prefix.isis_area_id[8] <= sizeof(prefix.isis_area_id))
1513 for (i=0; i < prefix.isis_area_id[8]; i++) {
1514 snprintf(buf2, sizeof(buf2), "%02hhX", prefix.isis_area_id[i]);
1515 strcat(isis_area_id, buf2);
1516
1517 if (i == 0)
1518 strcat(isis_area_id, ".");
1519 }
1520 }
1521
1522
1523 buf_len += snprintf(buf2, sizeof(buf2),
1524 "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%s\t%" PRIx64 "\t%" PRIx32
1525 "\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%" PRIx32 "\t%s\t%s\t%" PRIu32 "\t%" PRIx64
1526 "\t%s\t%" PRIu32 "\t%s\t%d\t%d\t%d\t%s\n",
1527 action.c_str(), ls_prefix_seq, hash_str.c_str(), path_hash_str.c_str(), r_hash_str.c_str(),
1528 router_ip.c_str(), peer_hash_str.c_str(), peer.peer_addr, peer.peer_as, ts.c_str(),
1529 igp_router_id, router_id, prefix.id, prefix.bgp_ls_id, ospf_area_id, isis_area_id,
1530 prefix.protocol, attr.as_path.c_str(), attr.local_pref, attr.med, attr.next_hop, local_node_hash_id.c_str(),
1531 prefix.mt_id, prefix.ospf_route_type, prefix.igp_flags, prefix.route_tag, prefix.ext_route_tag,
1532 ospf_fwd_addr, prefix.metric, prefix_ip, prefix.prefix_len, peer.isPrePolicy, peer.isAdjIn,
1533 prefix.sid_tlv);
1534
1535 // Cat the entry to the query buff
1536 if (buf_len < MSGBUS_WORKING_BUF_SIZE /* size of buf */)
1537 strcat(prep_buf, buf2);
1538
1539 ++ls_prefix_seq;
1540 }
1541
1542 produce(MSGBUS_TOPIC_VAR_LS_PREFIX, prep_buf, strlen(prep_buf), rows, peer_hash_str,
1543 &peer_list[peer_hash_str], peer.peer_as);
1544 }
1545
1546 /**
1547 * Abstract method Implementation - See MsgBusInterface.hpp for details
1548 *
1549 * TODO: Consolidate this to single produce method
1550 */
send_bmp_raw(u_char * r_hash,obj_bgp_peer & peer,u_char * data,size_t data_len)1551 void msgBus_kafka::send_bmp_raw(u_char *r_hash, obj_bgp_peer &peer, u_char *data, size_t data_len) {
1552 string r_hash_str;
1553 string p_hash_str;
1554 RdKafka::Topic *topic = NULL;
1555
1556 hash_toStr(peer.hash_id, p_hash_str);
1557 hash_toStr(r_hash, r_hash_str);
1558
1559 if (data_len == 0)
1560 return;
1561
1562 while (isConnected == false) {
1563 LOG_WARN("rtr=%s: Not connected to Kafka, attempting to reconnect", router_ip.c_str());
1564 connect();
1565
1566 sleep(2);
1567 }
1568
1569 // if topic is disabled, don't bother producing the message
1570 if (!topicSel->topicEnabled(MSGBUS_TOPIC_VAR_BMP_RAW))
1571 return;
1572
1573 char headers[256];
1574 size_t hdr_len = snprintf(headers, sizeof(headers), "V: %s\nC_HASH_ID: %s\nR_HASH: %s\nR_IP: %s\nL: %lu\n\n",
1575 MSGBUS_API_VERSION, collector_hash.c_str(), r_hash_str.c_str(), router_ip.c_str(), data_len);
1576
1577 memcpy(producer_buf, headers, hdr_len);
1578 memcpy(producer_buf+hdr_len, data, data_len);
1579
1580 topic = topicSel->getTopic(MSGBUS_TOPIC_VAR_BMP_RAW, &router_group_name, &peer_list[p_hash_str], peer.peer_as);
1581 if (topic != NULL) {
1582 SELF_DEBUG("rtr=%s: Producing bmp raw message: topic=%s key=%s, msg size = %lu", router_ip.c_str(),
1583 topic->name().c_str(), r_hash_str.c_str(), data_len);
1584
1585 RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
1586 RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
1587 producer_buf, data_len + hdr_len,
1588 (const std::string *)&r_hash_str, NULL);
1589
1590 if (resp != RdKafka::ERR_NO_ERROR) {
1591 LOG_ERR("rtr=%s: Failed to produce bmp raw message: %s", router_ip.c_str(), RdKafka::err2str(resp).c_str());
1592 producer->poll(100);
1593 }
1594 }
1595 else {
1596 SELF_DEBUG("rtr=%s: failed to produce bmp raw message because topic couldn't be found: topic=%s key=%s, msg size = %lu",
1597 router_ip.c_str(), MSGBUS_TOPIC_VAR_BMP_RAW, r_hash_str.c_str(), data_len);
1598 }
1599
1600 producer->poll(0);
1601 }
1602
1603 /**
1604 * \brief Method to resolve the IP address to a hostname
1605 *
1606 * \param [in] name String name (ip address)
1607 * \param [out] hostname String reference for hostname
1608 *
1609 * \returns true if error, false if no error
1610 */
resolveIp(string name,string & hostname)1611 bool msgBus_kafka::resolveIp(string name, string &hostname) {
1612 addrinfo *ai;
1613 char host[255];
1614
1615 if (!getaddrinfo(name.c_str(), NULL, NULL, &ai)) {
1616
1617 if (!getnameinfo(ai->ai_addr,ai->ai_addrlen, host, sizeof(host), NULL, 0, NI_NAMEREQD)) {
1618 hostname.assign(host);
1619 LOG_INFO("resolve: %s to %s", name.c_str(), hostname.c_str());
1620 }
1621
1622 freeaddrinfo(ai);
1623 return false;
1624 }
1625
1626 return true;
1627 }
1628
1629 /*
1630 * Enable/disable debugs
1631 */
enableDebug()1632 void msgBus_kafka::enableDebug() {
1633 string value = "all";
1634 string errstr;
1635
1636 disconnect();
1637
1638 if (conf->set("debug", value, errstr) != RdKafka::Conf::CONF_OK) {
1639 LOG_ERR("Failed to enable debug on kafka producer confg: %s", errstr.c_str());
1640 }
1641
1642 connect();
1643
1644 debug = true;
1645
1646 }
disableDebug()1647 void msgBus_kafka::disableDebug() {
1648 string errstr;
1649 string value = "";
1650
1651 if (conf)
1652 conf->set("debug", value, errstr);
1653
1654 debug = false;
1655 }
1656