1 /*
2  * Copyright (c) 2013-2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  */
9 #include <cstdlib>
10 #include <cstring>
11 #include <iostream>
12 
13 #include <cinttypes>
14 
15 #include <librdkafka/rdkafkacpp.h>
16 #include <netdb.h>
17 #include <unistd.h>
18 
19 #include <thread>
20 #include <arpa/inet.h>
21 
22 #include "MsgBusImpl_kafka.h"
23 #include "KafkaEventCallback.h"
24 #include "KafkaDeliveryReportCallback.h"
25 #include "KafkaTopicSelector.h"
26 
27 #include <boost/algorithm/string/replace.hpp>
28 
29 #include <librdkafka/rdkafka.h>
30 
31 
32 #include "md5.h"
33 
34 using namespace std;
35 
36 /******************************************************************//**
37  * \brief This function will initialize and connect to Kafka.
38  *
39  * \details It is expected that this class will start off with a new connection.
40  *
41  *  \param [in] logPtr      Pointer to Logger instance
42  *  \param [in] cfg         Pointer to the config instance
43  *  \param [in] c_hash_id   Collector Hash ID
44  ********************************************************************/
msgBus_kafka(Logger * logPtr,Config * cfg,u_char * c_hash_id)45 msgBus_kafka::msgBus_kafka(Logger *logPtr, Config *cfg, u_char *c_hash_id) {
46     logger = logPtr;
47 
48     producer_buf = new unsigned char[MSGBUS_WORKING_BUF_SIZE];
49     prep_buf = new char[MSGBUS_WORKING_BUF_SIZE];
50 
51     hash_toStr(c_hash_id, collector_hash);
52 
53     isConnected = false;
54     conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
55 
56     disableDebug();
57 
58     // TODO: Init the topic selector class
59 
60     router_seq          = 0L;
61     collector_seq       = 0L;
62     peer_seq            = 0L;
63     base_attr_seq       = 0L;
64     unicast_prefix_seq  = 0L;
65     l3vpn_seq           = 0L;
66     evpn_seq            = 0L;
67     ls_node_seq         = 0L;
68     ls_link_seq         = 0L;
69     ls_prefix_seq       = 0L;
70     bmp_stat_seq        = 0L;
71 
72     this->cfg           = cfg;
73 
74     // Make the connection to the server
75     event_callback       = NULL;
76     delivery_callback    = NULL;
77     producer             = NULL;
78     topicSel             = NULL;
79 
80     router_ip.assign("");
81     bzero(router_hash, sizeof(router_hash));
82 
83     connect();
84 }
85 
86 /**
87  * Destructor
88  */
~msgBus_kafka()89 msgBus_kafka::~msgBus_kafka() {
90 
91     SELF_DEBUG("Destory msgBus Kafka instance");
92 
93     // Disconnect/term the router if not already done
94     MsgBusInterface::obj_router r_object;
95     bool router_defined = false;
96     for (int i=0; i < sizeof(router_hash); i++) {
97         if (router_hash[i] != 0) {
98             router_defined = true;
99             break;
100         }
101     }
102 
103     if (router_defined) {
104         bzero(&r_object, sizeof(r_object));
105         memcpy(r_object.hash_id, router_hash, sizeof(r_object.hash_id));
106         snprintf((char *)r_object.ip_addr, sizeof(r_object.ip_addr), "%s", router_ip.c_str());
107         r_object.term_reason_code = 65533;
108         snprintf(r_object.term_reason_text, sizeof(r_object.term_reason_text),
109                  "Connection closed");
110 
111         update_Router(r_object, msgBus_kafka::ROUTER_ACTION_TERM);
112     }
113 
114     sleep(2);
115 
116     delete [] producer_buf;
117     delete [] prep_buf;
118 
119     peer_list.clear();
120 
121     disconnect(500);
122 
123     delete conf;
124 }
125 
126 /**
127  * Disconnect from Kafka
128  */
disconnect(int wait_ms)129 void msgBus_kafka::disconnect(int wait_ms) {
130 
131     if (isConnected) {
132         int i = 0;
133         while (producer->outq_len() > 0 and i < 8) {
134             LOG_INFO("Waiting for producer to finish before disconnecting: outq=%d", producer->outq_len());
135             producer->poll(500);
136             i++;
137         }
138     }
139 
140     if (topicSel != NULL) delete topicSel;
141 
142     topicSel = NULL;
143 
144     if (producer != NULL) delete producer;
145     producer = NULL;
146 
147     // suggested by librdkafka to free memory
148     RdKafka::wait_destroyed(wait_ms);
149 
150     if (event_callback != NULL) delete event_callback;
151     event_callback = NULL;
152 
153     if (delivery_callback != NULL) delete delivery_callback;
154     delivery_callback = NULL;
155 
156     isConnected = false;
157 }
158 
159 /**
160  * Connects to Kafka broker
161  */
connect()162 void msgBus_kafka::connect() {
163     string errstr;
164     string value;
165     std::ostringstream rx_bytes, tx_bytes, sess_timeout, socket_timeout;
166     std::ostringstream q_buf_max_msgs, q_buf_max_kbytes, q_buf_max_ms,
167 		msg_send_max_retry, retry_backoff_ms;
168 
169     disconnect();
170 
171     /*
172      * Configure Kafka Producer (https://kafka.apache.org/08/configuration.html)
173      */
174     //TODO: Add config options to change these settings
175 
176     // Disable logging of connection close/idle timeouts caused by Kafka 0.9.x (connections.max.idle.ms)
177     //    See https://github.com/edenhill/librdkafka/issues/437 for more details.
178     // TODO: change this when librdkafka has better handling of the idle disconnects
179     value = "false";
180     if (conf->set("log.connection.close", value, errstr) != RdKafka::Conf::CONF_OK) {
181         LOG_ERR("Failed to configure log.connection.close=false: %s.", errstr.c_str());
182     }
183 
184     value = "true";
185     if (conf->set("api.version.request", value, errstr) != RdKafka::Conf::CONF_OK) {
186         LOG_ERR("Failed to configure api.version.request=true: %s.", errstr.c_str());
187     }
188 
189     // TODO: Add config for address family - default is any
190     /*value = "v4";
191     if (conf->set("broker.address.family", value, errstr) != RdKafka::Conf::CONF_OK) {
192         LOG_ERR("Failed to configure broker.address.family: %s.", errstr.c_str());
193     }*/
194 
195 
196     // Batch message number
197     value = "100";
198     if (conf->set("batch.num.messages", value, errstr) != RdKafka::Conf::CONF_OK) {
199         LOG_ERR("Failed to configure batch.num.messages for kafka: %s.", errstr.c_str());
200         throw "ERROR: Failed to configure kafka batch.num.messages";
201     }
202 
203     // Batch message max wait time (in ms)
204     q_buf_max_ms << cfg->q_buf_max_ms;
205     if (conf->set("queue.buffering.max.ms", q_buf_max_ms.str(), errstr) != RdKafka::Conf::CONF_OK) {
206         LOG_ERR("Failed to configure queue.buffering.max.ms for kafka: %s.", errstr.c_str());
207         throw "ERROR: Failed to configure kafka queue.buffer.max.ms";
208     }
209 
210 
211     // compression
212     value = cfg->compression;
213     if (conf->set("compression.codec", value, errstr) != RdKafka::Conf::CONF_OK) {
214         LOG_ERR("Failed to configure %s compression for kafka: %s.", value.c_str(), errstr.c_str());
215         throw "ERROR: Failed to configure kafka compression";
216     }
217 
218     // broker list
219     if (conf->set("metadata.broker.list", cfg->kafka_brokers, errstr) != RdKafka::Conf::CONF_OK) {
220         LOG_ERR("Failed to configure broker list for kafka: %s", errstr.c_str());
221         throw "ERROR: Failed to configure kafka broker list";
222     }
223 
224     // Maximum transmit byte size
225     tx_bytes << cfg->tx_max_bytes;
226     if (conf->set("message.max.bytes", tx_bytes.str(),
227                              errstr) != RdKafka::Conf::CONF_OK)
228     {
229        LOG_ERR("Failed to configure transmit max message size for kafka: %s",
230                                errstr.c_str());
231        throw "ERROR: Failed to configure transmit max message size";
232     }
233 
234     // Maximum receive byte size
235     rx_bytes << cfg->rx_max_bytes;
236     if (conf->set("receive.message.max.bytes", rx_bytes.str(),
237                              errstr) != RdKafka::Conf::CONF_OK)
238     {
239        LOG_ERR("Failed to configure receive max message size for kafka: %s",
240                                errstr.c_str());
241        throw "ERROR: Failed to configure receive max message size";
242     }
243 
244     // Client group session and failure detection timeout
245     sess_timeout << cfg->session_timeout;
246     if (conf->set("session.timeout.ms", sess_timeout.str(),
247                              errstr) != RdKafka::Conf::CONF_OK)
248     {
249        LOG_ERR("Failed to configure session timeout for kafka: %s",
250                                errstr.c_str());
251        throw "ERROR: Failed to configure session timeout ";
252     }
253 
254     // Timeout for network requests
255     socket_timeout << cfg->socket_timeout;
256     if (conf->set("socket.timeout.ms", socket_timeout.str(),
257                              errstr) != RdKafka::Conf::CONF_OK)
258     {
259        LOG_ERR("Failed to configure socket timeout for kafka: %s",
260                                errstr.c_str());
261        throw "ERROR: Failed to configure socket timeout ";
262     }
263 
264     // Maximum number of messages allowed on the producer queue
265     q_buf_max_msgs << cfg->q_buf_max_msgs;
266     if (conf->set("queue.buffering.max.messages", q_buf_max_msgs.str(),
267                              errstr) != RdKafka::Conf::CONF_OK)
268     {
269        LOG_ERR("Failed to configure max messages in buffer for kafka: %s",
270                                errstr.c_str());
271        throw "ERROR: Failed to configure max messages in buffer ";
272     }
273 
274     // Maximum number of messages allowed on the producer queue
275     q_buf_max_kbytes << cfg->q_buf_max_kbytes;
276     if (conf->set("queue.buffering.max.kbytes", q_buf_max_kbytes.str(),
277                   errstr) != RdKafka::Conf::CONF_OK)
278     {
279         LOG_ERR("Failed to configure max kbytes in buffer for kafka: %s",
280                 errstr.c_str());
281         throw "ERROR: Failed to configure max kbytes in buffer ";
282     }
283 
284 
285     // How many times to retry sending a failing MessageSet
286     msg_send_max_retry << cfg->msg_send_max_retry;
287     if (conf->set("message.send.max.retries", msg_send_max_retry.str(),
288                              errstr) != RdKafka::Conf::CONF_OK)
289     {
290        LOG_ERR("Failed to configure max retries for sending "
291                "failed message for kafka: %s",
292                                errstr.c_str());
293        throw "ERROR: Failed to configure max retries for sending failed message";
294     }
295 
296     // Backoff time in ms before retrying a message send
297     retry_backoff_ms << cfg->retry_backoff_ms;
298     if (conf->set("retry.backoff.ms", retry_backoff_ms.str(),
299                              errstr) != RdKafka::Conf::CONF_OK)
300     {
301        LOG_ERR("Failed to configure backoff time before retrying to send"
302                "failed message for kafka: %s",
303                                errstr.c_str());
304        throw "ERROR: Failed to configure backoff time before resending"
305              " failed messages ";
306     }
307 
308     // Register event callback
309     event_callback = new KafkaEventCallback(&isConnected, logger);
310     if (conf->set("event_cb", event_callback, errstr) != RdKafka::Conf::CONF_OK) {
311         LOG_ERR("Failed to configure kafka event callback: %s", errstr.c_str());
312         throw "ERROR: Failed to configure kafka event callback";
313     }
314 
315     // Register delivery report callback
316     /*
317     delivery_callback = new KafkaDeliveryReportCallback();
318 
319     if (conf->set("dr_cb", delivery_callback, errstr) != RdKafka::Conf::CONF_OK) {
320         LOG_ERR("Failed to configure kafka delivery report callback: %s", errstr.c_str());
321         throw "ERROR: Failed to configure kafka delivery report callback";
322     }
323     */
324 
325 
326     // Create producer and connect
327     producer = RdKafka::Producer::create(conf, errstr);
328     if (producer == NULL) {
329         LOG_ERR("rtr=%s: Failed to create producer: %s", router_ip.c_str(), errstr.c_str());
330         throw "ERROR: Failed to create producer";
331     }
332 
333     isConnected = true;
334 
335     producer->poll(1000);
336 
337     if (not isConnected) {
338         LOG_ERR("rtr=%s: Failed to connect to Kafka, will try again in a few", router_ip.c_str());
339         return;
340 
341     }
342 
343     /*
344      * Initialize the topic selector/handler
345      */
346     try {
347         topicSel = new KafkaTopicSelector(logger, cfg, producer);
348 
349     } catch (char const *str) {
350         LOG_ERR("rtr=%s: Failed to create one or more topics, will try again in a few: err=%s", router_ip.c_str(), str);
351         isConnected = false;
352         return;
353     }
354 
355     producer->poll(100);
356 }
357 
358 /**
359  * produce message to Kafka
360  *
361  * \param [in] topic_var     Topic var to use in KafkaTopicSelector::getTopic() MSGBUS_TOPIC_VAR_*
362  * \param [in] msg           message to produce
363  * \param [in] msg_size      Length in bytes of the message
364  * \param [in] rows          Number of rows
365  * \param [in] key           Hash key
366  * \param [in] peer_group    Peer group name - empty/NULL if not set or used
367  * \param [in] peer_asn      Peer ASN
368  */
produce(const char * topic_var,char * msg,size_t msg_size,int rows,string key,const string * peer_group,uint32_t peer_asn)369 void msgBus_kafka::produce(const char *topic_var, char *msg, size_t msg_size, int rows, string key,
370                            const string *peer_group, uint32_t peer_asn) {
371     size_t len;
372     RdKafka::Topic *topic = NULL;
373 
374     while (isConnected == false or topicSel == NULL) {
375         // Do not attempt to reconnect if this is the main process (router ip is null)
376         // Changed on 10/29/15 to support docker startup delay with kafka
377         /*
378         if (router_ip.size() <= 0) {
379             return;
380         }*/
381 
382         LOG_WARN("rtr=%s: Not connected to Kafka, attempting to reconnect", router_ip.c_str());
383         connect();
384 
385         sleep(1);
386     }
387 
388     // if topic is disabled, don't bother producing the message
389     // TODO: it would be more efficient to move this check to the top of the various update_* methods, but I'm not sure which parts of these methods have side-effects that need to be preserved.
390     if (!topicSel->topicEnabled(topic_var))
391         return;
392 
393     char headers[256];
394     len = snprintf(headers, sizeof(headers), "V: %s\nC_HASH_ID: %s\nT: %s\nL: %lu\nR: %d\n\n",
395             MSGBUS_API_VERSION, collector_hash.c_str(), topic_var, msg_size, rows);
396 
397     memcpy(producer_buf, headers, len);
398     memcpy(producer_buf+len, msg, msg_size);
399 
400 
401     topic = topicSel->getTopic(topic_var, &router_group_name, peer_group, peer_asn);
402     if (topic != NULL) {
403         SELF_DEBUG("rtr=%s: Producing message: topic=%s key=%s, msg size = %lu", router_ip.c_str(),
404                    topic->name().c_str(), key.c_str(), msg_size);
405 
406         RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
407                                                     RdKafka::Producer::RK_MSG_COPY,
408                                                     producer_buf, msg_size + len,
409                                                     (const std::string *) &key, NULL);
410         if (resp != RdKafka::ERR_NO_ERROR) {
411             LOG_ERR("rtr=%s: Failed to produce message: %s", router_ip.c_str(), RdKafka::err2str(resp).c_str());
412             producer->poll(100);
413         }
414     } else {
415         LOG_NOTICE("rtr=%s: failed to produce message because topic couldn't be found: topic=%s key=%s, msg size = %lu", router_ip.c_str(),
416                    topic_var, key.c_str(), msg_size);
417     }
418 
419     producer->poll(0);
420 }
421 
422 /**
423  * Abstract method Implementation - See MsgBusInterface.hpp for details
424  */
update_Collector(obj_collector & c_object,collector_action_code action_code)425 void msgBus_kafka::update_Collector(obj_collector &c_object, collector_action_code action_code) {
426     char buf[4096]; // Misc working buffer
427 
428     string ts;
429     getTimestamp(c_object.timestamp_secs, c_object.timestamp_us, ts);
430 
431     char *action = const_cast<char *>("change");
432 
433     switch (action_code) {
434         case COLLECTOR_ACTION_STARTED:
435             action = const_cast<char *>("started");
436             break;
437         case COLLECTOR_ACTION_CHANGE:
438             action = const_cast<char *>("change");
439             break;
440         case COLLECTOR_ACTION_HEARTBEAT:
441             action = const_cast<char *>("heartbeat");
442             break;
443         case COLLECTOR_ACTION_STOPPED:
444             action = const_cast<char *>("stopped");
445             break;
446     }
447 
448     snprintf(buf, sizeof(buf),
449              "%s\t%" PRIu64 "\t%s\t%s\t%s\t%u\t%s\n",
450              action, collector_seq, c_object.admin_id, collector_hash.c_str(),
451              c_object.routers, c_object.router_count, ts.c_str());
452 
453     produce(MSGBUS_TOPIC_VAR_COLLECTOR, buf, strlen(buf), 1, collector_hash, NULL, 0);
454 
455     collector_seq++;
456 }
457 
458 /**
459  * Abstract method Implementation - See MsgBusInterface.hpp for details
460  */
update_Router(obj_router & r_object,router_action_code code)461 void msgBus_kafka::update_Router(obj_router &r_object, router_action_code code) {
462     char buf[4096]; // Misc working buffer
463 
464     // Convert binary hash to string
465     string r_hash_str;
466     hash_toStr(r_object.hash_id, r_hash_str);
467 
468     bool skip_if_defined = true;
469 
470     string action = "first";
471 
472     switch (code) {
473         case ROUTER_ACTION_FIRST :
474             action.assign("first");
475             break;
476 
477         case ROUTER_ACTION_INIT :
478             skip_if_defined = false;
479             action.assign("init");
480             break;
481 
482         case ROUTER_ACTION_TERM:
483             skip_if_defined = false;
484             action.assign("term");
485             bzero(router_hash, sizeof(router_hash));
486             break;
487     }
488 
489 
490     // Check if we have already processed this entry, if so return
491     if (skip_if_defined) {
492         for (int i=0; i < sizeof(router_hash); i++) {
493             if (router_hash[i] != 0)
494                 return;
495         }
496     }
497 
498     if (code != ROUTER_ACTION_TERM)
499         memcpy(router_hash, r_object.hash_id, sizeof(router_hash));
500 
501     router_ip.assign((char *)r_object.ip_addr);                     // Update router IP for logging
502 
503     string descr((char *)r_object.descr);
504     boost::replace_all(descr, "\n", "\\n");
505     boost::replace_all(descr, "\t", " ");
506 
507     string initData(r_object.initiate_data);
508     boost::replace_all(initData, "\n", "\\n");
509     boost::replace_all(initData, "\t", " ");
510 
511     string termData(r_object.term_data);
512     boost::replace_all(termData, "\n", "\\n");
513     boost::replace_all(termData, "\t", " ");
514 
515     string ts;
516     getTimestamp(r_object.timestamp_secs, r_object.timestamp_us, ts);
517 
518     // Get the hostname
519     string hostname = "";
520     if (strlen((char *)r_object.name) <= 0) {
521         resolveIp((char *) r_object.ip_addr, hostname);
522         snprintf((char *)r_object.name, sizeof(r_object.name)-1, "%s", hostname.c_str());
523     }
524 
525     if (topicSel != NULL)
526         topicSel->lookupRouterGroup((char *)r_object.name, (char *)r_object.ip_addr, router_group_name);
527 
528     size_t size = snprintf(buf, sizeof(buf),
529              "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%" PRIu16 "\t%s\t%s\t%s\t%s\t%s\n", action.c_str(),
530              router_seq, r_object.name, r_hash_str.c_str(), r_object.ip_addr, descr.c_str(),
531              r_object.term_reason_code, r_object.term_reason_text,
532              initData.c_str(), termData.c_str(), ts.c_str(), r_object.bgp_id);
533 
534     produce(MSGBUS_TOPIC_VAR_ROUTER, buf, size, 1, r_hash_str, NULL, 0);
535 
536     router_seq++;
537 }
538 
539 /**
540  * Abstract method Implementation - See MsgBusInterface.hpp for details
541  */
update_Peer(obj_bgp_peer & peer,obj_peer_up_event * up,obj_peer_down_event * down,peer_action_code code)542 void msgBus_kafka::update_Peer(obj_bgp_peer &peer, obj_peer_up_event *up, obj_peer_down_event *down, peer_action_code code) {
543 
544     char buf[4096]; // Misc working buffer
545 
546     string r_hash_str;
547     hash_toStr(peer.router_hash_id, r_hash_str);
548 
549     // Generate the hash
550     MD5 hash;
551 
552     hash.update((unsigned char *) peer.peer_addr,
553                 strlen(peer.peer_addr));
554     hash.update((unsigned char *) peer.peer_rd, strlen(peer.peer_rd));
555     hash.update((unsigned char *)r_hash_str.c_str(), r_hash_str.length());
556 
557     /* TODO: Uncomment once this is fixed in XR
558      * Disable hashing the bgp peer ID since XR has an issue where it sends 0.0.0.0 on subsequent PEER_UP's
559      *    This will be fixed in XR, but for now we can disable hashing on it.
560      *
561     hash.update((unsigned char *) p_object.peer_bgp_id,
562             strlen(p_object.peer_bgp_id));
563     */
564 
565     hash.finalize();
566 
567     // Save the hash
568     unsigned char *hash_raw = hash.raw_digest();
569     memcpy(peer.hash_id, hash_raw, 16);
570     delete[] hash_raw;
571 
572     // Convert binary hash to string
573     string p_hash_str;
574     hash_toStr(peer.hash_id, p_hash_str);
575 
576     bool skip_if_in_cache = true;
577     bool add_to_cache = true;
578 
579     string action = "first";
580 
581     // Determine the action and if cache should be used or not - don't want to do too much in this switch block
582     switch (code) {
583         case PEER_ACTION_FIRST :
584             action.assign("first");
585             break;
586 
587         case PEER_ACTION_UP :
588             skip_if_in_cache = false;
589             action.assign("up");
590             break;
591 
592         case PEER_ACTION_DOWN:
593             skip_if_in_cache = false;
594             action.assign("down");
595             add_to_cache = false;
596 
597             if (peer_list.find(p_hash_str) != peer_list.end())
598                 peer_list.erase(p_hash_str);
599 
600             break;
601     }
602 
603     // Check if we have already processed this entry, if so return
604     if (skip_if_in_cache and peer_list.find(p_hash_str) != peer_list.end()) {
605         return;
606     }
607 
608     // Get the hostname using DNS
609     string hostname;
610     resolveIp(peer.peer_addr, hostname);
611 
612     string ts;
613     getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
614 
615     // Insert/Update map entry
616     if (add_to_cache) {
617         if (topicSel != NULL)
618             topicSel->lookupPeerGroup(hostname, peer.peer_addr, peer.peer_as, peer_list[p_hash_str]);
619     }
620 
621     switch (code) {
622         case PEER_ACTION_FIRST :
623             snprintf(buf, sizeof(buf),
624                      "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t%d\t%d\t%d\t%d\t%d\t%s\n",
625                      action.c_str(), peer_seq, p_hash_str.c_str(), r_hash_str.c_str(), hostname.c_str(),
626                      peer.peer_bgp_id,router_ip.c_str(), ts.c_str(), peer.peer_as, peer.peer_addr,peer.peer_rd,
627                      peer.isL3VPN, peer.isPrePolicy, peer.isIPv4, peer.isLocRib, peer.isLocRibFiltered, peer.table_name);
628             action.assign("first");
629             break;
630 
631         case PEER_ACTION_UP : {
632             if (up == NULL)
633                 return;
634 
635             string infoData(up->info_data);
636             if (up->info_data[0] != 0) {
637                 boost::replace_all(infoData, "\n", "\\n");
638                 boost::replace_all(infoData, "\t", " ");
639             }
640 
641             snprintf(buf, sizeof(buf),
642                      "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%" PRIu16 "\t%" PRIu32 "\t%s\t%" PRIu16
643                              "\t%s\t%s\t%s\t%s\t%" PRIu16 "\t%" PRIu16 "\t\t\t\t\t%d\t%d\t%d\t%d\t%d\t%s\n",
644                      action.c_str(), peer_seq, p_hash_str.c_str(), r_hash_str.c_str(), hostname.c_str(),
645                      peer.peer_bgp_id, router_ip.c_str(), ts.c_str(), peer.peer_as, peer.peer_addr, peer.peer_rd,
646 
647                     /* Peer UP specific fields */
648                      up->remote_port, up->local_asn, up->local_ip, up->local_port, up->local_bgp_id, infoData.c_str(), up->sent_cap,
649                      up->recv_cap, up->remote_hold_time, up->local_hold_time,
650                      peer.isL3VPN, peer.isPrePolicy, peer.isIPv4, peer.isLocRib, peer.isLocRibFiltered, peer.table_name);
651 
652             skip_if_in_cache = false;
653             action.assign("up");
654             break;
655         }
656         case PEER_ACTION_DOWN: {
657             if (down == NULL)
658                 return;
659 
660             snprintf(buf, sizeof(buf),
661                      "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t\t\t\t\t\t\t\t\t\t\t%d\t%d\t%d\t%s\t%d\t%d\t%d\t%d\t%d\t\n",
662                      action.c_str(), peer_seq, p_hash_str.c_str(), r_hash_str.c_str(), hostname.c_str(),
663                      peer.peer_bgp_id, router_ip.c_str(), ts.c_str(), peer.peer_as, peer.peer_addr, peer.peer_rd,
664 
665                      /* Peer DOWN specific fields */
666                      down->bmp_reason, down->bgp_err_code, down->bgp_err_subcode, down->error_text,
667 
668                      peer.isL3VPN, peer.isPrePolicy, peer.isIPv4, peer.isLocRib, peer.isLocRibFiltered);
669 
670             skip_if_in_cache = false;
671             action.assign("down");
672             add_to_cache = false;
673 
674             if (peer_list.find(p_hash_str) != peer_list.end())
675                 peer_list.erase(p_hash_str);
676 
677             break;
678         }
679     }
680 
681     produce(MSGBUS_TOPIC_VAR_PEER, buf, strlen(buf), 1, p_hash_str, &peer_list[p_hash_str], peer.peer_as);
682 
683     peer_seq++;
684 }
685 
686 /**
687  * Abstract method Implementation - See MsgBusInterface.hpp for details
688  */
update_baseAttribute(obj_bgp_peer & peer,obj_path_attr & attr,base_attr_action_code code)689 void msgBus_kafka::update_baseAttribute(obj_bgp_peer &peer, obj_path_attr &attr, base_attr_action_code code) {
690 
691     prep_buf[0] = 0;
692     size_t  buf_len;                    // size of the message in buf
693 
694     string path_hash_str;
695     string p_hash_str;
696     string r_hash_str;
697     hash_toStr(peer.hash_id, p_hash_str);
698     hash_toStr(peer.router_hash_id, r_hash_str);
699 
700 
701     // Generate the hash
702     MD5 hash;
703 
704     //hash.update(path_object.peer_hash_id, HASH_SIZE);
705     hash.update((unsigned char *) attr.as_path.c_str(), attr.as_path.length());
706     hash.update((unsigned char *) attr.next_hop,
707                 strlen(attr.next_hop));
708     hash.update((unsigned char *) attr.aggregator,
709                 strlen(attr.aggregator));
710     hash.update((unsigned char *) attr.origin,
711                 strlen(attr.origin));
712     hash.update((unsigned char *) &attr.med, sizeof(attr.med));
713     hash.update((unsigned char *) &attr.local_pref,
714                 sizeof(attr.local_pref));
715 
716     hash.update((unsigned char *) attr.community_list.c_str(), attr.community_list.length());
717     hash.update((unsigned char *) attr.ext_community_list.c_str(), attr.ext_community_list.length());
718     hash.update((unsigned char *) p_hash_str.c_str(), p_hash_str.length());
719 
720     hash.finalize();
721 
722     // Save the hash
723     unsigned char *hash_raw = hash.raw_digest();
724     memcpy(attr.hash_id, hash_raw, 16);
725     delete[] hash_raw;
726 
727     hash_toStr(attr.hash_id, path_hash_str);
728 
729     string ts;
730     getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
731 
732     buf_len =
733             snprintf(prep_buf, MSGBUS_WORKING_BUF_SIZE,
734                      "add\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%s\t%" PRIu16 "\t%" PRIu32
735                              "\t%s\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%s\t%s\t%d\t%d\t%s\t%s\n",
736                      base_attr_seq, path_hash_str.c_str(), r_hash_str.c_str(), router_ip.c_str(), p_hash_str.c_str(),
737                      peer.peer_addr,peer.peer_as, ts.c_str(),
738                      attr.origin, attr.as_path.c_str(), attr.as_path_count, attr.origin_as, attr.next_hop, attr.med,
739                      attr.local_pref, attr.aggregator, attr.community_list.c_str(), attr.ext_community_list.c_str(), attr.cluster_list.c_str(),
740                      attr.atomic_agg, attr.nexthop_isIPv4, attr.originator_id,attr.large_community_list.c_str());
741 
742     produce(MSGBUS_TOPIC_VAR_BASE_ATTRIBUTE, prep_buf, buf_len, 1, p_hash_str, &peer_list[p_hash_str], peer.peer_as);
743 
744     ++base_attr_seq;
745 }
746 
747 /**
748  * Abstract method Implementation - See MsgBusInterface.hpp for details
749  */
update_L3Vpn(obj_bgp_peer & peer,std::vector<obj_vpn> & vpn,obj_path_attr * attr,vpn_action_code code)750 void msgBus_kafka::update_L3Vpn(obj_bgp_peer &peer, std::vector<obj_vpn> &vpn,
751                                 obj_path_attr *attr, vpn_action_code code) {
752 
753     prep_buf[0] = 0;
754 
755     char    buf2[80000];                         // Second working buffer
756     size_t  buf_len = 0;                         // query buffer length
757 
758     string vpn_hash_str;
759     string path_hash_str;
760     string p_hash_str;
761     string r_hash_str;
762 
763     hash_toStr(peer.router_hash_id, r_hash_str);
764 
765     if (attr != NULL)
766         hash_toStr(attr->hash_id, path_hash_str);
767 
768     hash_toStr(peer.hash_id, p_hash_str);
769 
770     string ts;
771     getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
772 
773     // Loop through the vector array of vpn entries
774     for (size_t i = 0; i < vpn.size(); i++) {
775 
776         // Generate the hash
777         MD5 hash;
778 
779         hash.update((unsigned char *) vpn[i].prefix, strlen(vpn[i].prefix));
780         hash.update(&vpn[i].prefix_len, sizeof(vpn[i].prefix_len));
781         hash.update((unsigned char *) vpn[i].rd_administrator_subfield.c_str(),
782                     vpn[i].rd_administrator_subfield.length());
783         hash.update((unsigned char *) vpn[i].rd_assigned_number.c_str(),
784                     vpn[i].rd_assigned_number.length());
785 
786         hash.update((unsigned char *) p_hash_str.c_str(), p_hash_str.length());
787 
788         // Add path ID to hash only if exists
789         if (vpn[i].path_id > 0)
790             hash.update((unsigned char *)&vpn[i].path_id, sizeof(vpn[i].path_id));
791 
792         /*
793          * Add constant "1" to hash if labels are present
794          *      Withdrawn and updated NLRI's do not carry the original label, therefore we cannot
795          *      hash on the label string.  Instead, we has on a constant value of 1.
796          */
797         if (vpn[i].labels[0] != 0) {
798             buf2[0] = 1;
799             hash.update((unsigned char *) buf2, 1);
800             buf2[0] = 0;
801         }
802 
803         hash.finalize();
804 
805         // Save the hash
806         unsigned char *hash_raw = hash.raw_digest();
807         memcpy(vpn[i].hash_id, hash_raw, 16);
808         delete[] hash_raw;
809 
810         // Build the query
811         hash_toStr(vpn[i].hash_id, vpn_hash_str);
812 
813         switch (code) {
814 
815             case VPN_ACTION_ADD:
816                 if (attr == NULL)
817                     return;
818 
819                 buf_len += snprintf(buf2, sizeof(buf2),
820                                     "add\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%d\t%d\t%s\t%s\t%" PRIu16
821                                             "\t%" PRIu32 "\t%s\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%s\t%s\t%d\t%d\t%s\t%" PRIu32
822                                             "\t%s\t%d\t%d\t%s:%s\t%d\t%s\n",
823                                     l3vpn_seq, vpn_hash_str.c_str(), r_hash_str.c_str(),
824                                     router_ip.c_str(),path_hash_str.c_str(), p_hash_str.c_str(),
825                                     peer.peer_addr, peer.peer_as, ts.c_str(), vpn[i].prefix, vpn[i].prefix_len,
826                                     vpn[i].isIPv4, attr->origin,
827                                     attr->as_path.c_str(), attr->as_path_count, attr->origin_as, attr->next_hop, attr->med, attr->local_pref,
828                                     attr->aggregator,
829                                     attr->community_list.c_str(), attr->ext_community_list.c_str(), attr->cluster_list.c_str(),
830                                     attr->atomic_agg, attr->nexthop_isIPv4,
831                                     attr->originator_id, vpn[i].path_id, vpn[i].labels, peer.isPrePolicy, peer.isAdjIn,
832                                     vpn[i].rd_administrator_subfield.c_str(), vpn[i].rd_assigned_number.c_str(), vpn[i].rd_type,
833                                     attr->large_community_list.c_str());
834 
835                 break;
836 
837             case VPN_ACTION_DEL:
838                 buf_len += snprintf(buf2, sizeof(buf2),
839                                     "del\t%" PRIu64 "\t%s\t%s\t%s\t\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%d\t%d\t\t\t"
840                                             "\t\t\t\t\t\t\t\t\t\t\t\t%" PRIu32
841                                             "\t%s\t%d\t%d\t%s:%s\t%d\t\n",
842                                     l3vpn_seq, vpn_hash_str.c_str(), r_hash_str.c_str(),
843                                     router_ip.c_str(), p_hash_str.c_str(),
844                                     peer.peer_addr, peer.peer_as, ts.c_str(), vpn[i].prefix, vpn[i].prefix_len,
845                                     vpn[i].isIPv4, vpn[i].path_id, vpn[i].labels, peer.isPrePolicy, peer.isAdjIn,
846                                     vpn[i].rd_administrator_subfield.c_str(), vpn[i].rd_assigned_number.c_str(),
847                                     vpn[i].rd_type);
848                 break;
849 
850         }
851 
852         // Cat the entry to the query buff
853         if (buf_len < MSGBUS_WORKING_BUF_SIZE /* size of buf */)
854             strcat(prep_buf, buf2);
855 
856         ++l3vpn_seq;
857     }
858 
859     produce(MSGBUS_TOPIC_VAR_L3VPN, prep_buf, strlen(prep_buf), vpn.size(), p_hash_str,
860             &peer_list[p_hash_str], peer.peer_as);
861 }
862 
863 
864 /**
865  * Abstract method Implementation - See MsgBusInterface.hpp for details
866  */
update_eVPN(obj_bgp_peer & peer,std::vector<obj_evpn> & vpn,obj_path_attr * attr,vpn_action_code code)867 void msgBus_kafka::update_eVPN(obj_bgp_peer &peer, std::vector<obj_evpn> &vpn,
868                               obj_path_attr *attr, vpn_action_code code) {
869 
870     prep_buf[0] = 0;
871 
872     char    buf2[80000];                         // Second working buffer
873     size_t  buf_len = 0;                         // query buffer length
874 
875     string vpn_hash_str;
876     string path_hash_str;
877     string p_hash_str;
878     string r_hash_str;
879 
880     hash_toStr(peer.router_hash_id, r_hash_str);
881 
882     if (attr != NULL)
883         hash_toStr(attr->hash_id, path_hash_str);
884 
885     hash_toStr(peer.hash_id, p_hash_str);
886 
887     string ts;
888     getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
889 
890     // Loop through the vector array of vpn entries
891     for (size_t i = 0; i < vpn.size(); i++) {
892 
893         // Generate the hash
894         MD5 hash;
895 
896         hash.update((unsigned char *) p_hash_str.c_str(), p_hash_str.length());
897 
898         hash.update((unsigned char *) vpn[i].mac, strlen(vpn[i].mac));
899         hash.update((unsigned char *) vpn[i].ip, strlen(vpn[i].ip));
900         hash.update(&vpn[i].ip_len, sizeof(vpn[i].ip_len));
901         hash.update((unsigned char *) vpn[i].ethernet_segment_identifier, strlen(vpn[i].ethernet_segment_identifier));
902         hash.update((unsigned char *) vpn[i].rd_administrator_subfield.c_str(),
903                     vpn[i].rd_administrator_subfield.length());
904         hash.update((unsigned char *) vpn[i].rd_assigned_number.c_str(),
905                     vpn[i].rd_assigned_number.length());
906 
907         // Add path ID to hash only if exists
908         if (vpn[i].path_id > 0)
909             hash.update((unsigned char *)&vpn[i].path_id, sizeof(vpn[i].path_id));
910 
911         hash.finalize();
912 
913         // Save the hash
914         unsigned char *hash_raw = hash.raw_digest();
915         memcpy(vpn[i].hash_id, hash_raw, 16);
916         delete[] hash_raw;
917 
918         // Build the query
919         hash_toStr(vpn[i].hash_id, vpn_hash_str);
920 
921         switch (code) {
922 
923             case VPN_ACTION_ADD:
924                 if (attr == NULL)
925                     return;
926 
927                 buf_len += snprintf(buf2, sizeof(buf2),
928                                     "add\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%s\t%" PRIu16
929                                         "\t%" PRIu32 "\t%s\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%s\t%s\t%d\t%d\t%s\t%" PRIu32
930                                         "\t%d\t%d\t%s:%s\t%d\t%d\t%s\t%s\t%s\t%d\t%s\t%d\t%s\t%" PRIu32 "\t%" PRIu32 "\n",
931                                     evpn_seq, vpn_hash_str.c_str(), r_hash_str.c_str(),
932                                     router_ip.c_str(),path_hash_str.c_str(), p_hash_str.c_str(),
933                                     peer.peer_addr, peer.peer_as, ts.c_str(),
934                                     attr->origin,
935                                     attr->as_path.c_str(), attr->as_path_count, attr->origin_as, attr->next_hop, attr->med, attr->local_pref,
936                                     attr->aggregator,
937                                     attr->community_list.c_str(), attr->ext_community_list.c_str(), attr->cluster_list.c_str(),
938                                     attr->atomic_agg, attr->nexthop_isIPv4,
939                                     attr->originator_id, vpn[i].path_id, peer.isPrePolicy, peer.isAdjIn,
940                                     vpn[i].rd_administrator_subfield.c_str(), vpn[i].rd_assigned_number.c_str(), vpn[i].rd_type,
941                                     vpn[i].originating_router_ip_len, vpn[i].originating_router_ip, vpn[i].ethernet_tag_id_hex,
942                                     vpn[i].ethernet_segment_identifier, vpn[i].mac_len,
943                                     vpn[i].mac, vpn[i].ip_len, vpn[i].ip, vpn[i].mpls_label_1, vpn[i].mpls_label_2);
944 
945                 break;
946 
947             case VPN_ACTION_DEL:
948                 buf_len += snprintf(buf2, sizeof(buf2),
949                                     "del\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t\t\t"
950                                             "\t\t\t\t\t\t\t\t\t\t\t\t%" PRIu32
951                                             "\t%d\t%d\t%s:%s\t%d\t%d\t%s\t%s\t%s\t%d\t%s\t%d\t%s\t%" PRIu32 "\t%" PRIu32 "\n",
952                                     evpn_seq, vpn_hash_str.c_str(), r_hash_str.c_str(),
953                                     router_ip.c_str(),path_hash_str.c_str(), p_hash_str.c_str(),
954                                     peer.peer_addr, peer.peer_as, ts.c_str(),
955                                     vpn[i].path_id, peer.isPrePolicy, peer.isAdjIn,
956                                     vpn[i].rd_administrator_subfield.c_str(), vpn[i].rd_assigned_number.c_str(), vpn[i].rd_type,
957                                     vpn[i].originating_router_ip_len, vpn[i].originating_router_ip, vpn[i].ethernet_tag_id_hex,
958                                     vpn[i].ethernet_segment_identifier, vpn[i].mac_len,
959                                     vpn[i].mac, vpn[i].ip_len, vpn[i].ip, vpn[i].mpls_label_1, vpn[i].mpls_label_2);
960 
961                 break;
962 
963         }
964 
965         // Cat the entry to the query buff
966         if (buf_len < MSGBUS_WORKING_BUF_SIZE /* size of buf */)
967             strcat(prep_buf, buf2);
968 
969         ++evpn_seq;
970     }
971 
972     produce(MSGBUS_TOPIC_VAR_EVPN, prep_buf, strlen(prep_buf), vpn.size(), p_hash_str,
973             &peer_list[p_hash_str], peer.peer_as);
974 }
975 
976 
977 /**
978  * Abstract method Implementation - See MsgBusInterface.hpp for details
979  */
update_unicastPrefix(obj_bgp_peer & peer,std::vector<obj_rib> & rib,obj_path_attr * attr,unicast_prefix_action_code code)980 void msgBus_kafka::update_unicastPrefix(obj_bgp_peer &peer, std::vector<obj_rib> &rib,
981                                         obj_path_attr *attr, unicast_prefix_action_code code) {
982     //bzero(prep_buf, MSGBUS_WORKING_BUF_SIZE);
983     prep_buf[0] = 0;
984 
985     char    buf2[80000];                         // Second working buffer
986     size_t  buf_len = 0;                         // query buffer length
987 
988     string rib_hash_str;
989     string path_hash_str;
990     string p_hash_str;
991     string r_hash_str;
992 
993     hash_toStr(peer.router_hash_id, r_hash_str);
994 
995     if (attr != NULL)
996         hash_toStr(attr->hash_id, path_hash_str);
997 
998     hash_toStr(peer.hash_id, p_hash_str);
999 
1000     string action = "add";
1001     switch (code) {
1002         case UNICAST_PREFIX_ACTION_ADD:
1003             action = "add";
1004             break;
1005         case UNICAST_PREFIX_ACTION_DEL:
1006             action = "del";
1007             break;
1008     }
1009 
1010     string ts;
1011     getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
1012 
1013     // Loop through the vector array of rib entries
1014     for (size_t i = 0; i < rib.size(); i++) {
1015 
1016         // Generate the hash
1017         MD5 hash;
1018 
1019         hash.update((unsigned char *) rib[i].prefix, strlen(rib[i].prefix));
1020         hash.update(&rib[i].prefix_len, sizeof(rib[i].prefix_len));
1021         hash.update((unsigned char *) p_hash_str.c_str(), p_hash_str.length());
1022 
1023         // Add path ID to hash only if exists
1024         if (rib[i].path_id > 0)
1025             hash.update((unsigned char *)&rib[i].path_id, sizeof(rib[i].path_id));
1026 
1027         /*
1028          * Add constant "1" to hash if labels are present
1029          *      Withdrawn and updated NLRI's do not carry the original label, therefore we cannot
1030          *      hash on the label string.  Instead, we has on a constant value of 1.
1031          */
1032         if (rib[i].labels[0] != 0) {
1033             buf2[0] = 1;
1034             hash.update((unsigned char *) buf2, 1);
1035             buf2[0] = 0;
1036         }
1037 
1038         hash.finalize();
1039 
1040         // Save the hash
1041         unsigned char *hash_raw = hash.raw_digest();
1042         memcpy(rib[i].hash_id, hash_raw, 16);
1043         delete[] hash_raw;
1044 
1045         // Build the query
1046         hash_toStr(rib[i].hash_id, rib_hash_str);
1047 
1048         switch (code) {
1049 
1050             case UNICAST_PREFIX_ACTION_ADD:
1051                 if (attr == NULL)
1052                     return;
1053 
1054                 buf_len += snprintf(buf2, sizeof(buf2),
1055                                     "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%d\t%d\t%s\t%s\t%" PRIu16
1056                                             "\t%" PRIu32 "\t%s\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%s\t%s\t%d\t%d\t%s\t%" PRIu32
1057                                             "\t%s\t%d\t%d\t%s\n",
1058                                     action.c_str(), unicast_prefix_seq, rib_hash_str.c_str(), r_hash_str.c_str(),
1059                                     router_ip.c_str(),path_hash_str.c_str(), p_hash_str.c_str(),
1060                                     peer.peer_addr, peer.peer_as, ts.c_str(), rib[i].prefix, rib[i].prefix_len,
1061                                     rib[i].isIPv4, attr->origin,
1062                                     attr->as_path.c_str(), attr->as_path_count, attr->origin_as, attr->next_hop, attr->med, attr->local_pref,
1063                                     attr->aggregator,
1064                                     attr->community_list.c_str(), attr->ext_community_list.c_str(), attr->cluster_list.c_str(),
1065                                     attr->atomic_agg, attr->nexthop_isIPv4,
1066                                     attr->originator_id, rib[i].path_id, rib[i].labels, peer.isPrePolicy, peer.isAdjIn,
1067                                     attr->large_community_list.c_str());
1068                 break;
1069 
1070             case UNICAST_PREFIX_ACTION_DEL:
1071                 buf_len += snprintf(buf2, sizeof(buf2),
1072                                     "%s\t%" PRIu64 "\t%s\t%s\t%s\t\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%d\t%d\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t%" PRIu32
1073                                             "\t%s\t%d\t%d\t\n",
1074                                     action.c_str(), unicast_prefix_seq, rib_hash_str.c_str(), r_hash_str.c_str(),
1075                                     router_ip.c_str(), p_hash_str.c_str(),
1076                                     peer.peer_addr, peer.peer_as, ts.c_str(), rib[i].prefix, rib[i].prefix_len,
1077                                     rib[i].isIPv4, rib[i].path_id, rib[i].labels, peer.isPrePolicy, peer.isAdjIn);
1078                 break;
1079         }
1080 
1081         // Cat the entry to the query buff
1082         if (buf_len < MSGBUS_WORKING_BUF_SIZE /* size of buf */)
1083             strcat(prep_buf, buf2);
1084 
1085         ++unicast_prefix_seq;
1086 	++ribSeq;
1087     }
1088 
1089 
1090     produce(MSGBUS_TOPIC_VAR_UNICAST_PREFIX, prep_buf, strlen(prep_buf), rib.size(), p_hash_str,
1091             &peer_list[p_hash_str], peer.peer_as);
1092 }
1093 
1094 /**
1095  * Abstract method Implementation - See MsgBusInterface.hpp for details
1096  */
add_StatReport(obj_bgp_peer & peer,obj_stats_report & stats)1097 void msgBus_kafka::add_StatReport(obj_bgp_peer &peer, obj_stats_report &stats) {
1098     char buf[4096];                 // Misc working buffer
1099 
1100     // Build the query
1101     string p_hash_str;
1102     string r_hash_str;
1103     hash_toStr(peer.hash_id, p_hash_str);
1104     hash_toStr(peer.router_hash_id, r_hash_str);
1105 
1106     string ts;
1107     getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
1108 
1109     snprintf(buf, sizeof(buf),
1110              "add\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%" PRIu32 "\t%" PRIu32 "\t%" PRIu32 "\t%" PRIu32 "\t%" PRIu32
1111                      "\t%" PRIu32 "\t%" PRIu32 "\t%" PRIu64 "\t%" PRIu64 "\n",
1112              bmp_stat_seq, r_hash_str.c_str(), router_ip.c_str(),p_hash_str.c_str(), peer.peer_addr, peer.peer_as, ts.c_str(),
1113              stats.prefixes_rej,stats.known_dup_prefixes, stats.known_dup_withdraws, stats.invalid_cluster_list,
1114              stats.invalid_as_path_loop, stats.invalid_originator_id, stats.invalid_as_confed_loop,
1115              stats.routes_adj_rib_in, stats.routes_loc_rib);
1116 
1117 
1118     produce(MSGBUS_TOPIC_VAR_BMP_STAT, buf, strlen(buf), 1, p_hash_str, &peer_list[p_hash_str], peer.peer_as);
1119     ++bmp_stat_seq;
1120 }
1121 
1122 /**
1123  * Abstract method Implementation - See MsgBusInterface.hpp for details
1124  */
update_LsNode(obj_bgp_peer & peer,obj_path_attr & attr,std::list<MsgBusInterface::obj_ls_node> & nodes,ls_action_code code)1125 void msgBus_kafka::update_LsNode(obj_bgp_peer &peer, obj_path_attr &attr, std::list<MsgBusInterface::obj_ls_node> &nodes,
1126                                   ls_action_code code) {
1127     bzero(prep_buf, MSGBUS_WORKING_BUF_SIZE);
1128 
1129     char    buf2[8192];                          // Second working buffer
1130     int     buf_len = 0;                         // query buffer length
1131     int     i;
1132 
1133     string hash_str;
1134     string r_hash_str;
1135     string path_hash_str;
1136     string peer_hash_str;
1137 
1138     hash_toStr(peer.router_hash_id, r_hash_str);
1139     hash_toStr(attr.hash_id, path_hash_str);
1140     hash_toStr(peer.hash_id, peer_hash_str);
1141 
1142     string action = "add";
1143     switch (code) {
1144         case LS_ACTION_ADD:
1145             action = "add";
1146             break;
1147         case LS_ACTION_DEL:
1148             action = "del";
1149             break;
1150     }
1151 
1152     string ts;
1153     getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
1154 
1155     char igp_router_id[46];
1156     char router_id[46];
1157     char ospf_area_id[16] = {0};
1158     char isis_area_id[32] = {0};
1159     char dr[16];
1160 
1161     // Loop through the vector array of entries
1162     int rows = 0;
1163     for (std::list<MsgBusInterface::obj_ls_node>::iterator it = nodes.begin();
1164             it != nodes.end(); it++) {
1165         ++rows;
1166         MsgBusInterface::obj_ls_node &node = (*it);
1167 
1168         hash_toStr(node.hash_id, hash_str);
1169 
1170         if (node.isIPv4) {
1171             inet_ntop(PF_INET, node.router_id, router_id, sizeof(router_id));
1172         } else {
1173             inet_ntop(PF_INET6, node.router_id, router_id, sizeof(router_id));
1174         }
1175 
1176         if (!strcmp(node.protocol, "OSPFv3") or !strcmp(node.protocol, "OSPFv2") ) {
1177             bzero(isis_area_id, sizeof(isis_area_id));
1178             bzero(igp_router_id, sizeof(igp_router_id));
1179 
1180             // The first 4 octets are the router ID and the second 4 are the DR or ZERO if no DR
1181             inet_ntop(PF_INET, node.igp_router_id, igp_router_id, sizeof(igp_router_id));
1182 
1183             string hostname;
1184             resolveIp(igp_router_id, hostname);
1185             strncpy(node.name, hostname.c_str(), sizeof(node.name));
1186 
1187             if ((uint32_t) *(node.igp_router_id+4) != 0) {
1188                 inet_ntop(PF_INET, node.igp_router_id+4, dr, sizeof(dr));
1189                 strncat(igp_router_id, "[", sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1190                 strncat(igp_router_id, dr, sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1191                 strncat(igp_router_id, "]", sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1192                 LOG_INFO("igp router id includes DR: %s %s", igp_router_id, dr);
1193             }
1194 
1195             inet_ntop(PF_INET, node.ospf_area_Id, ospf_area_id, sizeof(ospf_area_id));
1196 
1197         } else {
1198             bzero(ospf_area_id, sizeof(ospf_area_id));
1199 
1200             snprintf(igp_router_id, sizeof(igp_router_id),
1201                      "%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX",
1202                      node.igp_router_id[0], node.igp_router_id[1], node.igp_router_id[2], node.igp_router_id[3],
1203                      node.igp_router_id[4], node.igp_router_id[5], node.igp_router_id[6], node.igp_router_id[7]);
1204 
1205             if (node.isis_area_id[8] <= sizeof(node.isis_area_id))
1206                 for (i=0; i < node.isis_area_id[8]; i++) {
1207                     snprintf(buf2, sizeof(buf2), "%02hhX", node.isis_area_id[i]);
1208                     strcat(isis_area_id, buf2);
1209 
1210                     if (i == 0)
1211                         strcat(isis_area_id, ".");
1212                 }
1213         }
1214 
1215         buf_len += snprintf(buf2, sizeof(buf2),
1216                         "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%s\t%" PRIx64 "\t%" PRIx32 "\t%s"
1217                                 "\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%d\t%d\t%s\n",
1218                         action.c_str(),ls_node_seq, hash_str.c_str(),path_hash_str.c_str(), r_hash_str.c_str(),
1219                         router_ip.c_str(), peer_hash_str.c_str(), peer.peer_addr, peer.peer_as, ts.c_str(),
1220                         igp_router_id, router_id, node.id, node.bgp_ls_id,node.mt_id, ospf_area_id, isis_area_id,
1221                         node.protocol, node.flags, attr.as_path.c_str(), attr.local_pref, attr.med, attr.next_hop, node.name,
1222                         peer.isPrePolicy, peer.isAdjIn, node.sr_capabilities_tlv);
1223 
1224         // Cat the entry to the query buff
1225         if (buf_len < MSGBUS_WORKING_BUF_SIZE /* size of buf */)
1226             strcat(prep_buf, buf2);
1227 
1228         ++ls_node_seq;
1229     }
1230 
1231 
1232     produce(MSGBUS_TOPIC_VAR_LS_NODE, prep_buf, buf_len, rows, peer_hash_str, &peer_list[peer_hash_str], peer.peer_as);
1233 }
1234 
1235 /**
1236  * Abstract method Implementation - See MsgBusInterface.hpp for details
1237  */
update_LsLink(obj_bgp_peer & peer,obj_path_attr & attr,std::list<MsgBusInterface::obj_ls_link> & links,ls_action_code code)1238 void msgBus_kafka::update_LsLink(obj_bgp_peer &peer, obj_path_attr &attr, std::list<MsgBusInterface::obj_ls_link> &links,
1239                                  ls_action_code code) {
1240     bzero(prep_buf, MSGBUS_WORKING_BUF_SIZE);
1241 
1242     char    buf2[8192];                          // Second working buffer
1243     int     buf_len = 0;                         // query buffer length
1244     int     i;
1245 
1246     string hash_str;
1247     string r_hash_str;
1248     string path_hash_str;
1249     string peer_hash_str;
1250 
1251     hash_toStr(peer.router_hash_id, r_hash_str);
1252     hash_toStr(attr.hash_id, path_hash_str);
1253     hash_toStr(peer.hash_id, peer_hash_str);
1254 
1255     string action = "add";
1256     switch (code) {
1257         case LS_ACTION_ADD:
1258             action = "add";
1259             break;
1260         case LS_ACTION_DEL:
1261             action = "del";
1262             break;
1263     }
1264 
1265     string ts;
1266     getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
1267 
1268     string local_node_hash_id;
1269     string remote_node_hash_id;
1270 
1271     char intf_ip[46];
1272     char nei_ip[46];
1273     char igp_router_id[46];
1274     char remote_igp_router_id[46];
1275     char router_id[46];
1276     char remote_router_id[46];
1277     char ospf_area_id[17] = {0};
1278     char isis_area_id[33] = {0};
1279     char dr[16];
1280 
1281     // Loop through the vector array of entries
1282     int rows = 0;
1283     for (std::list<MsgBusInterface::obj_ls_link>::iterator it = links.begin();
1284          it != links.end(); it++) {
1285 
1286         ++rows;
1287         MsgBusInterface::obj_ls_link &link = (*it);
1288 
1289         MD5 hash;
1290 
1291         hash.update(link.intf_addr, sizeof(link.intf_addr));
1292         hash.update(link.nei_addr, sizeof(link.nei_addr));
1293         hash.update((unsigned char *)&link.id, sizeof(link.id));
1294         hash.update(link.local_node_hash_id, sizeof(link.local_node_hash_id));
1295         hash.update(link.remote_node_hash_id, sizeof(link.remote_node_hash_id));
1296         hash.update((unsigned char *)&link.local_link_id, sizeof(link.local_link_id));
1297         hash.update((unsigned char *)&link.remote_link_id, sizeof(link.remote_link_id));
1298         hash.update((unsigned char *)peer_hash_str.c_str(), peer_hash_str.length());
1299         hash.update((unsigned char *)&link.mt_id, sizeof(link.mt_id));
1300         hash.finalize();
1301 
1302         // Save the hash
1303         unsigned char *hash_bin = hash.raw_digest();
1304         memcpy(link.hash_id, hash_bin, 16);
1305         delete[] hash_bin;
1306 
1307         hash_toStr(link.hash_id, hash_str);
1308         hash_toStr(link.local_node_hash_id, local_node_hash_id);
1309         hash_toStr(link.remote_node_hash_id, remote_node_hash_id);
1310 
1311         int afi = link.isIPv4 ? PF_INET : PF_INET6;
1312 
1313         inet_ntop(afi, link.intf_addr, intf_ip, sizeof(intf_ip));
1314         inet_ntop(afi, link.nei_addr, nei_ip, sizeof(nei_ip));
1315         inet_ntop(afi, link.router_id, router_id, sizeof(router_id));
1316         inet_ntop(afi, link.remote_router_id, remote_router_id, sizeof(remote_router_id));
1317 
1318         if (!strcmp(link.protocol, "OSPFv3") or !strcmp(link.protocol, "OSPFv2") ) {
1319             bzero(isis_area_id, sizeof(isis_area_id));
1320 
1321             inet_ntop(PF_INET, link.igp_router_id, igp_router_id, sizeof(igp_router_id));
1322 
1323             if ((uint32_t) *(link.igp_router_id+4) != 0) {
1324                 inet_ntop(PF_INET, link.igp_router_id+4, dr, sizeof(dr));
1325                 strncat(igp_router_id, "[", sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1326                 strncat(igp_router_id, dr, sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1327                 strncat(igp_router_id, "]", sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1328             }
1329 
1330             inet_ntop(PF_INET, link.remote_igp_router_id, remote_igp_router_id, sizeof(remote_igp_router_id));
1331 
1332             if ((uint32_t) *(link.remote_igp_router_id+4) != 0) {
1333                 bzero(dr, sizeof(dr));
1334                 inet_ntop(PF_INET, link.remote_igp_router_id+4, dr, sizeof(dr));
1335                 strncat(remote_igp_router_id, "[", sizeof(remote_igp_router_id) - strlen(remote_igp_router_id) - 1);
1336                 strncat(remote_igp_router_id, dr, sizeof(remote_igp_router_id) - strlen(remote_igp_router_id) - 1);
1337                 strncat(remote_igp_router_id, "]", sizeof(remote_igp_router_id) - strlen(remote_igp_router_id) - 1);
1338             }
1339 
1340             inet_ntop(PF_INET, link.ospf_area_Id, ospf_area_id, sizeof(ospf_area_id));
1341 
1342         } else if (!strcmp(link.protocol, "IS-IS_L1") or !strcmp(link.protocol, "IS-IS_L2")) {
1343             bzero(ospf_area_id, sizeof(ospf_area_id));
1344 
1345             snprintf(igp_router_id, sizeof(igp_router_id),
1346                      "%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX",
1347                      link.igp_router_id[0], link.igp_router_id[1], link.igp_router_id[2], link.igp_router_id[3],
1348                      link.igp_router_id[4], link.igp_router_id[5], link.igp_router_id[6], link.igp_router_id[7]);
1349 
1350             if (link.isis_area_id[8] <= sizeof(link.isis_area_id)) {
1351                 for (i = 0; i < link.isis_area_id[8]; i++) {
1352                     snprintf(buf2, sizeof(buf2), "%02hhX", link.isis_area_id[i]);
1353                     strcat(isis_area_id, buf2);
1354 
1355                     if (i == 0)
1356                         strcat(isis_area_id, ".");
1357                 }
1358             }
1359 
1360             snprintf(remote_igp_router_id, sizeof(remote_igp_router_id),
1361                      "%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX",
1362                      link.remote_igp_router_id[0], link.remote_igp_router_id[1], link.remote_igp_router_id[2], link.remote_igp_router_id[3],
1363                      link.remote_igp_router_id[4], link.remote_igp_router_id[5], link.remote_igp_router_id[6], link.remote_igp_router_id[7]);
1364 
1365 
1366         } else /* static, direct, epe, ... */ {
1367             ospf_area_id[0]         = 0;
1368             isis_area_id[0]         = 0;
1369             igp_router_id[0]        = 0;
1370             remote_igp_router_id[0] = 0;
1371 
1372             inet_ntop(PF_INET, &link.local_bgp_router_id, router_id, sizeof(router_id));
1373             inet_ntop(PF_INET, &link.remote_bgp_router_id, remote_router_id, sizeof(remote_router_id));
1374         }
1375 
1376 
1377         buf_len += snprintf(buf2, sizeof(buf2),
1378                 "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%s\t%" PRIx64 "\t%" PRIx32 "\t%s\t%s\t%s\t%s\t%"
1379                         PRIu32 "\t%" PRIu32 "\t%s\t%" PRIx32 "\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%" PRIu32 "\t%" PRIu32
1380                         "\t%" PRIu32 "\t%" PRIu32 "\t%s\t%" PRIu32 "\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 ""
1381                         "\t%" PRIu32 "\t%s\t%d\t%d\t%s\n",
1382                             action.c_str(), ls_link_seq, hash_str.c_str(), path_hash_str.c_str(),r_hash_str.c_str(),
1383                             router_ip.c_str(), peer_hash_str.c_str(), peer.peer_addr, peer.peer_as, ts.c_str(),
1384                             igp_router_id, router_id, link.id, link.bgp_ls_id, ospf_area_id,
1385                             isis_area_id, link.protocol, attr.as_path.c_str(), attr.local_pref, attr.med, attr.next_hop,
1386                             link.mt_id, link.local_link_id, link.remote_link_id, intf_ip, nei_ip, link.igp_metric,
1387                             link.admin_group, link.max_link_bw, link.max_resv_bw, link.unreserved_bw, link.te_def_metric,
1388                             link.protection_type, link.mpls_proto_mask, link.srlg, link.name, remote_node_hash_id.c_str(),
1389                             local_node_hash_id.c_str(),remote_igp_router_id, remote_router_id,
1390                             link.local_node_asn,link.remote_node_asn, link.peer_node_sid, peer.isPrePolicy, peer.isAdjIn,
1391                             link.peer_adj_sid);
1392 
1393         // Cat the entry to the query buff
1394         if (buf_len < MSGBUS_WORKING_BUF_SIZE /* size of buf */)
1395             strcat(prep_buf, buf2);
1396 
1397         ++ls_link_seq;
1398     }
1399 
1400     produce(MSGBUS_TOPIC_VAR_LS_LINK, prep_buf, strlen(prep_buf), rows, peer_hash_str,
1401             &peer_list[peer_hash_str], peer.peer_as);
1402 }
1403 
1404 /**
1405  * Abstract method Implementation - See MsgBusInterface.hpp for details
1406  */
update_LsPrefix(obj_bgp_peer & peer,obj_path_attr & attr,std::list<MsgBusInterface::obj_ls_prefix> & prefixes,ls_action_code code)1407 void msgBus_kafka::update_LsPrefix(obj_bgp_peer &peer, obj_path_attr &attr, std::list<MsgBusInterface::obj_ls_prefix> &prefixes,
1408                                    ls_action_code code) {
1409     bzero(prep_buf, MSGBUS_WORKING_BUF_SIZE);
1410 
1411     char    buf2[8192];                          // Second working buffer
1412     int     buf_len = 0;                         // query buffer length
1413     int     i;
1414 
1415     string hash_str;
1416     string r_hash_str;
1417     string path_hash_str;
1418     string peer_hash_str;
1419 
1420     hash_toStr(peer.router_hash_id, r_hash_str);
1421     hash_toStr(attr.hash_id, path_hash_str);
1422     hash_toStr(peer.hash_id, peer_hash_str);
1423 
1424     string action = "add";
1425     switch (code) {
1426         case LS_ACTION_ADD:
1427             action = "add";
1428             break;
1429         case LS_ACTION_DEL:
1430             action = "del";
1431             break;
1432     }
1433 
1434     string ts;
1435     getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts);
1436 
1437     string local_node_hash_id;
1438 
1439     char intf_ip[46];
1440     char nei_ip[46];
1441     char igp_router_id[46];
1442     char router_id[46];
1443     char ospf_fwd_addr[46];
1444     char prefix_ip[46];
1445     char ospf_area_id[16] = {0};
1446     char isis_area_id[32] = {0};
1447     char dr[16];
1448 
1449     // Loop through the vector array of entries
1450     int rows = 0;
1451     for (std::list<MsgBusInterface::obj_ls_prefix>::iterator it = prefixes.begin();
1452          it != prefixes.end(); it++) {
1453 
1454         ++rows;
1455         MsgBusInterface::obj_ls_prefix &prefix = (*it);
1456 
1457         MD5 hash;
1458 
1459         hash.update(prefix.prefix_bin, sizeof(prefix.prefix_bin));
1460         hash.update(&prefix.prefix_len, 1);
1461         hash.update((unsigned char *)&prefix.id, sizeof(prefix.id));
1462         hash.update(prefix.local_node_hash_id, sizeof(prefix.local_node_hash_id));
1463         hash.update((unsigned char *)prefix.ospf_route_type, sizeof(prefix.ospf_route_type));
1464         hash.update((unsigned char *)&prefix.mt_id, sizeof(prefix.mt_id));
1465         hash.finalize();
1466 
1467         // Save the hash
1468         unsigned char *hash_bin = hash.raw_digest();
1469         memcpy(prefix.hash_id, hash_bin, 16);
1470         delete[] hash_bin;
1471 
1472         // Build the query
1473         hash_toStr(prefix.hash_id, hash_str);
1474         hash_toStr(prefix.local_node_hash_id, local_node_hash_id);
1475 
1476         if (prefix.isIPv4) {
1477             inet_ntop(PF_INET, prefix.intf_addr, intf_ip, sizeof(intf_ip));
1478             inet_ntop(PF_INET, prefix.nei_addr, nei_ip, sizeof(nei_ip));
1479             inet_ntop(PF_INET, prefix.ospf_fwd_addr, ospf_fwd_addr, sizeof(ospf_fwd_addr));
1480             inet_ntop(PF_INET, prefix.prefix_bin, prefix_ip, sizeof(prefix_ip));
1481             inet_ntop(PF_INET, prefix.router_id, router_id, sizeof(router_id));
1482         } else {
1483             inet_ntop(PF_INET6, prefix.intf_addr, intf_ip, sizeof(intf_ip));
1484             inet_ntop(PF_INET6, prefix.nei_addr, nei_ip, sizeof(nei_ip));
1485             inet_ntop(PF_INET6, prefix.router_id, router_id, sizeof(router_id));
1486             inet_ntop(PF_INET6, prefix.ospf_fwd_addr, ospf_fwd_addr, sizeof(ospf_fwd_addr));
1487             inet_ntop(PF_INET6, prefix.prefix_bin, prefix_ip, sizeof(prefix_ip));
1488         }
1489 
1490         if (!strcmp(prefix.protocol, "OSPFv3") or !strcmp(prefix.protocol, "OSPFv2") ) {
1491             bzero(isis_area_id, sizeof(isis_area_id));
1492 
1493             inet_ntop(PF_INET, prefix.igp_router_id, igp_router_id, sizeof(igp_router_id));
1494 
1495             if ((uint32_t) *(prefix.igp_router_id+4) != 0) {
1496                 inet_ntop(PF_INET, prefix.igp_router_id+4, dr, sizeof(dr));
1497                 strncat(igp_router_id, "[", sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1498                 strncat(igp_router_id, dr, sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1499                 strncat(igp_router_id, "]", sizeof(igp_router_id) - strlen(igp_router_id) - 1);
1500             }
1501 
1502 
1503             inet_ntop(PF_INET, prefix.ospf_area_Id, ospf_area_id, sizeof(ospf_area_id));
1504         } else {
1505             bzero(ospf_area_id, sizeof(ospf_area_id));
1506 
1507             snprintf(igp_router_id, sizeof(igp_router_id),
1508                      "%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX.%02hhX%02hhX",
1509                      prefix.igp_router_id[0], prefix.igp_router_id[1], prefix.igp_router_id[2], prefix.igp_router_id[3],
1510                      prefix.igp_router_id[4], prefix.igp_router_id[5], prefix.igp_router_id[6], prefix.igp_router_id[7]);
1511 
1512             if (prefix.isis_area_id[8] <= sizeof(prefix.isis_area_id))
1513                 for (i=0; i < prefix.isis_area_id[8]; i++) {
1514                     snprintf(buf2, sizeof(buf2), "%02hhX", prefix.isis_area_id[i]);
1515                     strcat(isis_area_id, buf2);
1516 
1517                     if (i == 0)
1518                         strcat(isis_area_id, ".");
1519                 }
1520         }
1521 
1522 
1523         buf_len += snprintf(buf2, sizeof(buf2),
1524                 "%s\t%" PRIu64 "\t%s\t%s\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%s\t%s\t%s\t%" PRIx64 "\t%" PRIx32
1525                         "\t%s\t%s\t%s\t%s\t%" PRIu32 "\t%" PRIu32 "\t%s\t%s\t%" PRIx32 "\t%s\t%s\t%" PRIu32 "\t%" PRIx64
1526                             "\t%s\t%" PRIu32 "\t%s\t%d\t%d\t%d\t%s\n",
1527                             action.c_str(), ls_prefix_seq, hash_str.c_str(), path_hash_str.c_str(), r_hash_str.c_str(),
1528                             router_ip.c_str(), peer_hash_str.c_str(), peer.peer_addr, peer.peer_as, ts.c_str(),
1529                             igp_router_id, router_id, prefix.id, prefix.bgp_ls_id, ospf_area_id, isis_area_id,
1530                             prefix.protocol, attr.as_path.c_str(), attr.local_pref, attr.med, attr.next_hop, local_node_hash_id.c_str(),
1531                             prefix.mt_id, prefix.ospf_route_type, prefix.igp_flags, prefix.route_tag, prefix.ext_route_tag,
1532                             ospf_fwd_addr, prefix.metric, prefix_ip, prefix.prefix_len, peer.isPrePolicy, peer.isAdjIn,
1533                             prefix.sid_tlv);
1534 
1535         // Cat the entry to the query buff
1536         if (buf_len < MSGBUS_WORKING_BUF_SIZE /* size of buf */)
1537             strcat(prep_buf, buf2);
1538 
1539         ++ls_prefix_seq;
1540     }
1541 
1542     produce(MSGBUS_TOPIC_VAR_LS_PREFIX, prep_buf, strlen(prep_buf), rows, peer_hash_str,
1543             &peer_list[peer_hash_str], peer.peer_as);
1544 }
1545 
1546 /**
1547  * Abstract method Implementation - See MsgBusInterface.hpp for details
1548  *
1549  * TODO: Consolidate this to single produce method
1550  */
send_bmp_raw(u_char * r_hash,obj_bgp_peer & peer,u_char * data,size_t data_len)1551 void msgBus_kafka::send_bmp_raw(u_char *r_hash, obj_bgp_peer &peer, u_char *data, size_t data_len) {
1552     string r_hash_str;
1553     string p_hash_str;
1554     RdKafka::Topic *topic = NULL;
1555 
1556     hash_toStr(peer.hash_id, p_hash_str);
1557     hash_toStr(r_hash, r_hash_str);
1558 
1559     if (data_len == 0)
1560         return;
1561 
1562     while (isConnected == false) {
1563         LOG_WARN("rtr=%s: Not connected to Kafka, attempting to reconnect", router_ip.c_str());
1564         connect();
1565 
1566         sleep(2);
1567     }
1568 
1569     // if topic is disabled, don't bother producing the message
1570     if (!topicSel->topicEnabled(MSGBUS_TOPIC_VAR_BMP_RAW))
1571         return;
1572 
1573     char headers[256];
1574     size_t hdr_len = snprintf(headers, sizeof(headers), "V: %s\nC_HASH_ID: %s\nR_HASH: %s\nR_IP: %s\nL: %lu\n\n",
1575              MSGBUS_API_VERSION, collector_hash.c_str(), r_hash_str.c_str(), router_ip.c_str(), data_len);
1576 
1577     memcpy(producer_buf, headers, hdr_len);
1578     memcpy(producer_buf+hdr_len, data, data_len);
1579 
1580     topic = topicSel->getTopic(MSGBUS_TOPIC_VAR_BMP_RAW, &router_group_name, &peer_list[p_hash_str], peer.peer_as);
1581     if (topic != NULL) {
1582         SELF_DEBUG("rtr=%s: Producing bmp raw message: topic=%s key=%s, msg size = %lu", router_ip.c_str(),
1583                    topic->name().c_str(), r_hash_str.c_str(), data_len);
1584 
1585         RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
1586                                                     RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
1587                                                     producer_buf, data_len + hdr_len,
1588                                                     (const std::string *)&r_hash_str, NULL);
1589 
1590         if (resp != RdKafka::ERR_NO_ERROR) {
1591             LOG_ERR("rtr=%s: Failed to produce bmp raw message: %s", router_ip.c_str(), RdKafka::err2str(resp).c_str());
1592             producer->poll(100);
1593         }
1594     }
1595     else {
1596         SELF_DEBUG("rtr=%s: failed to produce bmp raw message because topic couldn't be found: topic=%s key=%s, msg size = %lu",
1597                    router_ip.c_str(), MSGBUS_TOPIC_VAR_BMP_RAW, r_hash_str.c_str(), data_len);
1598     }
1599 
1600     producer->poll(0);
1601 }
1602 
1603 /**
1604 * \brief Method to resolve the IP address to a hostname
1605 *
1606 *  \param [in]   name      String name (ip address)
1607 *  \param [out]  hostname  String reference for hostname
1608 *
1609 *  \returns true if error, false if no error
1610 */
resolveIp(string name,string & hostname)1611 bool msgBus_kafka::resolveIp(string name, string &hostname) {
1612     addrinfo *ai;
1613     char host[255];
1614 
1615     if (!getaddrinfo(name.c_str(), NULL, NULL, &ai)) {
1616 
1617         if (!getnameinfo(ai->ai_addr,ai->ai_addrlen, host, sizeof(host), NULL, 0, NI_NAMEREQD)) {
1618             hostname.assign(host);
1619             LOG_INFO("resolve: %s to %s", name.c_str(), hostname.c_str());
1620         }
1621 
1622         freeaddrinfo(ai);
1623         return false;
1624     }
1625 
1626     return true;
1627 }
1628 
1629 /*
1630  * Enable/disable debugs
1631  */
enableDebug()1632 void msgBus_kafka::enableDebug() {
1633     string value = "all";
1634     string errstr;
1635 
1636     disconnect();
1637 
1638     if (conf->set("debug", value, errstr) != RdKafka::Conf::CONF_OK) {
1639         LOG_ERR("Failed to enable debug on kafka producer confg: %s", errstr.c_str());
1640     }
1641 
1642     connect();
1643 
1644     debug = true;
1645 
1646 }
disableDebug()1647 void msgBus_kafka::disableDebug() {
1648     string errstr;
1649     string value = "";
1650 
1651     if (conf)
1652         conf->set("debug", value, errstr);
1653 
1654     debug = false;
1655 }
1656