1 /*
2 pmacct (Promiscuous mode IP Accounting package)
3 pmacct is Copyright (C) 2003-2020 by Paolo Lucente
4 */
5
6 /*
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 */
21
22 /* includes */
23 #include "pmacct.h"
24 #include "addr.h"
25 #include "bgp.h"
26 #include "bgp_xcs.h"
27 #include "rpki/rpki.h"
28 #include "bgp_blackhole.h"
29 #include "thread_pool.h"
30 #if defined WITH_RABBITMQ
31 #include "amqp_common.h"
32 #endif
33 #ifdef WITH_KAFKA
34 #include "kafka_common.h"
35 #endif
36 #if defined WITH_ZMQ
37 #include "zmq_common.h"
38 #endif
39 #if defined WITH_AVRO
40 #include "plugin_cmn_avro.h"
41 #endif
42 #include "bgp_lg.h"
43
44 /* Global variables */
45 thread_pool_t *bgp_pool;
46 struct bgp_peer *peers;
47 struct bgp_peer_cache_bucket *peers_cache, *peers_port_cache;
48 char *std_comm_patterns[MAX_BGP_COMM_PATTERNS];
49 char *ext_comm_patterns[MAX_BGP_COMM_PATTERNS];
50 char *lrg_comm_patterns[MAX_BGP_COMM_PATTERNS];
51 char *std_comm_patterns_to_asn[MAX_BGP_COMM_PATTERNS];
52 char *lrg_comm_patterns_to_asn[MAX_BGP_COMM_PATTERNS];
53 struct bgp_comm_range peer_src_as_ifrange;
54 struct bgp_comm_range peer_src_as_asrange;
55 u_int32_t (*bgp_route_info_modulo)(struct bgp_peer *, path_id_t *, int);
56 struct bgp_rt_structs inter_domain_routing_dbs[FUNC_TYPE_MAX], *bgp_routing_db;
57 struct bgp_misc_structs inter_domain_misc_dbs[FUNC_TYPE_MAX], *bgp_misc_db;
58 struct bgp_xconnects bgp_xcs_map;
59
60 /* Functions */
bgp_daemon_wrapper()61 void bgp_daemon_wrapper()
62 {
63 /* initialize variables */
64 if (!config.bgp_daemon_port) config.bgp_daemon_port = BGP_TCP_PORT;
65
66 #if defined WITH_ZMQ
67 if (config.bgp_lg) bgp_lg_wrapper();
68 #else
69 if (config.bgp_lg) {
70 Log(LOG_ERR, "ERROR ( %s/core/lg ): 'bgp_daemon_lg' requires --enable-zmq. Exiting.\n", config.name);
71 exit_gracefully(1);
72 }
73 #endif
74
75 /* initialize threads pool */
76 bgp_pool = allocate_thread_pool(1);
77 assert(bgp_pool);
78 Log(LOG_DEBUG, "DEBUG ( %s/core/BGP ): %d thread(s) initialized\n", config.name, 1);
79 bgp_prepare_thread();
80
81 /* giving a kick to the BGP thread */
82 send_to_pool(bgp_pool, skinny_bgp_daemon, NULL);
83 }
84
/*
 * Thread entry routine handed to the BGP thread pool by
 * bgp_daemon_wrapper(); it simply runs the online BGP daemon loop.
 */
