1 /*
2     pmacct (Promiscuous mode IP Accounting package)
3     pmacct is Copyright (C) 2003-2020 by Paolo Lucente
4 */
5 
6 /*
7     This program is free software; you can redistribute it and/or modify
8     it under the terms of the GNU General Public License as published by
9     the Free Software Foundation; either version 2 of the License, or
10     (at your option) any later version.
11 
12     This program is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU General Public License for more details.
16 
17     You should have received a copy of the GNU General Public License
18     along with this program; if not, write to the Free Software
19     Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21 
22 /* includes */
23 #include "pmacct.h"
24 #include "addr.h"
25 #include "bgp.h"
26 #include "bgp_xcs.h"
27 #include "rpki/rpki.h"
28 #include "bgp_blackhole.h"
29 #include "thread_pool.h"
30 #if defined WITH_RABBITMQ
31 #include "amqp_common.h"
32 #endif
33 #ifdef WITH_KAFKA
34 #include "kafka_common.h"
35 #endif
36 #if defined WITH_ZMQ
37 #include "zmq_common.h"
38 #endif
39 #if defined WITH_AVRO
40 #include "plugin_cmn_avro.h"
41 #endif
42 #include "bgp_lg.h"
43 
44 /* Global variables */
/* Thread pool allocated in bgp_daemon_wrapper(); skinny_bgp_daemon() is dispatched to it */
thread_pool_t *bgp_pool;
/* Array of 'bgp_daemon_max_peers' peer slots, malloc()ed and zeroed in skinny_bgp_daemon_online() */
struct bgp_peer *peers;
/* Peer caches: allocated in skinny_bgp_daemon_online() only when 'config.bgp_lg' is set, NULL otherwise */
struct bgp_peer_cache_bucket *peers_cache, *peers_port_cache;
/*
   Community pattern tables, MAX_BGP_COMM_PATTERNS entries each. Going by the
   names these presumably hold standard, extended and large community patterns
   (the *_to_asn ones for community-to-ASN mapping) -- TODO confirm against the
   code that fills and reads them, which is not in this part of the file.
*/
char *std_comm_patterns[MAX_BGP_COMM_PATTERNS];
char *ext_comm_patterns[MAX_BGP_COMM_PATTERNS];
char *lrg_comm_patterns[MAX_BGP_COMM_PATTERNS];
char *std_comm_patterns_to_asn[MAX_BGP_COMM_PATTERNS];
char *lrg_comm_patterns_to_asn[MAX_BGP_COMM_PATTERNS];
/* NOTE(review): community ranges, presumably used to derive peer source AS / interface -- confirm with their users */
struct bgp_comm_range peer_src_as_ifrange;
struct bgp_comm_range peer_src_as_asrange;
/* Hashing hook; skinny_bgp_daemon_online() points it to bgp_route_info_modulo_pathid() for BGP_ASPATH_HASH_PATHID */
u_int32_t (*bgp_route_info_modulo)(struct bgp_peer *, path_id_t *, int);
/* One routing DB per FUNC_TYPE_*; bgp_routing_db is set to the FUNC_TYPE_BGP entry in skinny_bgp_daemon_online() */
struct bgp_rt_structs inter_domain_routing_dbs[FUNC_TYPE_MAX], *bgp_routing_db;
/* One misc DB per FUNC_TYPE_*; bgp_misc_db is dereferenced below but assigned elsewhere (presumably bgp_prepare_thread() -- TODO confirm) */
struct bgp_misc_structs inter_domain_misc_dbs[FUNC_TYPE_MAX], *bgp_misc_db;
/* BGP xconnect map: pool allocated and loaded from 'config.bgp_xconnect_map' in skinny_bgp_daemon_online() */
struct bgp_xconnects bgp_xcs_map;
59 
60 /* Functions */
bgp_daemon_wrapper()61 void bgp_daemon_wrapper()
62 {
63   /* initialize variables */
64   if (!config.bgp_daemon_port) config.bgp_daemon_port = BGP_TCP_PORT;
65 
66 #if defined WITH_ZMQ
67   if (config.bgp_lg) bgp_lg_wrapper();
68 #else
69   if (config.bgp_lg) {
70     Log(LOG_ERR, "ERROR ( %s/core/lg ): 'bgp_daemon_lg' requires --enable-zmq. Exiting.\n", config.name);
71     exit_gracefully(1);
72   }
73 #endif
74 
75   /* initialize threads pool */
76   bgp_pool = allocate_thread_pool(1);
77   assert(bgp_pool);
78   Log(LOG_DEBUG, "DEBUG ( %s/core/BGP ): %d thread(s) initialized\n", config.name, 1);
79   bgp_prepare_thread();
80 
81   /* giving a kick to the BGP thread */
82   send_to_pool(bgp_pool, skinny_bgp_daemon, NULL);
83 }
84 
/*
   Function handed to send_to_pool() by bgp_daemon_wrapper(): it takes no
   arguments and simply delegates to skinny_bgp_daemon_online().
*/
void skinny_bgp_daemon()
{
  skinny_bgp_daemon_online();
}
89 
skinny_bgp_daemon_online()90 void skinny_bgp_daemon_online()
91 {
92   int ret, rc, peers_idx, allowed, yes=1;
93 #if (defined IPV6_BINDV6ONLY)
94   int no=0;
95 #endif
96   int peers_idx_rr = 0, peers_xconnect_idx_rr = 0, max_peers_idx = 0;
97   struct plugin_requests req;
98   struct host_addr addr;
99   struct bgp_peer *peer;
100   char bgp_reply_pkt[BGP_BUFFER_SIZE], *bgp_reply_pkt_ptr;
101   char bgp_peer_str[INET6_ADDRSTRLEN], bgp_xconnect_peer_str[BGP_XCONNECT_STRLEN];
102   struct sockaddr_storage server, client;
103   afi_t afi;
104   safi_t safi;
105   time_t now, dump_refresh_deadline = {0};
106   struct hosts_table allow;
107   struct bgp_md5_table bgp_md5;
108   struct timeval dump_refresh_timeout, *drt_ptr;
109   struct bgp_peer_batch bp_batch;
110   socklen_t slen, clen = sizeof(client);
111 
112   sigset_t signal_set;
113 
114   /* select() stuff */
115   fd_set read_descs, bkp_read_descs;
116   int fd, select_fd, bkp_select_fd, recalc_fds, select_num;
117   int recv_fd, send_fd;
118 
119   /* initial cleanups */
120   reload_map_bgp_thread = FALSE;
121   reload_log_bgp_thread = FALSE;
122   memset(&server, 0, sizeof(server));
123   memset(&client, 0, sizeof(client));
124   memset(&allow, 0, sizeof(struct hosts_table));
125 
126   bgp_routing_db = &inter_domain_routing_dbs[FUNC_TYPE_BGP];
127   memset(bgp_routing_db, 0, sizeof(struct bgp_rt_structs));
128 
129   if (!config.bgp_table_attr_hash_buckets) config.bgp_table_attr_hash_buckets = HASHTABSIZE;
130   bgp_attr_init(config.bgp_table_attr_hash_buckets, bgp_routing_db);
131 
  /* socket address setup for BGP server: IPv6 wildcard unless 'bgp_daemon_ip' is set (IPv4 is retried later if socket() fails) */
133   if (!config.bgp_daemon_ip) {
134     struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&server;
135 
136     sa6->sin6_family = AF_INET6;
137     sa6->sin6_port = htons(config.bgp_daemon_port);
138     slen = sizeof(struct sockaddr_in6);
139   }
140   else {
141     trim_spaces(config.bgp_daemon_ip);
142     ret = str_to_addr(config.bgp_daemon_ip, &addr);
143     if (!ret) {
144       Log(LOG_ERR, "ERROR ( %s/%s ): 'bgp_daemon_ip' value is not a valid IPv4/IPv6 address. Terminating thread.\n", config.name, bgp_misc_db->log_str);
145       exit_gracefully(1);
146     }
147     slen = addr_to_sa((struct sockaddr *)&server, &addr, config.bgp_daemon_port);
148   }
149 
150   if (!config.bgp_daemon_max_peers) config.bgp_daemon_max_peers = MAX_BGP_PEERS_DEFAULT;
151   Log(LOG_INFO, "INFO ( %s/%s ): maximum BGP peers allowed: %d\n", config.name, bgp_misc_db->log_str, config.bgp_daemon_max_peers);
152 
153   peers = malloc(config.bgp_daemon_max_peers*sizeof(struct bgp_peer));
154   if (!peers) {
155     Log(LOG_ERR, "ERROR ( %s/%s ): Unable to malloc() BGP peers structure. Terminating thread.\n", config.name, bgp_misc_db->log_str);
156     exit_gracefully(1);
157   }
158   memset(peers, 0, config.bgp_daemon_max_peers*sizeof(struct bgp_peer));
159 
160   if (config.bgp_lg) {
161     peers_cache = malloc(config.bgp_daemon_max_peers*sizeof(struct bgp_peer_cache_bucket));
162     if (!peers_cache) {
163       Log(LOG_ERR, "ERROR ( %s/%s ): Unable to malloc() BGP peers cache structure. Terminating thread.\n", config.name, bgp_misc_db->log_str);
164       exit_gracefully(1);
165     }
166 
167     bgp_peer_cache_init(peers_cache, config.bgp_daemon_max_peers);
168 
169     peers_port_cache = malloc(config.bgp_daemon_max_peers*sizeof(struct bgp_peer_cache_bucket));
170     if (!peers_port_cache) {
171       Log(LOG_ERR, "ERROR ( %s/%s ): Unable to malloc() BGP peers cache structure (2). Terminating thread.\n", config.name, bgp_misc_db->log_str);
172       exit_gracefully(1);
173     }
174 
175     bgp_peer_cache_init(peers_port_cache, config.bgp_daemon_max_peers);
176   }
177   else {
178     peers_cache = NULL;
179     peers_port_cache = NULL;
180   }
181 
182   if (config.bgp_xconnect_map) {
183     int bgp_xcs_allocated = FALSE;
184 
185     if (config.acct_type != ACCT_PMBGP) {
186       Log(LOG_ERR, "ERROR ( %s/%s ): bgp_daemon_xconnect_map feature not supported for this daemon. Exiting ...\n", config.name, config.type);
187       exit_gracefully(1);
188     }
189 
190     memset(&bgp_xcs_map, 0, sizeof(bgp_xcs_map));
191     memset(&req, 0, sizeof(req));
192 
193     /* Setting up the pool */
194     bgp_xcs_map.pool = malloc((config.bgp_daemon_max_peers + 1) * sizeof(struct bgp_xconnect));
195     if (!bgp_xcs_map.pool) {
196       Log(LOG_ERR, "ERROR ( %s/%s ): unable to allocate BGP xconnect pool. Exiting ...\n", config.name, config.type);
197       exit_gracefully(1);
198     }
199     else memset(bgp_xcs_map.pool, 0, (config.bgp_daemon_max_peers + 1) * sizeof(struct bgp_xconnect));
200 
201     req.key_value_table = (void *) &bgp_xcs_map;
202     load_id_file(MAP_BGP_XCS, config.bgp_xconnect_map, NULL, &req, &bgp_xcs_allocated);
203   }
204   else {
205     bgp_xcs_map.pool = 0;
206     bgp_xcs_map.num = 0;
207   }
208 
209   if (config.rpki_roas_file || config.rpki_rtr_cache) {
210     rpki_daemon_wrapper();
211 
212     /* Let's give the RPKI thread some advantage to create its structures */
213     sleep(DEFAULT_SLOTH_SLEEP_TIME);
214   }
215 
216   if (config.bgp_blackhole_stdcomm_list) {
217 #if defined WITH_ZMQ
218     struct p_zmq_host *bgp_blackhole_zmq_host = NULL;
219     char inproc_blackhole_str[] = "inproc://bgp_blackhole";
220     (void)inproc_blackhole_str;
221 
222     bgp_blackhole_daemon_wrapper();
223 
224     /* Let's give the BGP blackhole thread some advantage to create its structures */
225     sleep(DEFAULT_SLOTH_SLEEP_TIME);
226 
227     bgp_blackhole_zmq_host = bgp_blackhole_misc_db->bgp_blackhole_zmq_host;
228     p_zmq_push_connect_setup(bgp_blackhole_zmq_host);
229 #else
230     Log(LOG_ERR, "ERROR ( %s/%s ): 'bgp_blackhole_stdcomm_list' requires compiling with --enable-zmq. Exiting ..\n", config.name, bgp_misc_db->log_str);
231     exit_gracefully(1);
232 #endif
233   }
234 
235   if (config.bgp_daemon_msglog_file || config.bgp_daemon_msglog_amqp_routing_key || config.bgp_daemon_msglog_kafka_topic) {
236     if (config.bgp_daemon_msglog_file) bgp_misc_db->msglog_backend_methods++;
237     if (config.bgp_daemon_msglog_amqp_routing_key) bgp_misc_db->msglog_backend_methods++;
238     if (config.bgp_daemon_msglog_kafka_topic) bgp_misc_db->msglog_backend_methods++;
239 
240     if (bgp_misc_db->msglog_backend_methods > 1) {
241       Log(LOG_ERR, "ERROR ( %s/%s ): bgp_daemon_msglog_file, bgp_daemon_msglog_amqp_routing_key and bgp_daemon_msglog_kafka_topic are mutually exclusive. Terminating thread.\n", config.name, bgp_misc_db->log_str);
242       exit_gracefully(1);
243     }
244 
245     bgp_misc_db->peers_log = malloc(config.bgp_daemon_max_peers*sizeof(struct bgp_peer_log));
246     if (!bgp_misc_db->peers_log) {
247       Log(LOG_ERR, "ERROR ( %s/%s ): Unable to malloc() BGP peers log structure. Terminating thread.\n", config.name, bgp_misc_db->log_str);
248       exit_gracefully(1);
249     }
250     memset(bgp_misc_db->peers_log, 0, config.bgp_daemon_max_peers*sizeof(struct bgp_peer_log));
251     bgp_peer_log_seq_init(&bgp_misc_db->log_seq);
252 
253     if (config.bgp_daemon_msglog_amqp_routing_key) {
254 #ifdef WITH_RABBITMQ
255       bgp_daemon_msglog_init_amqp_host();
256       p_amqp_connect_to_publish(&bgp_daemon_msglog_amqp_host);
257 #else
258       Log(LOG_WARNING, "WARN ( %s/%s ): p_amqp_connect_to_publish() not possible due to missing --enable-rabbitmq\n", config.name, bgp_misc_db->log_str);
259 #endif
260     }
261 
262     if (config.bgp_daemon_msglog_kafka_topic) {
263 #ifdef WITH_KAFKA
264       bgp_daemon_msglog_init_kafka_host();
265 #else
266       Log(LOG_WARNING, "WARN ( %s/%s ): p_kafka_connect_to_produce() not possible due to missing --enable-kafka\n", config.name, bgp_misc_db->log_str);
267 #endif
268     }
269   }
270 
271   if (config.bgp_table_dump_file || config.bgp_table_dump_amqp_routing_key || config.bgp_table_dump_kafka_topic) {
272     if (config.bgp_table_dump_file) bgp_misc_db->dump_backend_methods++;
273     if (config.bgp_table_dump_amqp_routing_key) bgp_misc_db->dump_backend_methods++;
274     if (config.bgp_table_dump_kafka_topic) bgp_misc_db->dump_backend_methods++;
275 
276     if (bgp_misc_db->dump_backend_methods > 1) {
277       Log(LOG_ERR, "ERROR ( %s/%s ): bgp_table_dump_file, bgp_table_dump_amqp_routing_key and bgp_table_dump_kafka_topic are mutually exclusive. Terminating thread.\n", config.name, bgp_misc_db->log_str);
278       exit_gracefully(1);
279     }
280   }
281 
282   if ((bgp_misc_db->msglog_backend_methods || bgp_misc_db->dump_backend_methods) && config.bgp_xconnect_map) {
283     Log(LOG_ERR, "ERROR ( %s/%s ): bgp_daemon_xconnect_map is mutually exclusive with any BGP msglog and dump method. Terminating thread.\n", config.name, bgp_misc_db->log_str);
284     exit_gracefully(1);
285   }
286 
287   if (!config.bgp_table_peer_buckets) config.bgp_table_peer_buckets = DEFAULT_BGP_INFO_HASH;
288   if (!config.bgp_table_per_peer_buckets) config.bgp_table_per_peer_buckets = DEFAULT_BGP_INFO_PER_PEER_HASH;
289 
290   if (config.bgp_table_per_peer_hash == BGP_ASPATH_HASH_PATHID)
291     bgp_route_info_modulo = bgp_route_info_modulo_pathid;
292   else {
293     Log(LOG_ERR, "ERROR ( %s/%s ): Unknown 'bgp_table_per_peer_hash' value. Terminating thread.\n", config.name, bgp_misc_db->log_str);
294     exit_gracefully(1);
295   }
296 
297   config.bgp_sock = socket(((struct sockaddr *)&server)->sa_family, SOCK_STREAM, 0);
298   if (config.bgp_sock < 0) {
299     /* retry with IPv4 */
300     if (!config.bgp_daemon_ip) {
301       struct sockaddr_in *sa4 = (struct sockaddr_in *)&server;
302 
303       sa4->sin_family = AF_INET;
304       sa4->sin_addr.s_addr = htonl(0);
305       sa4->sin_port = htons(config.bgp_daemon_port);
306       slen = sizeof(struct sockaddr_in);
307 
308       config.bgp_sock = socket(((struct sockaddr *)&server)->sa_family, SOCK_STREAM, 0);
309     }
310 
311     if (config.bgp_sock < 0) {
312       Log(LOG_ERR, "ERROR ( %s/%s ): thread socket() failed. Terminating thread.\n", config.name, bgp_misc_db->log_str);
313       exit_gracefully(1);
314     }
315   }
316   if (config.bgp_daemon_ipprec) {
317     int opt = config.bgp_daemon_ipprec << 5;
318 
319     rc = setsockopt(config.bgp_sock, IPPROTO_IP, IP_TOS, &opt, (socklen_t) sizeof(opt));
320     if (rc < 0) Log(LOG_ERR, "WARN ( %s/%s ): setsockopt() failed for IP_TOS (errno: %d).\n", config.name, bgp_misc_db->log_str, errno);
321   }
322 
323 #if (defined LINUX) && (defined HAVE_SO_REUSEPORT)
324   rc = setsockopt(config.bgp_sock, SOL_SOCKET, SO_REUSEADDR|SO_REUSEPORT, (char *)&yes, (socklen_t) sizeof(yes));
325   if (rc < 0) Log(LOG_ERR, "WARN ( %s/%s ): setsockopt() failed for SO_REUSEADDR|SO_REUSEPORT (errno: %d).\n", config.name, bgp_misc_db->log_str, errno);
326 #else
327   rc = setsockopt(config.bgp_sock, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, (socklen_t) sizeof(yes));
328   if (rc < 0) Log(LOG_ERR, "WARN ( %s/%s ): setsockopt() failed for SO_REUSEADDR (errno: %d).\n", config.name, bgp_misc_db->log_str, errno);
329 #endif
330 
331 #if (defined IPV6_BINDV6ONLY)
332   rc = setsockopt(config.bgp_sock, IPPROTO_IPV6, IPV6_BINDV6ONLY, (char *) &no, (socklen_t) sizeof(no));
333   if (rc < 0) Log(LOG_ERR, "WARN ( %s/%s ): setsockopt() failed for IPV6_BINDV6ONLY (errno: %d).\n", config.name, bgp_misc_db->log_str, errno);
334 #endif
335 
336   if (config.bgp_daemon_pipe_size) {
337     socklen_t l = sizeof(config.bgp_daemon_pipe_size);
338     int saved = 0, obtained = 0;
339 
340     getsockopt(config.bgp_sock, SOL_SOCKET, SO_RCVBUF, &saved, &l);
341     Setsocksize(config.bgp_sock, SOL_SOCKET, SO_RCVBUF, &config.bgp_daemon_pipe_size, (socklen_t) sizeof(config.bgp_daemon_pipe_size));
342     getsockopt(config.bgp_sock, SOL_SOCKET, SO_RCVBUF, &obtained, &l);
343 
344     Setsocksize(config.bgp_sock, SOL_SOCKET, SO_RCVBUF, &saved, l);
345     getsockopt(config.bgp_sock, SOL_SOCKET, SO_RCVBUF, &obtained, &l);
346     Log(LOG_INFO, "INFO ( %s/%s ): bgp_daemon_pipe_size: obtained=%d target=%d.\n", config.name, bgp_misc_db->log_str, obtained, config.bgp_daemon_pipe_size);
347   }
348 
349   rc = bind(config.bgp_sock, (struct sockaddr *) &server, slen);
350   if (rc < 0) {
351     char null_ip_address[] = "0.0.0.0";
352     char *ip_address;
353 
354     ip_address = config.bgp_daemon_ip ? config.bgp_daemon_ip : null_ip_address;
355     Log(LOG_ERR, "ERROR ( %s/%s ): bind() to ip=%s port=%d/tcp failed (errno: %d).\n", config.name, bgp_misc_db->log_str, ip_address, config.bgp_daemon_port, errno);
356     exit_gracefully(1);
357   }
358 
359   rc = listen(config.bgp_sock, 1);
360   if (rc < 0) {
361     Log(LOG_ERR, "ERROR ( %s/%s ): listen() failed (errno: %d).\n", config.name, bgp_misc_db->log_str, errno);
362     exit_gracefully(1);
363   }
364 
  /* Preparing for synchronous I/O multiplexing */
366   select_fd = 0;
367   FD_ZERO(&bkp_read_descs);
368   FD_SET(config.bgp_sock, &bkp_read_descs);
369 
370   {
371     char srv_string[INET6_ADDRSTRLEN];
372     struct host_addr srv_addr;
373     u_int16_t srv_port;
374 
375     sa_to_addr((struct sockaddr *)&server, &srv_addr, &srv_port);
376     addr_to_str(srv_string, &srv_addr);
377     Log(LOG_INFO, "INFO ( %s/%s ): waiting for BGP data on %s:%u\n", config.name, bgp_misc_db->log_str, srv_string, srv_port);
378   }
379 
380   /* Preparing ACL, if any */
381   if (config.bgp_daemon_allow_file) load_allow_file(config.bgp_daemon_allow_file, &allow);
382 
383   /* Preparing MD5 keys, if any */
384   if (config.bgp_daemon_md5_file) {
385     bgp_md5_file_init(&bgp_md5);
386     bgp_md5_file_load(config.bgp_daemon_md5_file, &bgp_md5);
387     if (bgp_md5.num) bgp_md5_file_process(config.bgp_sock, &bgp_md5);
388   }
389 
390   /* Let's initialize clean shared RIB */
391   for (afi = AFI_IP; afi < AFI_MAX; afi++) {
392     for (safi = SAFI_UNICAST; safi < SAFI_MAX; safi++) {
393       bgp_routing_db->rib[afi][safi] = bgp_table_init(afi, safi);
394     }
395   }
396 
397   /* BGP peers batching checks */
398   if ((config.bgp_daemon_batch && !config.bgp_daemon_batch_interval) ||
399       (config.bgp_daemon_batch_interval && !config.bgp_daemon_batch)) {
400     Log(LOG_WARNING, "WARN ( %s/%s ): 'bgp_daemon_batch_interval' and 'bgp_daemon_batch' both set to zero.\n", config.name, bgp_misc_db->log_str);
401     config.bgp_daemon_batch = 0;
402     config.bgp_daemon_batch_interval = 0;
403   }
404   else bgp_batch_init(&bp_batch, config.bgp_daemon_batch, config.bgp_daemon_batch_interval);
405 
406   if (bgp_misc_db->msglog_backend_methods) {
407 #ifdef WITH_JANSSON
408     if (!config.bgp_daemon_msglog_output) config.bgp_daemon_msglog_output = PRINT_OUTPUT_JSON;
409 #else
410     Log(LOG_WARNING, "WARN ( %s/%s ): bgp_daemon_msglog_output set to json but will produce no output (missing --enable-jansson).\n", config.name, bgp_misc_db->log_str);
411 #endif
412 
413 #ifdef WITH_AVRO
414     if ((config.bgp_daemon_msglog_output == PRINT_OUTPUT_AVRO_BIN) ||
415 	(config.bgp_daemon_msglog_output == PRINT_OUTPUT_AVRO_JSON)) {
416       assert(BGP_LOG_TYPE_MAX < MAX_AVRO_SCHEMA);
417 
418       bgp_misc_db->msglog_avro_schema[0] = p_avro_schema_build_bgp(BGP_LOGDUMP_ET_LOG, "bgp_msglog");
419       bgp_misc_db->msglog_avro_schema[BGP_LOG_TYPE_LOGINIT] = p_avro_schema_build_bgp_log_initclose(BGP_LOGDUMP_ET_LOG, "bgp_loginit");
420       bgp_misc_db->msglog_avro_schema[BGP_LOG_TYPE_LOGCLOSE] = p_avro_schema_build_bgp_log_initclose(BGP_LOGDUMP_ET_LOG, "bgp_logclose");
421 
422       if (config.bgp_daemon_msglog_avro_schema_file) {
423 	char p_avro_schema_file[SRVBUFLEN];
424 
425 	if (strlen(config.bgp_daemon_msglog_avro_schema_file) > (SRVBUFLEN - SUPERSHORTBUFLEN)) {
426 	  Log(LOG_ERR, "ERROR ( %s/%s ): 'bgp_daemon_msglog_avro_schema_file' too long. Exiting.\n", config.name, bgp_misc_db->log_str);
427 	  exit_gracefully(1);
428 	}
429 
430 	write_avro_schema_to_file_with_suffix(config.bgp_daemon_msglog_avro_schema_file, "-bgp_msglog",
431 					      p_avro_schema_file, bgp_misc_db->msglog_avro_schema[0]);
432 
433 	write_avro_schema_to_file_with_suffix(config.bgp_daemon_msglog_avro_schema_file, "-bgp_loginit",
434 					      p_avro_schema_file, bgp_misc_db->msglog_avro_schema[BGP_LOG_TYPE_LOGINIT]);
435 
436 	write_avro_schema_to_file_with_suffix(config.bgp_daemon_msglog_avro_schema_file, "-bgp_logclose",
437 					      p_avro_schema_file, bgp_misc_db->msglog_avro_schema[BGP_LOG_TYPE_LOGCLOSE]);
438       }
439 
440       if (config.bgp_daemon_msglog_kafka_avro_schema_registry) {
441 #ifdef WITH_SERDES
442         if (strchr(config.bgp_daemon_msglog_kafka_topic, '$')) {
443           Log(LOG_ERR, "ERROR ( %s/%s ): dynamic 'bgp_daemon_msglog_kafka_topic' is not compatible with 'bgp_daemon_msglog_kafka_avro_schema_registry'. Exiting.\n",
444 	      config.name, bgp_misc_db->log_str);
445 	  exit_gracefully(1);
446         }
447 
448 	if (config.bgp_daemon_msglog_output == PRINT_OUTPUT_AVRO_JSON) {
449           Log(LOG_ERR, "ERROR ( %s/%s ): 'avro_json' output is not compatible with 'bgp_daemon_msglog_kafka_avro_schema_registry'. Exiting.\n",
450 	      config.name, bgp_misc_db->log_str);
451 	  exit_gracefully(1);
452 	}
453 
454 	bgp_daemon_msglog_kafka_host.sd_schema[0] = compose_avro_schema_registry_name_2(config.bgp_daemon_msglog_kafka_topic, FALSE,
455 										        bgp_misc_db->msglog_avro_schema[0], "bgp", "msglog",
456 										        config.bgp_daemon_msglog_kafka_avro_schema_registry);
457 
458 	bgp_daemon_msglog_kafka_host.sd_schema[BGP_LOG_TYPE_LOGINIT] = compose_avro_schema_registry_name_2(config.bgp_daemon_msglog_kafka_topic, FALSE,
459 										        bgp_misc_db->msglog_avro_schema[BGP_LOG_TYPE_LOGINIT], "bgp", "loginit",
460 										        config.bgp_daemon_msglog_kafka_avro_schema_registry);
461 
462 	bgp_daemon_msglog_kafka_host.sd_schema[BGP_LOG_TYPE_LOGCLOSE] = compose_avro_schema_registry_name_2(config.bgp_daemon_msglog_kafka_topic, FALSE,
463 										        bgp_misc_db->msglog_avro_schema[BGP_LOG_TYPE_LOGCLOSE], "bgp", "logclose",
464 										        config.bgp_daemon_msglog_kafka_avro_schema_registry);
465 #endif
466       }
467     }
468 #endif
469   }
470 
471   if (bgp_misc_db->dump_backend_methods) {
472 #ifdef WITH_JANSSON
473     if (!config.bgp_table_dump_output) config.bgp_table_dump_output = PRINT_OUTPUT_JSON;
474 #else
475     Log(LOG_WARNING, "WARN ( %s/%s ): bgp_table_dump_output set to json but will produce no output (missing --enable-jansson).\n", config.name, bgp_misc_db->log_str);
476 #endif
477 
478 #ifdef WITH_AVRO
479     if ((config.bgp_table_dump_output == PRINT_OUTPUT_AVRO_BIN) ||
480 	(config.bgp_table_dump_output == PRINT_OUTPUT_AVRO_JSON)) {
481       assert(BGP_LOG_TYPE_MAX < MAX_AVRO_SCHEMA);
482 
483       bgp_misc_db->dump_avro_schema[0] = p_avro_schema_build_bgp(BGP_LOGDUMP_ET_DUMP, "bgp_dump");
484       bgp_misc_db->dump_avro_schema[BGP_LOG_TYPE_DUMPINIT] = p_avro_schema_build_bgp_dump_init(BGP_LOGDUMP_ET_DUMP, "bgp_dumpinit");
485       bgp_misc_db->dump_avro_schema[BGP_LOG_TYPE_DUMPCLOSE] = p_avro_schema_build_bgp_dump_close(BGP_LOGDUMP_ET_DUMP, "bgp_dumpclose");
486 
487       if (config.bgp_table_dump_avro_schema_file) {
488 	char p_avro_schema_file[SRVBUFLEN];
489 
490 	if (strlen(config.bgp_table_dump_avro_schema_file) > (SRVBUFLEN - SUPERSHORTBUFLEN)) {
491 	  Log(LOG_ERR, "ERROR ( %s/%s ): 'bgp_table_dump_avro_schema_file' too long. Exiting.\n", config.name, bgp_misc_db->log_str);
492 	  exit_gracefully(1);
493 	}
494 
495 	write_avro_schema_to_file_with_suffix(config.bgp_table_dump_avro_schema_file, "-bgp_dump",
496 					      p_avro_schema_file, bgp_misc_db->dump_avro_schema[0]);
497 
498 	write_avro_schema_to_file_with_suffix(config.bgp_table_dump_avro_schema_file, "-bgp_dumpinit",
499 					      p_avro_schema_file, bgp_misc_db->dump_avro_schema[BGP_LOG_TYPE_DUMPINIT]);
500 
501 	write_avro_schema_to_file_with_suffix(config.bgp_table_dump_avro_schema_file, "-bgp_dumpclose",
502 					      p_avro_schema_file, bgp_misc_db->dump_avro_schema[BGP_LOG_TYPE_DUMPCLOSE]);
503       }
504     }
505 #endif
506   }
507 
508   if (bgp_misc_db->dump_backend_methods) {
509     char dump_roundoff[] = "m";
510     time_t tmp_time;
511 
512     if (config.bgp_table_dump_refresh_time) {
513       gettimeofday(&bgp_misc_db->log_tstamp, NULL);
514       dump_refresh_deadline = bgp_misc_db->log_tstamp.tv_sec;
515       tmp_time = roundoff_time(dump_refresh_deadline, dump_roundoff);
516       while ((tmp_time+config.bgp_table_dump_refresh_time) < dump_refresh_deadline) {
517         tmp_time += config.bgp_table_dump_refresh_time;
518       }
519       dump_refresh_deadline = tmp_time;
520       dump_refresh_deadline += config.bgp_table_dump_refresh_time; /* it's a deadline not a basetime */
521     }
522     else {
523       config.bgp_table_dump_file = NULL;
524       bgp_misc_db->dump_backend_methods = FALSE;
525       Log(LOG_WARNING, "WARN ( %s/%s ): Invalid 'bgp_table_dump_refresh_time'.\n", config.name, bgp_misc_db->log_str);
526     }
527 
528     if (config.bgp_table_dump_amqp_routing_key) bgp_table_dump_init_amqp_host();
529     if (config.bgp_table_dump_kafka_topic) bgp_table_dump_init_kafka_host();
530   }
531 
532 #ifdef WITH_AVRO
533   bgp_misc_db->avro_buf = malloc(LARGEBUFLEN);
534   if (!bgp_misc_db->avro_buf) {
535     Log(LOG_ERR, "ERROR ( %s/%s ): malloc() failed (avro_buf). Exiting ..\n", config.name, bgp_misc_db->log_str);
536     exit_gracefully(1);
537   }
538   else memset(bgp_misc_db->avro_buf, 0, LARGEBUFLEN);
539 #endif
540 
541   if (config.bgp_daemon_msglog_kafka_avro_schema_registry || config.bgp_table_dump_kafka_avro_schema_registry) {
542 #ifndef WITH_SERDES
543     Log(LOG_ERR, "ERROR ( %s/%s ): 'bgp_*_kafka_avro_schema_registry' require --enable-serdes. Exiting.\n", config.name, bgp_misc_db->log_str);
544     exit_gracefully(1);
545 #endif
546   }
547 
548   select_fd = bkp_select_fd = (config.bgp_sock + 1);
549   recalc_fds = FALSE;
550 
551   bgp_link_misc_structs(bgp_misc_db);
552 
553   sigemptyset(&signal_set);
554   sigaddset(&signal_set, SIGCHLD);
555   sigaddset(&signal_set, SIGHUP);
556   sigaddset(&signal_set, SIGUSR1);
557   sigaddset(&signal_set, SIGUSR2);
558   sigaddset(&signal_set, SIGTERM);
559   if (config.daemon) {
560     sigaddset(&signal_set, SIGINT);
561   }
562 
563   for (;;) {
564     select_again:
565 
566     if (!bgp_misc_db->is_thread) {
567       sigprocmask(SIG_UNBLOCK, &signal_set, NULL);
568       sigprocmask(SIG_BLOCK, &signal_set, NULL);
569     }
570 
571     if (recalc_fds) {
572       select_fd = config.bgp_sock;
573       max_peers_idx = -1; /* .. since valid indexes include 0 */
574 
575       for (peers_idx = 0; peers_idx < config.bgp_daemon_max_peers; peers_idx++) {
576         if (select_fd < peers[peers_idx].fd) select_fd = peers[peers_idx].fd;
577 
578         if (config.bgp_xconnect_map) {
579 	  if (select_fd < peers[peers_idx].xconnect_fd)
580 	    select_fd = peers[peers_idx].xconnect_fd;
581 	}
582 
583 	if (peers[peers_idx].fd) max_peers_idx = peers_idx;
584       }
585       select_fd++;
586       max_peers_idx++;
587 
588       bkp_select_fd = select_fd;
589       recalc_fds = FALSE;
590     }
591     else select_fd = bkp_select_fd;
592 
593     memcpy(&read_descs, &bkp_read_descs, sizeof(bkp_read_descs));
594 
595     if (bgp_misc_db->dump_backend_methods) {
596       int delta;
597 
598       calc_refresh_timeout_sec(dump_refresh_deadline, bgp_misc_db->log_tstamp.tv_sec, &delta);
599       dump_refresh_timeout.tv_sec = delta;
600       dump_refresh_timeout.tv_usec = 0;
601       drt_ptr = &dump_refresh_timeout;
602     }
603     else drt_ptr = NULL;
604 
605     select_num = select(select_fd, &read_descs, NULL, NULL, drt_ptr);
606     if (select_num < 0) goto select_again;
607     now = time(NULL);
608 
609     /* signals handling */
610     if (reload_map_bgp_thread) {
611       if (config.bgp_daemon_allow_file) load_allow_file(config.bgp_daemon_allow_file, &allow);
612 
613       if (config.bgp_daemon_md5_file) {
614 	bgp_md5_file_unload(&bgp_md5);
615 	if (bgp_md5.num) bgp_md5_file_process(config.bgp_sock, &bgp_md5); // process unload
616 
617 	bgp_md5_file_load(config.bgp_daemon_md5_file, &bgp_md5);
618 	if (bgp_md5.num) bgp_md5_file_process(config.bgp_sock, &bgp_md5); // process load
619       }
620 
621       if (config.bgp_xconnect_map) {
622 	int bgp_xcs_allocated = FALSE;
623 
624         bgp_xcs_map_destroy();
625 
626 	memset(&req, 0, sizeof(req));
627 	req.key_value_table = (void *) &bgp_xcs_map;
628 
629 	load_id_file(MAP_BGP_XCS, config.bgp_xconnect_map, NULL, &req, &bgp_xcs_allocated);
630       }
631 
632       reload_map_bgp_thread = FALSE;
633     }
634 
635     if (reload_log_bgp_thread) {
636       for (peers_idx = 0; peers_idx < config.bgp_daemon_max_peers; peers_idx++) {
637 	if (bgp_misc_db->peers_log[peers_idx].fd) {
638 	  fclose(bgp_misc_db->peers_log[peers_idx].fd);
639 	  bgp_misc_db->peers_log[peers_idx].fd = open_output_file(bgp_misc_db->peers_log[peers_idx].filename, "a", FALSE);
640 	  setlinebuf(bgp_misc_db->peers_log[peers_idx].fd);
641 	}
642 	else break;
643       }
644 
645       reload_log_bgp_thread = FALSE;
646     }
647 
648     if (reload_log && !bgp_misc_db->is_thread) {
649       reload_logs();
650       reload_log = FALSE;
651     }
652 
653     if (bgp_misc_db->msglog_backend_methods || bgp_misc_db->dump_backend_methods) {
654       gettimeofday(&bgp_misc_db->log_tstamp, NULL);
655       compose_timestamp(bgp_misc_db->log_tstamp_str, SRVBUFLEN, &bgp_misc_db->log_tstamp, TRUE,
656 			config.timestamps_since_epoch, config.timestamps_rfc3339, config.timestamps_utc);
657 
658       /* if dumping, let's reset log sequence at the next dump event */
659       if (!bgp_misc_db->dump_backend_methods) {
660         if (bgp_peer_log_seq_has_ro_bit(&bgp_misc_db->log_seq))
661 	  bgp_peer_log_seq_init(&bgp_misc_db->log_seq);
662       }
663 
664       if (bgp_misc_db->dump_backend_methods) {
665 	while (bgp_misc_db->log_tstamp.tv_sec > dump_refresh_deadline) {
666 	  bgp_misc_db->dump.tstamp.tv_sec = dump_refresh_deadline;
667 	  bgp_misc_db->dump.tstamp.tv_usec = 0;
668 	  compose_timestamp(bgp_misc_db->dump.tstamp_str, SRVBUFLEN, &bgp_misc_db->dump.tstamp, FALSE,
669 			    config.timestamps_since_epoch, config.timestamps_rfc3339, config.timestamps_utc);
670 	  bgp_misc_db->dump.period = config.bgp_table_dump_refresh_time;
671 
672 	  if (bgp_peer_log_seq_has_ro_bit(&bgp_misc_db->log_seq))
673 	    bgp_peer_log_seq_init(&bgp_misc_db->log_seq);
674 
675 	  bgp_handle_dump_event();
676 
677 	  dump_refresh_deadline += config.bgp_table_dump_refresh_time;
678 	}
679       }
680 
681 #ifdef WITH_RABBITMQ
682       if (config.bgp_daemon_msglog_amqp_routing_key) {
683         time_t last_fail = P_broker_timers_get_last_fail(&bgp_daemon_msglog_amqp_host.btimers);
684 
685 	if (last_fail && ((last_fail + P_broker_timers_get_retry_interval(&bgp_daemon_msglog_amqp_host.btimers)) <= bgp_misc_db->log_tstamp.tv_sec)) {
686           bgp_daemon_msglog_init_amqp_host();
687           p_amqp_connect_to_publish(&bgp_daemon_msglog_amqp_host);
688 	}
689       }
690 #endif
691 
692 #ifdef WITH_KAFKA
693       if (config.bgp_daemon_msglog_kafka_topic) {
694         time_t last_fail = P_broker_timers_get_last_fail(&bgp_daemon_msglog_kafka_host.btimers);
695 
696         if (last_fail && ((last_fail + P_broker_timers_get_retry_interval(&bgp_daemon_msglog_kafka_host.btimers)) <= bgp_misc_db->log_tstamp.tv_sec))
697           bgp_daemon_msglog_init_kafka_host();
698       }
699 #endif
700     }
701 
702     /*
703        If select_num == 0 then we got out of select() due to a timeout rather
704        than because we had a message from a peer to handle. By now we did all
705        routine checks and can happily return to select() again.
706     */
707     if (!select_num) goto select_again;
708 
709     /* New connection is coming in */
710     if (FD_ISSET(config.bgp_sock, &read_descs)) {
711       int peers_check_idx, peers_num;
712 
713       fd = accept(config.bgp_sock, (struct sockaddr *) &client, &clen);
714       if (fd == ERR) goto read_data;
715 
716       ipv4_mapped_to_ipv4(&client);
717 
718       /* If an ACL is defined, here we check against and enforce it */
719       if (allow.num) allowed = check_allow(&allow, (struct sockaddr *)&client);
720       else allowed = TRUE;
721 
722       if (!allowed) {
723 	char disallowed_str[INET6_ADDRSTRLEN];
724 
725 	sa_to_str(disallowed_str, sizeof(disallowed_str), (struct sockaddr *) &client);
726 	Log(LOG_INFO, "INFO ( %s/%s ): [%s] peer '%s' not allowed. close()\n", config.name, bgp_misc_db->log_str, config.bgp_daemon_allow_file, disallowed_str);
727 
728         close(fd);
729         goto read_data;
730       }
731 
732       for (peer = NULL, peers_idx = 0; peers_idx < config.bgp_daemon_max_peers; peers_idx++) {
733         if (!peers[peers_idx].fd) {
734 	  /*
735 	     Admitted if:
736 	     *  batching feature is disabled or
737 	     *  we have room in the current batch or
738 	     *  we can start a new batch
739 	  */
740           if (bgp_batch_is_admitted(&bp_batch, now)) {
741             peer = &peers[peers_idx];
742             if (bgp_peer_init(peer, FUNC_TYPE_BGP)) peer = NULL;
743 	    else recalc_fds = TRUE;
744 
745             log_notification_unset(&log_notifications.bgp_peers_throttling);
746 
747             if (bgp_batch_is_enabled(&bp_batch) && peer) {
748               if (bgp_batch_is_expired(&bp_batch, now)) bgp_batch_reset(&bp_batch, now);
749               if (bgp_batch_is_not_empty(&bp_batch)) bgp_batch_decrease_counter(&bp_batch);
750             }
751 
752             break;
753 	  }
754           else { /* throttle */
755             /* We briefly accept the new connection to be able to drop it */
756 	    if (!log_notification_isset(&log_notifications.bgp_peers_throttling, now)) {
757               Log(LOG_INFO, "INFO ( %s/%s ): throttling at BGP peer #%u\n", config.name, bgp_misc_db->log_str, peers_idx);
758 	      log_notification_set(&log_notifications.bgp_peers_throttling, now, FALSE);
759 	    }
760 
761             close(fd);
762             goto read_data;
763           }
764         }
765 	/* XXX: replenish sessions with expired keepalives */
766       }
767 
768       if (!peer) {
769 	/* We briefly accept the new connection to be able to drop it */
770         Log(LOG_ERR, "ERROR ( %s/%s ): Insufficient number of BGP peers has been configured by 'bgp_daemon_max_peers' (%d).\n",
771 			config.name, bgp_misc_db->log_str, config.bgp_daemon_max_peers);
772 
773 	close(fd);
774 	goto read_data;
775       }
776 
777       peer->fd = fd;
778       peer->idx = peers_idx;
779       FD_SET(peer->fd, &bkp_read_descs);
780       sa_to_addr((struct sockaddr *) &client, &peer->addr, &peer->tcp_port);
781 
782       if (peers_cache && peers_port_cache) {
783 	u_int32_t bucket;
784 
785 	bucket = addr_hash(&peer->addr, config.bgp_daemon_max_peers);
786 	bgp_peer_cache_insert(peers_cache, bucket, peer);
787 
788 	bucket = addr_port_hash(&peer->addr, peer->tcp_port, config.bgp_daemon_max_peers);
789 	bgp_peer_cache_insert(peers_port_cache, bucket, peer);
790       }
791 
792       if (bgp_misc_db->msglog_backend_methods)
793 	bgp_peer_log_init(peer, config.bgp_daemon_msglog_output, FUNC_TYPE_BGP);
794 
795       /* Check: more than one TCP connection from a peer (IP address) */
796       for (peers_check_idx = 0, peers_num = 0; peers_check_idx < config.bgp_daemon_max_peers; peers_check_idx++) {
797 	if (peers_idx != peers_check_idx && !memcmp(&peers[peers_check_idx].addr, &peer->addr, sizeof(peers[peers_check_idx].addr))) {
798 	  int same_peer = FALSE;
799 
800 	  bgp_peer_print(&peers[peers_check_idx], bgp_peer_str, INET6_ADDRSTRLEN);
801 
802           /* Check: if not x-connecting, let's see if we have to compare TCP ports
803 	     (ie. NAT traversal / non-transparent tee scenarios); then evaluate if
804 	     the new session is valid or has to be rejected */
805 	  if (!config.bgp_xconnect_map) {
806 	    if (config.tmp_bgp_lookup_compare_ports) {
807 	      if (peers[peers_check_idx].tcp_port == peer->tcp_port) same_peer = TRUE;
808 	      else {
809 		same_peer = FALSE;
810 	        if (peers[peers_check_idx].fd) peers_num++;
811 	      }
812 	    }
813 	    else same_peer = TRUE;
814 
815 	    if (same_peer) {
816 	      if ((now - peers[peers_check_idx].last_keepalive) > peers[peers_check_idx].ht) {
817 		bgp_peer_print(&peers[peers_check_idx], bgp_peer_str, INET6_ADDRSTRLEN);
818               	Log(LOG_INFO, "INFO ( %s/%s ): [%s] Replenishing stale connection by peer.\n",
819 			config.name, bgp_misc_db->log_str, bgp_peer_str);
820 		FD_CLR(peers[peers_check_idx].fd, &bkp_read_descs);
821 		bgp_peer_close(&peers[peers_check_idx], FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
822 	      }
823 	      else {
824 		Log(LOG_WARNING, "WARN ( %s/%s ): [%s] Refusing new connection from existing peer (residual holdtime: %ld).\n",
825 			config.name, bgp_misc_db->log_str, bgp_peer_str,
826 			(peers[peers_check_idx].ht - ((long)now - peers[peers_check_idx].last_keepalive)));
827 		FD_CLR(peer->fd, &bkp_read_descs);
828 		bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
829 		goto read_data;
830 	      }
831 	    }
832 	  }
833 	  /* XXX: if x-connecting we don't support NAT traversal / non-transparent tee
834 	     scenarios (yet?) */
835 	  else {
836 	    Log(LOG_WARNING, "WARN ( %s/%s ): [%s] Refusing new incoming connection for existing BGP xconnect.\n",
837 			config.name, bgp_misc_db->log_str, bgp_peer_str);
838 	    FD_CLR(peer->fd, &bkp_read_descs);
839 	    bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
840 	    goto read_data;
841 	  }
842         }
843 	else if (peers[peers_check_idx].fd) peers_num++;
844       }
845 
846       if (config.bgp_xconnect_map) {
847         bgp_peer_xconnect_init(peer, FUNC_TYPE_BGP);
848 
849         if (peer->xconnect_fd) FD_SET(peer->xconnect_fd, &bkp_read_descs);
850         else {
851           FD_CLR(peer->fd, &bkp_read_descs);
852           bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
853           goto read_data;
854         }
855       }
856 
857       if (!config.bgp_xconnect_map) {
858         bgp_peer_print(peer, bgp_peer_str, INET6_ADDRSTRLEN);
859         Log(LOG_INFO, "INFO ( %s/%s ): [%s] BGP peers usage: %u/%u\n", config.name, bgp_misc_db->log_str,
860 		bgp_peer_str, peers_num, config.bgp_daemon_max_peers);
861       }
862       else {
863         bgp_peer_xconnect_print(peer, bgp_xconnect_peer_str, BGP_XCONNECT_STRLEN);
864         Log(LOG_INFO, "INFO ( %s/%s ): [%s] BGP xconnects usage: %u/%u\n", config.name, bgp_misc_db->log_str,
865 		bgp_xconnect_peer_str, peers_num, config.bgp_daemon_max_peers);
866       }
867 
868       if (config.bgp_daemon_neighbors_file) write_neighbors_file(config.bgp_daemon_neighbors_file, FUNC_TYPE_BGP);
869     }
870 
871     read_data:
872 
873     /*
874        We have something coming in: let's lookup which peer is that.
875        FvD: To avoid starvation of the "later established" peers, we
876        offset the start of the search in a round-robin style.
877     */
878     for (peer = NULL, peers_idx = 0; peers_idx < max_peers_idx; peers_idx++) {
879       int loc_idx = (peers_idx + peers_idx_rr) % max_peers_idx;
880       recv_fd = 0; send_fd = 0;
881 
882       if (peers[loc_idx].fd && FD_ISSET(peers[loc_idx].fd, &read_descs)) {
883         peer = &peers[loc_idx];
884 	recv_fd = peer->fd;
885 	if (config.bgp_xconnect_map) send_fd = peer->xconnect_fd;
886         peers_idx_rr = (peers_idx_rr + 1) % max_peers_idx;
887 	break;
888       }
889 
890       // XXX: verify round-robin fairness holding up
891       if (config.bgp_xconnect_map) {
892         loc_idx = (peers_idx + peers_xconnect_idx_rr) % max_peers_idx;
893 
894 	if (peers[loc_idx].xconnect_fd && FD_ISSET(peers[loc_idx].xconnect_fd, &read_descs)) {
895 	  peer = &peers[loc_idx];
896 	  recv_fd = peer->xconnect_fd;
897 	  send_fd = peer->fd;
898 	  peers_xconnect_idx_rr = (peers_xconnect_idx_rr + 1) % max_peers_idx;
899 	  break;
900 	}
901       }
902     }
903 
904     if (!peer) goto select_again;
905 
906     if (!peer->buf.exp_len) {
907       ret = recv(recv_fd, &peer->buf.base[peer->buf.cur_len], (BGP_HEADER_SIZE - peer->buf.cur_len), 0);
908 
909       if (ret > 0) {
910 	peer->buf.cur_len += ret;
911 
912 	if (peer->buf.cur_len == BGP_HEADER_SIZE) {
913 	  struct bgp_header *bhdr = (struct bgp_header *) peer->buf.base;
914 
915 	  if (bgp_marker_check(bhdr, BGP_MARKER_SIZE) == ERR) {
916 	    bgp_peer_print(peer, bgp_peer_str, INET6_ADDRSTRLEN);
917 	    Log(LOG_INFO, "INFO ( %s/%s ): [%s] Received malformed BGP packet (marker check failed).\n",
918 		config.name, bgp_misc_db->log_str, bgp_peer_str);
919 
920 	    peer->msglen = 0;
921 	    peer->buf.cur_len = 0;
922 	    peer->buf.exp_len = 0;
923 	    ret = ERR;
924 	  }
925 	  else {
926 	    peer->buf.exp_len = ntohs(bhdr->bgpo_len);
927 
928 	    /* commit */
929 	    if (peer->buf.cur_len == peer->buf.exp_len) {
930 	      peer->msglen = peer->buf.exp_len;
931 	      peer->buf.cur_len = 0;
932 	      peer->buf.exp_len = 0;
933 	    }
934 	  }
935 	}
936 	else {
937 	  goto select_again;
938 	}
939       }
940     }
941 
942     if (peer->buf.exp_len) {
943       ret = recv(recv_fd, &peer->buf.base[peer->buf.cur_len], (peer->buf.exp_len - peer->buf.cur_len), 0);
944 
945       if (ret > 0) {
946 	peer->buf.cur_len += ret;
947 
948 	/* commit */
949         if (peer->buf.cur_len == peer->buf.exp_len) {
950 	  peer->msglen = peer->buf.exp_len;
951 	  peer->buf.cur_len = 0;
952 	  peer->buf.exp_len = 0;
953 	}
954 	else {
955 	  goto select_again;
956 	}
957       }
958     }
959 
960     if (ret <= 0) {
961       if (!config.bgp_xconnect_map) {
962 	bgp_peer_print(peer, bgp_peer_str, INET6_ADDRSTRLEN);
963 	Log(LOG_INFO, "INFO ( %s/%s ): [%s] BGP connection reset by peer (%d).\n", config.name, bgp_misc_db->log_str, bgp_peer_str, errno);
964 	FD_CLR(peer->fd, &bkp_read_descs);
965       }
966       else {
967 	bgp_peer_xconnect_print(peer, bgp_xconnect_peer_str, BGP_XCONNECT_STRLEN);
968 
969 	if (recv_fd == peer->fd)
970 	  Log(LOG_INFO, "INFO ( %s/%s ): [%s] recv(): BGP xconnect reset by src peer (%d).\n",
971 		config.name, bgp_misc_db->log_str, bgp_xconnect_peer_str, errno);
972 	else if (recv_fd == peer->xconnect_fd)
973 	  Log(LOG_INFO, "INFO ( %s/%s ): [%s] recv(): BGP xconnect reset by dst peer (%d).\n",
974 		config.name, bgp_misc_db->log_str, bgp_xconnect_peer_str, errno);
975 
976 	FD_CLR(peer->fd, &bkp_read_descs);
977 	FD_CLR(peer->xconnect_fd, &bkp_read_descs);
978       }
979 
980       bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
981 
982       recalc_fds = TRUE;
983       goto select_again;
984     }
985     else {
986       if (!config.bgp_xconnect_map) {
987 	/* Appears a valid peer with a valid BGP message: before
988 	   continuing let's see if it's time to send a KEEPALIVE
989 	   back */
990 	if (peer->status == Established && ((now - peer->last_keepalive) > (peer->ht / 2))) {
991 	  bgp_reply_pkt_ptr = bgp_reply_pkt;
992 	  bgp_reply_pkt_ptr += bgp_write_keepalive_msg(bgp_reply_pkt_ptr);
993 	  ret = send(recv_fd, bgp_reply_pkt, bgp_reply_pkt_ptr - bgp_reply_pkt, 0);
994 	  peer->last_keepalive = now;
995 	}
996 
997 	ret = bgp_parse_msg(peer, now, TRUE);
998 	if (ret) {
999 	  FD_CLR(recv_fd, &bkp_read_descs);
1000 
1001 	  if (ret < 0) bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
1002 	  else bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, TRUE, ret, BGP_NOTIFY_SUBCODE_UNSPECIFIC, NULL);
1003 
1004 	  recalc_fds = TRUE;
1005 	  goto select_again;
1006 	}
1007       }
1008       else {
1009 	ret = send(send_fd, peer->buf.base, peer->msglen, 0);
1010 	if (ret <= 0) {
1011 	  bgp_peer_xconnect_print(peer, bgp_xconnect_peer_str, BGP_XCONNECT_STRLEN);
1012 
1013 	  if (send_fd == peer->fd)
1014 	    Log(LOG_INFO, "INFO ( %s/%s ): [%s] send(): BGP xconnect reset by src peer (%d).\n",
1015 		config.name, bgp_misc_db->log_str, bgp_xconnect_peer_str, errno);
1016 	  else if (send_fd == peer->xconnect_fd)
1017 	    Log(LOG_INFO, "INFO ( %s/%s ): [%s] send(): BGP xconnect reset by dst peer (%d).\n",
1018 		config.name, bgp_misc_db->log_str, bgp_xconnect_peer_str, errno);
1019 
1020 	  FD_CLR(peer->fd, &bkp_read_descs);
1021 	  FD_CLR(peer->xconnect_fd, &bkp_read_descs);
1022 
1023 	  bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
1024 
1025 	  recalc_fds = TRUE;
1026 	  goto select_again;
1027 	}
1028       }
1029     }
1030   }
1031 }
1032 
bgp_prepare_thread()1033 void bgp_prepare_thread()
1034 {
1035   bgp_misc_db = &inter_domain_misc_dbs[FUNC_TYPE_BGP];
1036   memset(bgp_misc_db, 0, sizeof(struct bgp_misc_structs));
1037 
1038   bgp_misc_db->is_thread = TRUE;
1039   if (config.bgp_lg) bgp_misc_db->has_lglass = TRUE;
1040 
1041   if (config.rpki_roas_file || config.rpki_rtr_cache) {
1042     bgp_misc_db->bnv = malloc(sizeof(struct bgp_node_vector));
1043     memset(bgp_misc_db->bnv, 0, sizeof(struct bgp_node_vector));
1044   }
1045 
1046   if (config.bgp_blackhole_stdcomm_list) bgp_misc_db->has_blackhole = TRUE;
1047 
1048   bgp_misc_db->log_str = malloc(strlen("core/BGP") + 1);
1049   strcpy(bgp_misc_db->log_str, "core/BGP");
1050 }
1051 
bgp_prepare_daemon()1052 void bgp_prepare_daemon()
1053 {
1054   bgp_misc_db = &inter_domain_misc_dbs[FUNC_TYPE_BGP];
1055   memset(bgp_misc_db, 0, sizeof(struct bgp_misc_structs));
1056 
1057   bgp_misc_db->is_thread = FALSE;
1058   if (config.bgp_lg) bgp_misc_db->has_lglass = TRUE;
1059 
1060   if (config.rpki_roas_file || config.rpki_rtr_cache) {
1061     bgp_misc_db->bnv = malloc(sizeof(struct bgp_node_vector));
1062     memset(bgp_misc_db->bnv, 0, sizeof(struct bgp_node_vector));
1063   }
1064 
1065   if (config.bgp_blackhole_stdcomm_list) bgp_misc_db->has_blackhole = TRUE;
1066 
1067   bgp_misc_db->log_str = malloc(strlen("core") + 1);
1068   strcpy(bgp_misc_db->log_str, "core");
1069 }
1070