void skinny_bgp_daemon()
{
  skinny_bgp_daemon_online();
}
89
skinny_bgp_daemon_online()90 void skinny_bgp_daemon_online()
91 {
92 int ret, rc, peers_idx, allowed, yes=1;
93 #if (defined IPV6_BINDV6ONLY)
94 int no=0;
95 #endif
96 int peers_idx_rr = 0, peers_xconnect_idx_rr = 0, max_peers_idx = 0;
97 struct plugin_requests req;
98 struct host_addr addr;
99 struct bgp_peer *peer;
100 char bgp_reply_pkt[BGP_BUFFER_SIZE], *bgp_reply_pkt_ptr;
101 char bgp_peer_str[INET6_ADDRSTRLEN], bgp_xconnect_peer_str[BGP_XCONNECT_STRLEN];
102 struct sockaddr_storage server, client;
103 afi_t afi;
104 safi_t safi;
105 time_t now, dump_refresh_deadline = {0};
106 struct hosts_table allow;
107 struct bgp_md5_table bgp_md5;
108 struct timeval dump_refresh_timeout, *drt_ptr;
109 struct bgp_peer_batch bp_batch;
110 socklen_t slen, clen = sizeof(client);
111
112 sigset_t signal_set;
113
114 /* select() stuff */
115 fd_set read_descs, bkp_read_descs;
116 int fd, select_fd, bkp_select_fd, recalc_fds, select_num;
117 int recv_fd, send_fd;
118
119 /* initial cleanups */
120 reload_map_bgp_thread = FALSE;
121 reload_log_bgp_thread = FALSE;
122 memset(&server, 0, sizeof(server));
123 memset(&client, 0, sizeof(client));
124 memset(&allow, 0, sizeof(struct hosts_table));
125
126 bgp_routing_db = &inter_domain_routing_dbs[FUNC_TYPE_BGP];
127 memset(bgp_routing_db, 0, sizeof(struct bgp_rt_structs));
128
129 if (!config.bgp_table_attr_hash_buckets) config.bgp_table_attr_hash_buckets = HASHTABSIZE;
130 bgp_attr_init(config.bgp_table_attr_hash_buckets, bgp_routing_db);
131
  /* BGP server address: IPv6 wildcard unless 'bgp_daemon_ip' is set (an IPv4 fallback is attempted later if socket() fails) */
133 if (!config.bgp_daemon_ip) {
134 struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&server;
135
136 sa6->sin6_family = AF_INET6;
137 sa6->sin6_port = htons(config.bgp_daemon_port);
138 slen = sizeof(struct sockaddr_in6);
139 }
140 else {
141 trim_spaces(config.bgp_daemon_ip);
142 ret = str_to_addr(config.bgp_daemon_ip, &addr);
143 if (!ret) {
144 Log(LOG_ERR, "ERROR ( %s/%s ): 'bgp_daemon_ip' value is not a valid IPv4/IPv6 address. Terminating thread.\n", config.name, bgp_misc_db->log_str);
145 exit_gracefully(1);
146 }
147 slen = addr_to_sa((struct sockaddr *)&server, &addr, config.bgp_daemon_port);
148 }
149
150 if (!config.bgp_daemon_max_peers) config.bgp_daemon_max_peers = MAX_BGP_PEERS_DEFAULT;
151 Log(LOG_INFO, "INFO ( %s/%s ): maximum BGP peers allowed: %d\n", config.name, bgp_misc_db->log_str, config.bgp_daemon_max_peers);
152
153 peers = malloc(config.bgp_daemon_max_peers*sizeof(struct bgp_peer));
154 if (!peers) {
155 Log(LOG_ERR, "ERROR ( %s/%s ): Unable to malloc() BGP peers structure. Terminating thread.\n", config.name, bgp_misc_db->log_str);
156 exit_gracefully(1);
157 }
158 memset(peers, 0, config.bgp_daemon_max_peers*sizeof(struct bgp_peer));
159
160 if (config.bgp_lg) {
161 peers_cache = malloc(config.bgp_daemon_max_peers*sizeof(struct bgp_peer_cache_bucket));
162 if (!peers_cache) {
163 Log(LOG_ERR, "ERROR ( %s/%s ): Unable to malloc() BGP peers cache structure. Terminating thread.\n", config.name, bgp_misc_db->log_str);
164 exit_gracefully(1);
165 }
166
167 bgp_peer_cache_init(peers_cache, config.bgp_daemon_max_peers);
168
169 peers_port_cache = malloc(config.bgp_daemon_max_peers*sizeof(struct bgp_peer_cache_bucket));
170 if (!peers_port_cache) {
171 Log(LOG_ERR, "ERROR ( %s/%s ): Unable to malloc() BGP peers cache structure (2). Terminating thread.\n", config.name, bgp_misc_db->log_str);
172 exit_gracefully(1);
173 }
174
175 bgp_peer_cache_init(peers_port_cache, config.bgp_daemon_max_peers);
176 }
177 else {
178 peers_cache = NULL;
179 peers_port_cache = NULL;
180 }
181
182 if (config.bgp_xconnect_map) {
183 int bgp_xcs_allocated = FALSE;
184
185 if (config.acct_type != ACCT_PMBGP) {
186 Log(LOG_ERR, "ERROR ( %s/%s ): bgp_daemon_xconnect_map feature not supported for this daemon. Exiting ...\n", config.name, config.type);
187 exit_gracefully(1);
188 }
189
190 memset(&bgp_xcs_map, 0, sizeof(bgp_xcs_map));
191 memset(&req, 0, sizeof(req));
192
193 /* Setting up the pool */
194 bgp_xcs_map.pool = malloc((config.bgp_daemon_max_peers + 1) * sizeof(struct bgp_xconnect));
195 if (!bgp_xcs_map.pool) {
196 Log(LOG_ERR, "ERROR ( %s/%s ): unable to allocate BGP xconnect pool. Exiting ...\n", config.name, config.type);
197 exit_gracefully(1);
198 }
199 else memset(bgp_xcs_map.pool, 0, (config.bgp_daemon_max_peers + 1) * sizeof(struct bgp_xconnect));
200
201 req.key_value_table = (void *) &bgp_xcs_map;
202 load_id_file(MAP_BGP_XCS, config.bgp_xconnect_map, NULL, &req, &bgp_xcs_allocated);
203 }
204 else {
205 bgp_xcs_map.pool = 0;
206 bgp_xcs_map.num = 0;
207 }
208
209 if (config.rpki_roas_file || config.rpki_rtr_cache) {
210 rpki_daemon_wrapper();
211
212 /* Let's give the RPKI thread some advantage to create its structures */
213 sleep(DEFAULT_SLOTH_SLEEP_TIME);
214 }
215
216 if (config.bgp_blackhole_stdcomm_list) {
217 #if defined WITH_ZMQ
218 struct p_zmq_host *bgp_blackhole_zmq_host = NULL;
219 char inproc_blackhole_str[] = "inproc://bgp_blackhole";
220 (void)inproc_blackhole_str;
221
222 bgp_blackhole_daemon_wrapper();
223
224 /* Let's give the BGP blackhole thread some advantage to create its structures */
225 sleep(DEFAULT_SLOTH_SLEEP_TIME);
226
227 bgp_blackhole_zmq_host = bgp_blackhole_misc_db->bgp_blackhole_zmq_host;
228 p_zmq_push_connect_setup(bgp_blackhole_zmq_host);
229 #else
230 Log(LOG_ERR, "ERROR ( %s/%s ): 'bgp_blackhole_stdcomm_list' requires compiling with --enable-zmq. Exiting ..\n", config.name, bgp_misc_db->log_str);
231 exit_gracefully(1);
232 #endif
233 }
234
235 if (config.bgp_daemon_msglog_file || config.bgp_daemon_msglog_amqp_routing_key || config.bgp_daemon_msglog_kafka_topic) {
236 if (config.bgp_daemon_msglog_file) bgp_misc_db->msglog_backend_methods++;
237 if (config.bgp_daemon_msglog_amqp_routing_key) bgp_misc_db->msglog_backend_methods++;
238 if (config.bgp_daemon_msglog_kafka_topic) bgp_misc_db->msglog_backend_methods++;
239
240 if (bgp_misc_db->msglog_backend_methods > 1) {
241 Log(LOG_ERR, "ERROR ( %s/%s ): bgp_daemon_msglog_file, bgp_daemon_msglog_amqp_routing_key and bgp_daemon_msglog_kafka_topic are mutually exclusive. Terminating thread.\n", config.name, bgp_misc_db->log_str);
242 exit_gracefully(1);
243 }
244
245 bgp_misc_db->peers_log = malloc(config.bgp_daemon_max_peers*sizeof(struct bgp_peer_log));
246 if (!bgp_misc_db->peers_log) {
247 Log(LOG_ERR, "ERROR ( %s/%s ): Unable to malloc() BGP peers log structure. Terminating thread.\n", config.name, bgp_misc_db->log_str);
248 exit_gracefully(1);
249 }
250 memset(bgp_misc_db->peers_log, 0, config.bgp_daemon_max_peers*sizeof(struct bgp_peer_log));
251 bgp_peer_log_seq_init(&bgp_misc_db->log_seq);
252
253 if (config.bgp_daemon_msglog_amqp_routing_key) {
254 #ifdef WITH_RABBITMQ
255 bgp_daemon_msglog_init_amqp_host();
256 p_amqp_connect_to_publish(&bgp_daemon_msglog_amqp_host);
257 #else
258 Log(LOG_WARNING, "WARN ( %s/%s ): p_amqp_connect_to_publish() not possible due to missing --enable-rabbitmq\n", config.name, bgp_misc_db->log_str);
259 #endif
260 }
261
262 if (config.bgp_daemon_msglog_kafka_topic) {
263 #ifdef WITH_KAFKA
264 bgp_daemon_msglog_init_kafka_host();
265 #else
266 Log(LOG_WARNING, "WARN ( %s/%s ): p_kafka_connect_to_produce() not possible due to missing --enable-kafka\n", config.name, bgp_misc_db->log_str);
267 #endif
268 }
269 }
270
271 if (config.bgp_table_dump_file || config.bgp_table_dump_amqp_routing_key || config.bgp_table_dump_kafka_topic) {
272 if (config.bgp_table_dump_file) bgp_misc_db->dump_backend_methods++;
273 if (config.bgp_table_dump_amqp_routing_key) bgp_misc_db->dump_backend_methods++;
274 if (config.bgp_table_dump_kafka_topic) bgp_misc_db->dump_backend_methods++;
275
276 if (bgp_misc_db->dump_backend_methods > 1) {
277 Log(LOG_ERR, "ERROR ( %s/%s ): bgp_table_dump_file, bgp_table_dump_amqp_routing_key and bgp_table_dump_kafka_topic are mutually exclusive. Terminating thread.\n", config.name, bgp_misc_db->log_str);
278 exit_gracefully(1);
279 }
280 }
281
282 if ((bgp_misc_db->msglog_backend_methods || bgp_misc_db->dump_backend_methods) && config.bgp_xconnect_map) {
283 Log(LOG_ERR, "ERROR ( %s/%s ): bgp_daemon_xconnect_map is mutually exclusive with any BGP msglog and dump method. Terminating thread.\n", config.name, bgp_misc_db->log_str);
284 exit_gracefully(1);
285 }
286
287 if (!config.bgp_table_peer_buckets) config.bgp_table_peer_buckets = DEFAULT_BGP_INFO_HASH;
288 if (!config.bgp_table_per_peer_buckets) config.bgp_table_per_peer_buckets = DEFAULT_BGP_INFO_PER_PEER_HASH;
289
290 if (config.bgp_table_per_peer_hash == BGP_ASPATH_HASH_PATHID)
291 bgp_route_info_modulo = bgp_route_info_modulo_pathid;
292 else {
293 Log(LOG_ERR, "ERROR ( %s/%s ): Unknown 'bgp_table_per_peer_hash' value. Terminating thread.\n", config.name, bgp_misc_db->log_str);
294 exit_gracefully(1);
295 }
296
297 config.bgp_sock = socket(((struct sockaddr *)&server)->sa_family, SOCK_STREAM, 0);
298 if (config.bgp_sock < 0) {
299 /* retry with IPv4 */
300 if (!config.bgp_daemon_ip) {
301 struct sockaddr_in *sa4 = (struct sockaddr_in *)&server;
302
303 sa4->sin_family = AF_INET;
304 sa4->sin_addr.s_addr = htonl(0);
305 sa4->sin_port = htons(config.bgp_daemon_port);
306 slen = sizeof(struct sockaddr_in);
307
308 config.bgp_sock = socket(((struct sockaddr *)&server)->sa_family, SOCK_STREAM, 0);
309 }
310
311 if (config.bgp_sock < 0) {
312 Log(LOG_ERR, "ERROR ( %s/%s ): thread socket() failed. Terminating thread.\n", config.name, bgp_misc_db->log_str);
313 exit_gracefully(1);
314 }
315 }
316 if (config.bgp_daemon_ipprec) {
317 int opt = config.bgp_daemon_ipprec << 5;
318
319 rc = setsockopt(config.bgp_sock, IPPROTO_IP, IP_TOS, &opt, (socklen_t) sizeof(opt));
320 if (rc < 0) Log(LOG_ERR, "WARN ( %s/%s ): setsockopt() failed for IP_TOS (errno: %d).\n", config.name, bgp_misc_db->log_str, errno);
321 }
322
323 #if (defined LINUX) && (defined HAVE_SO_REUSEPORT)
324 rc = setsockopt(config.bgp_sock, SOL_SOCKET, SO_REUSEADDR|SO_REUSEPORT, (char *)&yes, (socklen_t) sizeof(yes));
325 if (rc < 0) Log(LOG_ERR, "WARN ( %s/%s ): setsockopt() failed for SO_REUSEADDR|SO_REUSEPORT (errno: %d).\n", config.name, bgp_misc_db->log_str, errno);
326 #else
327 rc = setsockopt(config.bgp_sock, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, (socklen_t) sizeof(yes));
328 if (rc < 0) Log(LOG_ERR, "WARN ( %s/%s ): setsockopt() failed for SO_REUSEADDR (errno: %d).\n", config.name, bgp_misc_db->log_str, errno);
329 #endif
330
331 #if (defined IPV6_BINDV6ONLY)
332 rc = setsockopt(config.bgp_sock, IPPROTO_IPV6, IPV6_BINDV6ONLY, (char *) &no, (socklen_t) sizeof(no));
333 if (rc < 0) Log(LOG_ERR, "WARN ( %s/%s ): setsockopt() failed for IPV6_BINDV6ONLY (errno: %d).\n", config.name, bgp_misc_db->log_str, errno);
334 #endif
335
336 if (config.bgp_daemon_pipe_size) {
337 socklen_t l = sizeof(config.bgp_daemon_pipe_size);
338 int saved = 0, obtained = 0;
339
340 getsockopt(config.bgp_sock, SOL_SOCKET, SO_RCVBUF, &saved, &l);
341 Setsocksize(config.bgp_sock, SOL_SOCKET, SO_RCVBUF, &config.bgp_daemon_pipe_size, (socklen_t) sizeof(config.bgp_daemon_pipe_size));
342 getsockopt(config.bgp_sock, SOL_SOCKET, SO_RCVBUF, &obtained, &l);
343
344 Setsocksize(config.bgp_sock, SOL_SOCKET, SO_RCVBUF, &saved, l);
345 getsockopt(config.bgp_sock, SOL_SOCKET, SO_RCVBUF, &obtained, &l);
346 Log(LOG_INFO, "INFO ( %s/%s ): bgp_daemon_pipe_size: obtained=%d target=%d.\n", config.name, bgp_misc_db->log_str, obtained, config.bgp_daemon_pipe_size);
347 }
348
349 rc = bind(config.bgp_sock, (struct sockaddr *) &server, slen);
350 if (rc < 0) {
351 char null_ip_address[] = "0.0.0.0";
352 char *ip_address;
353
354 ip_address = config.bgp_daemon_ip ? config.bgp_daemon_ip : null_ip_address;
355 Log(LOG_ERR, "ERROR ( %s/%s ): bind() to ip=%s port=%d/tcp failed (errno: %d).\n", config.name, bgp_misc_db->log_str, ip_address, config.bgp_daemon_port, errno);
356 exit_gracefully(1);
357 }
358
359 rc = listen(config.bgp_sock, 1);
360 if (rc < 0) {
361 Log(LOG_ERR, "ERROR ( %s/%s ): listen() failed (errno: %d).\n", config.name, bgp_misc_db->log_str, errno);
362 exit_gracefully(1);
363 }
364
  /* Preparing for synchronous I/O multiplexing */
366 select_fd = 0;
367 FD_ZERO(&bkp_read_descs);
368 FD_SET(config.bgp_sock, &bkp_read_descs);
369
370 {
371 char srv_string[INET6_ADDRSTRLEN];
372 struct host_addr srv_addr;
373 u_int16_t srv_port;
374
375 sa_to_addr((struct sockaddr *)&server, &srv_addr, &srv_port);
376 addr_to_str(srv_string, &srv_addr);
377 Log(LOG_INFO, "INFO ( %s/%s ): waiting for BGP data on %s:%u\n", config.name, bgp_misc_db->log_str, srv_string, srv_port);
378 }
379
380 /* Preparing ACL, if any */
381 if (config.bgp_daemon_allow_file) load_allow_file(config.bgp_daemon_allow_file, &allow);
382
383 /* Preparing MD5 keys, if any */
384 if (config.bgp_daemon_md5_file) {
385 bgp_md5_file_init(&bgp_md5);
386 bgp_md5_file_load(config.bgp_daemon_md5_file, &bgp_md5);
387 if (bgp_md5.num) bgp_md5_file_process(config.bgp_sock, &bgp_md5);
388 }
389
390 /* Let's initialize clean shared RIB */
391 for (afi = AFI_IP; afi < AFI_MAX; afi++) {
392 for (safi = SAFI_UNICAST; safi < SAFI_MAX; safi++) {
393 bgp_routing_db->rib[afi][safi] = bgp_table_init(afi, safi);
394 }
395 }
396
397 /* BGP peers batching checks */
398 if ((config.bgp_daemon_batch && !config.bgp_daemon_batch_interval) ||
399 (config.bgp_daemon_batch_interval && !config.bgp_daemon_batch)) {
400 Log(LOG_WARNING, "WARN ( %s/%s ): 'bgp_daemon_batch_interval' and 'bgp_daemon_batch' both set to zero.\n", config.name, bgp_misc_db->log_str);
401 config.bgp_daemon_batch = 0;
402 config.bgp_daemon_batch_interval = 0;
403 }
404 else bgp_batch_init(&bp_batch, config.bgp_daemon_batch, config.bgp_daemon_batch_interval);
405
406 if (bgp_misc_db->msglog_backend_methods) {
407 #ifdef WITH_JANSSON
408 if (!config.bgp_daemon_msglog_output) config.bgp_daemon_msglog_output = PRINT_OUTPUT_JSON;
409 #else
410 Log(LOG_WARNING, "WARN ( %s/%s ): bgp_daemon_msglog_output set to json but will produce no output (missing --enable-jansson).\n", config.name, bgp_misc_db->log_str);
411 #endif
412
413 #ifdef WITH_AVRO
414 if ((config.bgp_daemon_msglog_output == PRINT_OUTPUT_AVRO_BIN) ||
415 (config.bgp_daemon_msglog_output == PRINT_OUTPUT_AVRO_JSON)) {
416 assert(BGP_LOG_TYPE_MAX < MAX_AVRO_SCHEMA);
417
418 bgp_misc_db->msglog_avro_schema[0] = p_avro_schema_build_bgp(BGP_LOGDUMP_ET_LOG, "bgp_msglog");
419 bgp_misc_db->msglog_avro_schema[BGP_LOG_TYPE_LOGINIT] = p_avro_schema_build_bgp_log_initclose(BGP_LOGDUMP_ET_LOG, "bgp_loginit");
420 bgp_misc_db->msglog_avro_schema[BGP_LOG_TYPE_LOGCLOSE] = p_avro_schema_build_bgp_log_initclose(BGP_LOGDUMP_ET_LOG, "bgp_logclose");
421
422 if (config.bgp_daemon_msglog_avro_schema_file) {
423 char p_avro_schema_file[SRVBUFLEN];
424
425 if (strlen(config.bgp_daemon_msglog_avro_schema_file) > (SRVBUFLEN - SUPERSHORTBUFLEN)) {
426 Log(LOG_ERR, "ERROR ( %s/%s ): 'bgp_daemon_msglog_avro_schema_file' too long. Exiting.\n", config.name, bgp_misc_db->log_str);
427 exit_gracefully(1);
428 }
429
430 write_avro_schema_to_file_with_suffix(config.bgp_daemon_msglog_avro_schema_file, "-bgp_msglog",
431 p_avro_schema_file, bgp_misc_db->msglog_avro_schema[0]);
432
433 write_avro_schema_to_file_with_suffix(config.bgp_daemon_msglog_avro_schema_file, "-bgp_loginit",
434 p_avro_schema_file, bgp_misc_db->msglog_avro_schema[BGP_LOG_TYPE_LOGINIT]);
435
436 write_avro_schema_to_file_with_suffix(config.bgp_daemon_msglog_avro_schema_file, "-bgp_logclose",
437 p_avro_schema_file, bgp_misc_db->msglog_avro_schema[BGP_LOG_TYPE_LOGCLOSE]);
438 }
439
440 if (config.bgp_daemon_msglog_kafka_avro_schema_registry) {
441 #ifdef WITH_SERDES
442 if (strchr(config.bgp_daemon_msglog_kafka_topic, '$')) {
443 Log(LOG_ERR, "ERROR ( %s/%s ): dynamic 'bgp_daemon_msglog_kafka_topic' is not compatible with 'bgp_daemon_msglog_kafka_avro_schema_registry'. Exiting.\n",
444 config.name, bgp_misc_db->log_str);
445 exit_gracefully(1);
446 }
447
448 if (config.bgp_daemon_msglog_output == PRINT_OUTPUT_AVRO_JSON) {
449 Log(LOG_ERR, "ERROR ( %s/%s ): 'avro_json' output is not compatible with 'bgp_daemon_msglog_kafka_avro_schema_registry'. Exiting.\n",
450 config.name, bgp_misc_db->log_str);
451 exit_gracefully(1);
452 }
453
454 bgp_daemon_msglog_kafka_host.sd_schema[0] = compose_avro_schema_registry_name_2(config.bgp_daemon_msglog_kafka_topic, FALSE,
455 bgp_misc_db->msglog_avro_schema[0], "bgp", "msglog",
456 config.bgp_daemon_msglog_kafka_avro_schema_registry);
457
458 bgp_daemon_msglog_kafka_host.sd_schema[BGP_LOG_TYPE_LOGINIT] = compose_avro_schema_registry_name_2(config.bgp_daemon_msglog_kafka_topic, FALSE,
459 bgp_misc_db->msglog_avro_schema[BGP_LOG_TYPE_LOGINIT], "bgp", "loginit",
460 config.bgp_daemon_msglog_kafka_avro_schema_registry);
461
462 bgp_daemon_msglog_kafka_host.sd_schema[BGP_LOG_TYPE_LOGCLOSE] = compose_avro_schema_registry_name_2(config.bgp_daemon_msglog_kafka_topic, FALSE,
463 bgp_misc_db->msglog_avro_schema[BGP_LOG_TYPE_LOGCLOSE], "bgp", "logclose",
464 config.bgp_daemon_msglog_kafka_avro_schema_registry);
465 #endif
466 }
467 }
468 #endif
469 }
470
471 if (bgp_misc_db->dump_backend_methods) {
472 #ifdef WITH_JANSSON
473 if (!config.bgp_table_dump_output) config.bgp_table_dump_output = PRINT_OUTPUT_JSON;
474 #else
475 Log(LOG_WARNING, "WARN ( %s/%s ): bgp_table_dump_output set to json but will produce no output (missing --enable-jansson).\n", config.name, bgp_misc_db->log_str);
476 #endif
477
478 #ifdef WITH_AVRO
479 if ((config.bgp_table_dump_output == PRINT_OUTPUT_AVRO_BIN) ||
480 (config.bgp_table_dump_output == PRINT_OUTPUT_AVRO_JSON)) {
481 assert(BGP_LOG_TYPE_MAX < MAX_AVRO_SCHEMA);
482
483 bgp_misc_db->dump_avro_schema[0] = p_avro_schema_build_bgp(BGP_LOGDUMP_ET_DUMP, "bgp_dump");
484 bgp_misc_db->dump_avro_schema[BGP_LOG_TYPE_DUMPINIT] = p_avro_schema_build_bgp_dump_init(BGP_LOGDUMP_ET_DUMP, "bgp_dumpinit");
485 bgp_misc_db->dump_avro_schema[BGP_LOG_TYPE_DUMPCLOSE] = p_avro_schema_build_bgp_dump_close(BGP_LOGDUMP_ET_DUMP, "bgp_dumpclose");
486
487 if (config.bgp_table_dump_avro_schema_file) {
488 char p_avro_schema_file[SRVBUFLEN];
489
490 if (strlen(config.bgp_table_dump_avro_schema_file) > (SRVBUFLEN - SUPERSHORTBUFLEN)) {
491 Log(LOG_ERR, "ERROR ( %s/%s ): 'bgp_table_dump_avro_schema_file' too long. Exiting.\n", config.name, bgp_misc_db->log_str);
492 exit_gracefully(1);
493 }
494
495 write_avro_schema_to_file_with_suffix(config.bgp_table_dump_avro_schema_file, "-bgp_dump",
496 p_avro_schema_file, bgp_misc_db->dump_avro_schema[0]);
497
498 write_avro_schema_to_file_with_suffix(config.bgp_table_dump_avro_schema_file, "-bgp_dumpinit",
499 p_avro_schema_file, bgp_misc_db->dump_avro_schema[BGP_LOG_TYPE_DUMPINIT]);
500
501 write_avro_schema_to_file_with_suffix(config.bgp_table_dump_avro_schema_file, "-bgp_dumpclose",
502 p_avro_schema_file, bgp_misc_db->dump_avro_schema[BGP_LOG_TYPE_DUMPCLOSE]);
503 }
504 }
505 #endif
506 }
507
508 if (bgp_misc_db->dump_backend_methods) {
509 char dump_roundoff[] = "m";
510 time_t tmp_time;
511
512 if (config.bgp_table_dump_refresh_time) {
513 gettimeofday(&bgp_misc_db->log_tstamp, NULL);
514 dump_refresh_deadline = bgp_misc_db->log_tstamp.tv_sec;
515 tmp_time = roundoff_time(dump_refresh_deadline, dump_roundoff);
516 while ((tmp_time+config.bgp_table_dump_refresh_time) < dump_refresh_deadline) {
517 tmp_time += config.bgp_table_dump_refresh_time;
518 }
519 dump_refresh_deadline = tmp_time;
520 dump_refresh_deadline += config.bgp_table_dump_refresh_time; /* it's a deadline not a basetime */
521 }
522 else {
523 config.bgp_table_dump_file = NULL;
524 bgp_misc_db->dump_backend_methods = FALSE;
525 Log(LOG_WARNING, "WARN ( %s/%s ): Invalid 'bgp_table_dump_refresh_time'.\n", config.name, bgp_misc_db->log_str);
526 }
527
528 if (config.bgp_table_dump_amqp_routing_key) bgp_table_dump_init_amqp_host();
529 if (config.bgp_table_dump_kafka_topic) bgp_table_dump_init_kafka_host();
530 }
531
532 #ifdef WITH_AVRO
533 bgp_misc_db->avro_buf = malloc(LARGEBUFLEN);
534 if (!bgp_misc_db->avro_buf) {
535 Log(LOG_ERR, "ERROR ( %s/%s ): malloc() failed (avro_buf). Exiting ..\n", config.name, bgp_misc_db->log_str);
536 exit_gracefully(1);
537 }
538 else memset(bgp_misc_db->avro_buf, 0, LARGEBUFLEN);
539 #endif
540
541 if (config.bgp_daemon_msglog_kafka_avro_schema_registry || config.bgp_table_dump_kafka_avro_schema_registry) {
542 #ifndef WITH_SERDES
543 Log(LOG_ERR, "ERROR ( %s/%s ): 'bgp_*_kafka_avro_schema_registry' require --enable-serdes. Exiting.\n", config.name, bgp_misc_db->log_str);
544 exit_gracefully(1);
545 #endif
546 }
547
548 select_fd = bkp_select_fd = (config.bgp_sock + 1);
549 recalc_fds = FALSE;
550
551 bgp_link_misc_structs(bgp_misc_db);
552
553 sigemptyset(&signal_set);
554 sigaddset(&signal_set, SIGCHLD);
555 sigaddset(&signal_set, SIGHUP);
556 sigaddset(&signal_set, SIGUSR1);
557 sigaddset(&signal_set, SIGUSR2);
558 sigaddset(&signal_set, SIGTERM);
559 if (config.daemon) {
560 sigaddset(&signal_set, SIGINT);
561 }
562
563 for (;;) {
564 select_again:
565
566 if (!bgp_misc_db->is_thread) {
567 sigprocmask(SIG_UNBLOCK, &signal_set, NULL);
568 sigprocmask(SIG_BLOCK, &signal_set, NULL);
569 }
570
571 if (recalc_fds) {
572 select_fd = config.bgp_sock;
573 max_peers_idx = -1; /* .. since valid indexes include 0 */
574
575 for (peers_idx = 0; peers_idx < config.bgp_daemon_max_peers; peers_idx++) {
576 if (select_fd < peers[peers_idx].fd) select_fd = peers[peers_idx].fd;
577
578 if (config.bgp_xconnect_map) {
579 if (select_fd < peers[peers_idx].xconnect_fd)
580 select_fd = peers[peers_idx].xconnect_fd;
581 }
582
583 if (peers[peers_idx].fd) max_peers_idx = peers_idx;
584 }
585 select_fd++;
586 max_peers_idx++;
587
588 bkp_select_fd = select_fd;
589 recalc_fds = FALSE;
590 }
591 else select_fd = bkp_select_fd;
592
593 memcpy(&read_descs, &bkp_read_descs, sizeof(bkp_read_descs));
594
595 if (bgp_misc_db->dump_backend_methods) {
596 int delta;
597
598 calc_refresh_timeout_sec(dump_refresh_deadline, bgp_misc_db->log_tstamp.tv_sec, &delta);
599 dump_refresh_timeout.tv_sec = delta;
600 dump_refresh_timeout.tv_usec = 0;
601 drt_ptr = &dump_refresh_timeout;
602 }
603 else drt_ptr = NULL;
604
605 select_num = select(select_fd, &read_descs, NULL, NULL, drt_ptr);
606 if (select_num < 0) goto select_again;
607 now = time(NULL);
608
609 /* signals handling */
610 if (reload_map_bgp_thread) {
611 if (config.bgp_daemon_allow_file) load_allow_file(config.bgp_daemon_allow_file, &allow);
612
613 if (config.bgp_daemon_md5_file) {
614 bgp_md5_file_unload(&bgp_md5);
615 if (bgp_md5.num) bgp_md5_file_process(config.bgp_sock, &bgp_md5); // process unload
616
617 bgp_md5_file_load(config.bgp_daemon_md5_file, &bgp_md5);
618 if (bgp_md5.num) bgp_md5_file_process(config.bgp_sock, &bgp_md5); // process load
619 }
620
621 if (config.bgp_xconnect_map) {
622 int bgp_xcs_allocated = FALSE;
623
624 bgp_xcs_map_destroy();
625
626 memset(&req, 0, sizeof(req));
627 req.key_value_table = (void *) &bgp_xcs_map;
628
629 load_id_file(MAP_BGP_XCS, config.bgp_xconnect_map, NULL, &req, &bgp_xcs_allocated);
630 }
631
632 reload_map_bgp_thread = FALSE;
633 }
634
635 if (reload_log_bgp_thread) {
636 for (peers_idx = 0; peers_idx < config.bgp_daemon_max_peers; peers_idx++) {
637 if (bgp_misc_db->peers_log[peers_idx].fd) {
638 fclose(bgp_misc_db->peers_log[peers_idx].fd);
639 bgp_misc_db->peers_log[peers_idx].fd = open_output_file(bgp_misc_db->peers_log[peers_idx].filename, "a", FALSE);
640 setlinebuf(bgp_misc_db->peers_log[peers_idx].fd);
641 }
642 else break;
643 }
644
645 reload_log_bgp_thread = FALSE;
646 }
647
648 if (reload_log && !bgp_misc_db->is_thread) {
649 reload_logs();
650 reload_log = FALSE;
651 }
652
653 if (bgp_misc_db->msglog_backend_methods || bgp_misc_db->dump_backend_methods) {
654 gettimeofday(&bgp_misc_db->log_tstamp, NULL);
655 compose_timestamp(bgp_misc_db->log_tstamp_str, SRVBUFLEN, &bgp_misc_db->log_tstamp, TRUE,
656 config.timestamps_since_epoch, config.timestamps_rfc3339, config.timestamps_utc);
657
658 /* if dumping, let's reset log sequence at the next dump event */
659 if (!bgp_misc_db->dump_backend_methods) {
660 if (bgp_peer_log_seq_has_ro_bit(&bgp_misc_db->log_seq))
661 bgp_peer_log_seq_init(&bgp_misc_db->log_seq);
662 }
663
664 if (bgp_misc_db->dump_backend_methods) {
665 while (bgp_misc_db->log_tstamp.tv_sec > dump_refresh_deadline) {
666 bgp_misc_db->dump.tstamp.tv_sec = dump_refresh_deadline;
667 bgp_misc_db->dump.tstamp.tv_usec = 0;
668 compose_timestamp(bgp_misc_db->dump.tstamp_str, SRVBUFLEN, &bgp_misc_db->dump.tstamp, FALSE,
669 config.timestamps_since_epoch, config.timestamps_rfc3339, config.timestamps_utc);
670 bgp_misc_db->dump.period = config.bgp_table_dump_refresh_time;
671
672 if (bgp_peer_log_seq_has_ro_bit(&bgp_misc_db->log_seq))
673 bgp_peer_log_seq_init(&bgp_misc_db->log_seq);
674
675 bgp_handle_dump_event();
676
677 dump_refresh_deadline += config.bgp_table_dump_refresh_time;
678 }
679 }
680
681 #ifdef WITH_RABBITMQ
682 if (config.bgp_daemon_msglog_amqp_routing_key) {
683 time_t last_fail = P_broker_timers_get_last_fail(&bgp_daemon_msglog_amqp_host.btimers);
684
685 if (last_fail && ((last_fail + P_broker_timers_get_retry_interval(&bgp_daemon_msglog_amqp_host.btimers)) <= bgp_misc_db->log_tstamp.tv_sec)) {
686 bgp_daemon_msglog_init_amqp_host();
687 p_amqp_connect_to_publish(&bgp_daemon_msglog_amqp_host);
688 }
689 }
690 #endif
691
692 #ifdef WITH_KAFKA
693 if (config.bgp_daemon_msglog_kafka_topic) {
694 time_t last_fail = P_broker_timers_get_last_fail(&bgp_daemon_msglog_kafka_host.btimers);
695
696 if (last_fail && ((last_fail + P_broker_timers_get_retry_interval(&bgp_daemon_msglog_kafka_host.btimers)) <= bgp_misc_db->log_tstamp.tv_sec))
697 bgp_daemon_msglog_init_kafka_host();
698 }
699 #endif
700 }
701
702 /*
703 If select_num == 0 then we got out of select() due to a timeout rather
704 than because we had a message from a peer to handle. By now we did all
705 routine checks and can happily return to select() again.
706 */
707 if (!select_num) goto select_again;
708
709 /* New connection is coming in */
710 if (FD_ISSET(config.bgp_sock, &read_descs)) {
711 int peers_check_idx, peers_num;
712
713 fd = accept(config.bgp_sock, (struct sockaddr *) &client, &clen);
714 if (fd == ERR) goto read_data;
715
716 ipv4_mapped_to_ipv4(&client);
717
718 /* If an ACL is defined, here we check against and enforce it */
719 if (allow.num) allowed = check_allow(&allow, (struct sockaddr *)&client);
720 else allowed = TRUE;
721
722 if (!allowed) {
723 char disallowed_str[INET6_ADDRSTRLEN];
724
725 sa_to_str(disallowed_str, sizeof(disallowed_str), (struct sockaddr *) &client);
726 Log(LOG_INFO, "INFO ( %s/%s ): [%s] peer '%s' not allowed. close()\n", config.name, bgp_misc_db->log_str, config.bgp_daemon_allow_file, disallowed_str);
727
728 close(fd);
729 goto read_data;
730 }
731
732 for (peer = NULL, peers_idx = 0; peers_idx < config.bgp_daemon_max_peers; peers_idx++) {
733 if (!peers[peers_idx].fd) {
734 /*
735 Admitted if:
736 * batching feature is disabled or
737 * we have room in the current batch or
738 * we can start a new batch
739 */
740 if (bgp_batch_is_admitted(&bp_batch, now)) {
741 peer = &peers[peers_idx];
742 if (bgp_peer_init(peer, FUNC_TYPE_BGP)) peer = NULL;
743 else recalc_fds = TRUE;
744
745 log_notification_unset(&log_notifications.bgp_peers_throttling);
746
747 if (bgp_batch_is_enabled(&bp_batch) && peer) {
748 if (bgp_batch_is_expired(&bp_batch, now)) bgp_batch_reset(&bp_batch, now);
749 if (bgp_batch_is_not_empty(&bp_batch)) bgp_batch_decrease_counter(&bp_batch);
750 }
751
752 break;
753 }
754 else { /* throttle */
755 /* We briefly accept the new connection to be able to drop it */
756 if (!log_notification_isset(&log_notifications.bgp_peers_throttling, now)) {
757 Log(LOG_INFO, "INFO ( %s/%s ): throttling at BGP peer #%u\n", config.name, bgp_misc_db->log_str, peers_idx);
758 log_notification_set(&log_notifications.bgp_peers_throttling, now, FALSE);
759 }
760
761 close(fd);
762 goto read_data;
763 }
764 }
765 /* XXX: replenish sessions with expired keepalives */
766 }
767
768 if (!peer) {
769 /* We briefly accept the new connection to be able to drop it */
770 Log(LOG_ERR, "ERROR ( %s/%s ): Insufficient number of BGP peers has been configured by 'bgp_daemon_max_peers' (%d).\n",
771 config.name, bgp_misc_db->log_str, config.bgp_daemon_max_peers);
772
773 close(fd);
774 goto read_data;
775 }
776
777 peer->fd = fd;
778 peer->idx = peers_idx;
779 FD_SET(peer->fd, &bkp_read_descs);
780 sa_to_addr((struct sockaddr *) &client, &peer->addr, &peer->tcp_port);
781
782 if (peers_cache && peers_port_cache) {
783 u_int32_t bucket;
784
785 bucket = addr_hash(&peer->addr, config.bgp_daemon_max_peers);
786 bgp_peer_cache_insert(peers_cache, bucket, peer);
787
788 bucket = addr_port_hash(&peer->addr, peer->tcp_port, config.bgp_daemon_max_peers);
789 bgp_peer_cache_insert(peers_port_cache, bucket, peer);
790 }
791
792 if (bgp_misc_db->msglog_backend_methods)
793 bgp_peer_log_init(peer, config.bgp_daemon_msglog_output, FUNC_TYPE_BGP);
794
795 /* Check: more than one TCP connection from a peer (IP address) */
796 for (peers_check_idx = 0, peers_num = 0; peers_check_idx < config.bgp_daemon_max_peers; peers_check_idx++) {
797 if (peers_idx != peers_check_idx && !memcmp(&peers[peers_check_idx].addr, &peer->addr, sizeof(peers[peers_check_idx].addr))) {
798 int same_peer = FALSE;
799
800 bgp_peer_print(&peers[peers_check_idx], bgp_peer_str, INET6_ADDRSTRLEN);
801
802 /* Check: if not x-connecting, let's see if we have to compare TCP ports
803 (ie. NAT traversal / non-transparent tee scenarios); then evaluate if
804 the new session is valid or has to be rejected */
805 if (!config.bgp_xconnect_map) {
806 if (config.tmp_bgp_lookup_compare_ports) {
807 if (peers[peers_check_idx].tcp_port == peer->tcp_port) same_peer = TRUE;
808 else {
809 same_peer = FALSE;
810 if (peers[peers_check_idx].fd) peers_num++;
811 }
812 }
813 else same_peer = TRUE;
814
815 if (same_peer) {
816 if ((now - peers[peers_check_idx].last_keepalive) > peers[peers_check_idx].ht) {
817 bgp_peer_print(&peers[peers_check_idx], bgp_peer_str, INET6_ADDRSTRLEN);
818 Log(LOG_INFO, "INFO ( %s/%s ): [%s] Replenishing stale connection by peer.\n",
819 config.name, bgp_misc_db->log_str, bgp_peer_str);
820 FD_CLR(peers[peers_check_idx].fd, &bkp_read_descs);
821 bgp_peer_close(&peers[peers_check_idx], FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
822 }
823 else {
824 Log(LOG_WARNING, "WARN ( %s/%s ): [%s] Refusing new connection from existing peer (residual holdtime: %ld).\n",
825 config.name, bgp_misc_db->log_str, bgp_peer_str,
826 (peers[peers_check_idx].ht - ((long)now - peers[peers_check_idx].last_keepalive)));
827 FD_CLR(peer->fd, &bkp_read_descs);
828 bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
829 goto read_data;
830 }
831 }
832 }
833 /* XXX: if x-connecting we don't support NAT traversal / non-transparent tee
834 scenarios (yet?) */
835 else {
836 Log(LOG_WARNING, "WARN ( %s/%s ): [%s] Refusing new incoming connection for existing BGP xconnect.\n",
837 config.name, bgp_misc_db->log_str, bgp_peer_str);
838 FD_CLR(peer->fd, &bkp_read_descs);
839 bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
840 goto read_data;
841 }
842 }
843 else if (peers[peers_check_idx].fd) peers_num++;
844 }
845
846 if (config.bgp_xconnect_map) {
847 bgp_peer_xconnect_init(peer, FUNC_TYPE_BGP);
848
849 if (peer->xconnect_fd) FD_SET(peer->xconnect_fd, &bkp_read_descs);
850 else {
851 FD_CLR(peer->fd, &bkp_read_descs);
852 bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
853 goto read_data;
854 }
855 }
856
857 if (!config.bgp_xconnect_map) {
858 bgp_peer_print(peer, bgp_peer_str, INET6_ADDRSTRLEN);
859 Log(LOG_INFO, "INFO ( %s/%s ): [%s] BGP peers usage: %u/%u\n", config.name, bgp_misc_db->log_str,
860 bgp_peer_str, peers_num, config.bgp_daemon_max_peers);
861 }
862 else {
863 bgp_peer_xconnect_print(peer, bgp_xconnect_peer_str, BGP_XCONNECT_STRLEN);
864 Log(LOG_INFO, "INFO ( %s/%s ): [%s] BGP xconnects usage: %u/%u\n", config.name, bgp_misc_db->log_str,
865 bgp_xconnect_peer_str, peers_num, config.bgp_daemon_max_peers);
866 }
867
868 if (config.bgp_daemon_neighbors_file) write_neighbors_file(config.bgp_daemon_neighbors_file, FUNC_TYPE_BGP);
869 }
870
871 read_data:
872
873 /*
874 We have something coming in: let's lookup which peer is that.
875 FvD: To avoid starvation of the "later established" peers, we
876 offset the start of the search in a round-robin style.
877 */
878 for (peer = NULL, peers_idx = 0; peers_idx < max_peers_idx; peers_idx++) {
879 int loc_idx = (peers_idx + peers_idx_rr) % max_peers_idx;
880 recv_fd = 0; send_fd = 0;
881
882 if (peers[loc_idx].fd && FD_ISSET(peers[loc_idx].fd, &read_descs)) {
883 peer = &peers[loc_idx];
884 recv_fd = peer->fd;
885 if (config.bgp_xconnect_map) send_fd = peer->xconnect_fd;
886 peers_idx_rr = (peers_idx_rr + 1) % max_peers_idx;
887 break;
888 }
889
890 // XXX: verify round-robin fairness holding up
891 if (config.bgp_xconnect_map) {
892 loc_idx = (peers_idx + peers_xconnect_idx_rr) % max_peers_idx;
893
894 if (peers[loc_idx].xconnect_fd && FD_ISSET(peers[loc_idx].xconnect_fd, &read_descs)) {
895 peer = &peers[loc_idx];
896 recv_fd = peer->xconnect_fd;
897 send_fd = peer->fd;
898 peers_xconnect_idx_rr = (peers_xconnect_idx_rr + 1) % max_peers_idx;
899 break;
900 }
901 }
902 }
903
904 if (!peer) goto select_again;
905
906 if (!peer->buf.exp_len) {
907 ret = recv(recv_fd, &peer->buf.base[peer->buf.cur_len], (BGP_HEADER_SIZE - peer->buf.cur_len), 0);
908
909 if (ret > 0) {
910 peer->buf.cur_len += ret;
911
912 if (peer->buf.cur_len == BGP_HEADER_SIZE) {
913 struct bgp_header *bhdr = (struct bgp_header *) peer->buf.base;
914
915 if (bgp_marker_check(bhdr, BGP_MARKER_SIZE) == ERR) {
916 bgp_peer_print(peer, bgp_peer_str, INET6_ADDRSTRLEN);
917 Log(LOG_INFO, "INFO ( %s/%s ): [%s] Received malformed BGP packet (marker check failed).\n",
918 config.name, bgp_misc_db->log_str, bgp_peer_str);
919
920 peer->msglen = 0;
921 peer->buf.cur_len = 0;
922 peer->buf.exp_len = 0;
923 ret = ERR;
924 }
925 else {
926 peer->buf.exp_len = ntohs(bhdr->bgpo_len);
927
928 /* commit */
929 if (peer->buf.cur_len == peer->buf.exp_len) {
930 peer->msglen = peer->buf.exp_len;
931 peer->buf.cur_len = 0;
932 peer->buf.exp_len = 0;
933 }
934 }
935 }
936 else {
937 goto select_again;
938 }
939 }
940 }
941
942 if (peer->buf.exp_len) {
943 ret = recv(recv_fd, &peer->buf.base[peer->buf.cur_len], (peer->buf.exp_len - peer->buf.cur_len), 0);
944
945 if (ret > 0) {
946 peer->buf.cur_len += ret;
947
948 /* commit */
949 if (peer->buf.cur_len == peer->buf.exp_len) {
950 peer->msglen = peer->buf.exp_len;
951 peer->buf.cur_len = 0;
952 peer->buf.exp_len = 0;
953 }
954 else {
955 goto select_again;
956 }
957 }
958 }
959
960 if (ret <= 0) {
961 if (!config.bgp_xconnect_map) {
962 bgp_peer_print(peer, bgp_peer_str, INET6_ADDRSTRLEN);
963 Log(LOG_INFO, "INFO ( %s/%s ): [%s] BGP connection reset by peer (%d).\n", config.name, bgp_misc_db->log_str, bgp_peer_str, errno);
964 FD_CLR(peer->fd, &bkp_read_descs);
965 }
966 else {
967 bgp_peer_xconnect_print(peer, bgp_xconnect_peer_str, BGP_XCONNECT_STRLEN);
968
969 if (recv_fd == peer->fd)
970 Log(LOG_INFO, "INFO ( %s/%s ): [%s] recv(): BGP xconnect reset by src peer (%d).\n",
971 config.name, bgp_misc_db->log_str, bgp_xconnect_peer_str, errno);
972 else if (recv_fd == peer->xconnect_fd)
973 Log(LOG_INFO, "INFO ( %s/%s ): [%s] recv(): BGP xconnect reset by dst peer (%d).\n",
974 config.name, bgp_misc_db->log_str, bgp_xconnect_peer_str, errno);
975
976 FD_CLR(peer->fd, &bkp_read_descs);
977 FD_CLR(peer->xconnect_fd, &bkp_read_descs);
978 }
979
980 bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
981
982 recalc_fds = TRUE;
983 goto select_again;
984 }
985 else {
986 if (!config.bgp_xconnect_map) {
987 /* Appears a valid peer with a valid BGP message: before
988 continuing let's see if it's time to send a KEEPALIVE
989 back */
990 if (peer->status == Established && ((now - peer->last_keepalive) > (peer->ht / 2))) {
991 bgp_reply_pkt_ptr = bgp_reply_pkt;
992 bgp_reply_pkt_ptr += bgp_write_keepalive_msg(bgp_reply_pkt_ptr);
993 ret = send(recv_fd, bgp_reply_pkt, bgp_reply_pkt_ptr - bgp_reply_pkt, 0);
994 peer->last_keepalive = now;
995 }
996
997 ret = bgp_parse_msg(peer, now, TRUE);
998 if (ret) {
999 FD_CLR(recv_fd, &bkp_read_descs);
1000
1001 if (ret < 0) bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
1002 else bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, TRUE, ret, BGP_NOTIFY_SUBCODE_UNSPECIFIC, NULL);
1003
1004 recalc_fds = TRUE;
1005 goto select_again;
1006 }
1007 }
1008 else {
1009 ret = send(send_fd, peer->buf.base, peer->msglen, 0);
1010 if (ret <= 0) {
1011 bgp_peer_xconnect_print(peer, bgp_xconnect_peer_str, BGP_XCONNECT_STRLEN);
1012
1013 if (send_fd == peer->fd)
1014 Log(LOG_INFO, "INFO ( %s/%s ): [%s] send(): BGP xconnect reset by src peer (%d).\n",
1015 config.name, bgp_misc_db->log_str, bgp_xconnect_peer_str, errno);
1016 else if (send_fd == peer->xconnect_fd)
1017 Log(LOG_INFO, "INFO ( %s/%s ): [%s] send(): BGP xconnect reset by dst peer (%d).\n",
1018 config.name, bgp_misc_db->log_str, bgp_xconnect_peer_str, errno);
1019
1020 FD_CLR(peer->fd, &bkp_read_descs);
1021 FD_CLR(peer->xconnect_fd, &bkp_read_descs);
1022
1023 bgp_peer_close(peer, FUNC_TYPE_BGP, FALSE, FALSE, FALSE, FALSE, NULL);
1024
1025 recalc_fds = TRUE;
1026 goto select_again;
1027 }
1028 }
1029 }
1030 }
1031 }
1032
bgp_prepare_thread()1033 void bgp_prepare_thread()
1034 {
1035 bgp_misc_db = &inter_domain_misc_dbs[FUNC_TYPE_BGP];
1036 memset(bgp_misc_db, 0, sizeof(struct bgp_misc_structs));
1037
1038 bgp_misc_db->is_thread = TRUE;
1039 if (config.bgp_lg) bgp_misc_db->has_lglass = TRUE;
1040
1041 if (config.rpki_roas_file || config.rpki_rtr_cache) {
1042 bgp_misc_db->bnv = malloc(sizeof(struct bgp_node_vector));
1043 memset(bgp_misc_db->bnv, 0, sizeof(struct bgp_node_vector));
1044 }
1045
1046 if (config.bgp_blackhole_stdcomm_list) bgp_misc_db->has_blackhole = TRUE;
1047
1048 bgp_misc_db->log_str = malloc(strlen("core/BGP") + 1);
1049 strcpy(bgp_misc_db->log_str, "core/BGP");
1050 }
1051
bgp_prepare_daemon()1052 void bgp_prepare_daemon()
1053 {
1054 bgp_misc_db = &inter_domain_misc_dbs[FUNC_TYPE_BGP];
1055 memset(bgp_misc_db, 0, sizeof(struct bgp_misc_structs));
1056
1057 bgp_misc_db->is_thread = FALSE;
1058 if (config.bgp_lg) bgp_misc_db->has_lglass = TRUE;
1059
1060 if (config.rpki_roas_file || config.rpki_rtr_cache) {
1061 bgp_misc_db->bnv = malloc(sizeof(struct bgp_node_vector));
1062 memset(bgp_misc_db->bnv, 0, sizeof(struct bgp_node_vector));
1063 }
1064
1065 if (config.bgp_blackhole_stdcomm_list) bgp_misc_db->has_blackhole = TRUE;
1066
1067 bgp_misc_db->log_str = malloc(strlen("core") + 1);
1068 strcpy(bgp_misc_db->log_str, "core");
1069 }
1070