xref: /openbsd/usr.sbin/bgpd/rde_peer.c (revision e3db1f63)
1 /*	$OpenBSD: rde_peer.c,v 1.46 2025/01/27 15:22:11 claudio Exp $ */
2 
3 /*
4  * Copyright (c) 2019 Claudio Jeker <claudio@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 #include <sys/types.h>
19 #include <sys/queue.h>
20 
21 #include <stdlib.h>
22 #include <stdio.h>
23 #include <string.h>
24 #include <unistd.h>
25 
26 #include "bgpd.h"
27 #include "rde.h"
28 
29 struct peer_tree	 peertable = RB_INITIALIZER(&peertable);
30 struct peer_tree	 zombietable = RB_INITIALIZER(&zombietable);
31 struct rde_peer		*peerself;
32 static long		 imsg_pending;
33 
34 CTASSERT(sizeof(peerself->recv_eor) * 8 >= AID_MAX);
35 CTASSERT(sizeof(peerself->sent_eor) * 8 >= AID_MAX);
36 
37 struct iq {
38 	SIMPLEQ_ENTRY(iq)	entry;
39 	struct imsg		imsg;
40 };
41 
42 int
peer_has_as4byte(struct rde_peer * peer)43 peer_has_as4byte(struct rde_peer *peer)
44 {
45 	return peer->capa.as4byte;
46 }
47 
48 /*
49  * Check if ADD_PATH is enabled for aid and mode (rx / tx). If aid is
50  * AID_UNSPEC then the function returns true if any aid has mode enabled.
51  */
52 int
peer_has_add_path(struct rde_peer * peer,uint8_t aid,int mode)53 peer_has_add_path(struct rde_peer *peer, uint8_t aid, int mode)
54 {
55 	if (aid >= AID_MAX)
56 		return 0;
57 	return peer->capa.add_path[aid] & mode;
58 }
59 
60 int
peer_has_ext_msg(struct rde_peer * peer)61 peer_has_ext_msg(struct rde_peer *peer)
62 {
63 	return peer->capa.ext_msg;
64 }
65 
66 int
peer_has_ext_nexthop(struct rde_peer * peer,uint8_t aid)67 peer_has_ext_nexthop(struct rde_peer *peer, uint8_t aid)
68 {
69 	if (aid >= AID_MAX)
70 		return 0;
71 	return peer->capa.ext_nh[aid];
72 }
73 
74 int
peer_permit_as_set(struct rde_peer * peer)75 peer_permit_as_set(struct rde_peer *peer)
76 {
77 	return peer->flags & PEERFLAG_PERMIT_AS_SET;
78 }
79 
80 void
peer_init(struct filter_head * rules)81 peer_init(struct filter_head *rules)
82 {
83 	struct peer_config pc;
84 
85 	memset(&pc, 0, sizeof(pc));
86 	snprintf(pc.descr, sizeof(pc.descr), "LOCAL");
87 	pc.id = PEER_ID_SELF;
88 
89 	peerself = peer_add(PEER_ID_SELF, &pc, rules);
90 	peerself->state = PEER_UP;
91 }
92 
93 void
peer_shutdown(void)94 peer_shutdown(void)
95 {
96 	struct rde_peer *peer, *np;
97 
98 	RB_FOREACH_SAFE(peer, peer_tree, &peertable, np)
99 		peer_delete(peer);
100 
101 	while (!RB_EMPTY(&zombietable))
102 		peer_reaper(NULL);
103 
104 	if (!RB_EMPTY(&peertable))
105 		log_warnx("%s: free non-free table", __func__);
106 }
107 
108 /*
109  * Traverse all peers calling callback for each peer.
110  */
111 void
peer_foreach(void (* callback)(struct rde_peer *,void *),void * arg)112 peer_foreach(void (*callback)(struct rde_peer *, void *), void *arg)
113 {
114 	struct rde_peer *peer, *np;
115 
116 	RB_FOREACH_SAFE(peer, peer_tree, &peertable, np)
117 		callback(peer, arg);
118 }
119 
120 /*
121  * Lookup a peer by peer_id, return NULL if not found.
122  */
123 struct rde_peer *
peer_get(uint32_t id)124 peer_get(uint32_t id)
125 {
126 	struct rde_peer	needle;
127 
128 	needle.conf.id = id;
129 	return RB_FIND(peer_tree, &peertable, &needle);
130 }
131 
132 /*
133  * Find next peer that matches neighbor options in *n.
134  * If peerid was set then pickup the lookup after that peer.
135  * Returns NULL if no more peers match.
136  */
137 struct rde_peer *
peer_match(struct ctl_neighbor * n,uint32_t peerid)138 peer_match(struct ctl_neighbor *n, uint32_t peerid)
139 {
140 	struct rde_peer		*peer;
141 
142 	if (peerid != 0) {
143 		peer = peer_get(peerid);
144 		if (peer)
145 			peer = RB_NEXT(peer_tree, &peertable, peer);
146 	} else
147 		peer = RB_MIN(peer_tree, &peertable);
148 
149 	for (; peer != NULL; peer = RB_NEXT(peer_tree, &peertable, peer)) {
150 		if (rde_match_peer(peer, n))
151 			return peer;
152 	}
153 	return NULL;
154 }
155 
156 struct rde_peer *
peer_add(uint32_t id,struct peer_config * p_conf,struct filter_head * rules)157 peer_add(uint32_t id, struct peer_config *p_conf, struct filter_head *rules)
158 {
159 	struct rde_peer		*peer;
160 	int			 conflict;
161 
162 	if ((peer = peer_get(id))) {
163 		memcpy(&peer->conf, p_conf, sizeof(struct peer_config));
164 		return peer;
165 	}
166 
167 	peer = calloc(1, sizeof(struct rde_peer));
168 	if (peer == NULL)
169 		fatal("peer_add");
170 
171 	memcpy(&peer->conf, p_conf, sizeof(struct peer_config));
172 	peer->remote_bgpid = 0;
173 	peer->loc_rib_id = rib_find(peer->conf.rib);
174 	if (peer->loc_rib_id == RIB_NOTFOUND)
175 		fatalx("King Bula's new peer met an unknown RIB");
176 	peer->state = PEER_NONE;
177 	peer->eval = peer->conf.eval;
178 	peer->role = peer->conf.role;
179 	peer->export_type = peer->conf.export_type;
180 	peer->flags = peer->conf.flags;
181 	SIMPLEQ_INIT(&peer->imsg_queue);
182 
183 	peer_apply_out_filter(peer, rules);
184 
185 	/*
186 	 * Assign an even random unique transmit path id.
187 	 * Odd path_id_tx numbers are for peers using add-path recv.
188 	 */
189 	do {
190 		struct rde_peer *p;
191 
192 		conflict = 0;
193 		peer->path_id_tx = arc4random() << 1;
194 		RB_FOREACH(p, peer_tree, &peertable) {
195 			if (p->path_id_tx == peer->path_id_tx) {
196 				conflict = 1;
197 				break;
198 			}
199 		}
200 	} while (conflict);
201 
202 	if (RB_INSERT(peer_tree, &peertable, peer) != NULL)
203 		fatalx("rde peer table corrupted");
204 
205 	return peer;
206 }
207 
208 struct filter_head *
peer_apply_out_filter(struct rde_peer * peer,struct filter_head * rules)209 peer_apply_out_filter(struct rde_peer *peer, struct filter_head *rules)
210 {
211 	struct filter_head *old;
212 	struct filter_rule *fr, *new;
213 
214 	old = peer->out_rules;
215 	if ((peer->out_rules = malloc(sizeof(*peer->out_rules))) == NULL)
216 		fatal(NULL);
217 	TAILQ_INIT(peer->out_rules);
218 
219 	TAILQ_FOREACH(fr, rules, entry) {
220 		if (rde_filter_skip_rule(peer, fr))
221 			continue;
222 
223 		if ((new = malloc(sizeof(*new))) == NULL)
224 			fatal(NULL);
225 		memcpy(new, fr, sizeof(*new));
226 		filterset_copy(&fr->set, &new->set);
227 
228 		TAILQ_INSERT_TAIL(peer->out_rules, new, entry);
229 	}
230 
231 	return old;
232 }
233 
234 static inline int
peer_cmp(struct rde_peer * a,struct rde_peer * b)235 peer_cmp(struct rde_peer *a, struct rde_peer *b)
236 {
237 	if (a->conf.id > b->conf.id)
238 		return 1;
239 	if (a->conf.id < b->conf.id)
240 		return -1;
241 	return 0;
242 }
243 
244 RB_GENERATE(peer_tree, rde_peer, entry, peer_cmp);
245 
246 static void
peer_generate_update(struct rde_peer * peer,struct rib_entry * re,struct prefix * newpath,struct prefix * oldpath,enum eval_mode mode)247 peer_generate_update(struct rde_peer *peer, struct rib_entry *re,
248     struct prefix *newpath, struct prefix *oldpath,
249     enum eval_mode mode)
250 {
251 	uint8_t		 aid;
252 
253 	aid = re->prefix->aid;
254 
255 	/* skip ourself */
256 	if (peer == peerself)
257 		return;
258 	/* skip peers that never had a session open */
259 	if (peer->state == PEER_NONE)
260 		return;
261 	/* skip peers using a different rib */
262 	if (peer->loc_rib_id != re->rib_id)
263 		return;
264 	/* check if peer actually supports the address family */
265 	if (peer->capa.mp[aid] == 0)
266 		return;
267 	/* skip peers with special export types */
268 	if (peer->export_type == EXPORT_NONE ||
269 	    peer->export_type == EXPORT_DEFAULT_ROUTE)
270 		return;
271 
272 	/* if reconf skip peers which don't need to reconfigure */
273 	if (mode == EVAL_RECONF && peer->reconf_out == 0)
274 		return;
275 
276 	/* handle peers with add-path */
277 	if (peer_has_add_path(peer, aid, CAPA_AP_SEND)) {
278 		if (peer->eval.mode == ADDPATH_EVAL_ALL)
279 			up_generate_addpath_all(peer, re, newpath, oldpath);
280 		else
281 			up_generate_addpath(peer, re);
282 		return;
283 	}
284 
285 	/* skip regular peers if the best path didn't change */
286 	if (mode == EVAL_ALL && (peer->flags & PEERFLAG_EVALUATE_ALL) == 0)
287 		return;
288 	up_generate_updates(peer, re);
289 }
290 
291 void
rde_generate_updates(struct rib_entry * re,struct prefix * newpath,struct prefix * oldpath,enum eval_mode mode)292 rde_generate_updates(struct rib_entry *re, struct prefix *newpath,
293     struct prefix *oldpath, enum eval_mode mode)
294 {
295 	struct rde_peer	*peer;
296 
297 	RB_FOREACH(peer, peer_tree, &peertable)
298 		peer_generate_update(peer, re, newpath, oldpath, mode);
299 }
300 
301 /*
302  * Various RIB walker callbacks.
303  */
304 struct peer_flush {
305 	struct rde_peer *peer;
306 	time_t		 staletime;
307 };
308 
309 static void
peer_flush_upcall(struct rib_entry * re,void * arg)310 peer_flush_upcall(struct rib_entry *re, void *arg)
311 {
312 	struct rde_peer *peer = ((struct peer_flush *)arg)->peer;
313 	struct rde_aspath *asp;
314 	struct bgpd_addr addr;
315 	struct prefix *p, *np, *rp;
316 	time_t staletime = ((struct peer_flush *)arg)->staletime;
317 	uint32_t i;
318 	uint8_t prefixlen;
319 
320 	pt_getaddr(re->prefix, &addr);
321 	prefixlen = re->prefix->prefixlen;
322 	TAILQ_FOREACH_SAFE(p, &re->prefix_h, entry.list.rib, np) {
323 		if (peer != prefix_peer(p))
324 			continue;
325 		if (staletime && p->lastchange > staletime)
326 			continue;
327 
328 		for (i = RIB_LOC_START; i < rib_size; i++) {
329 			struct rib *rib = rib_byid(i);
330 			if (rib == NULL)
331 				continue;
332 			rp = prefix_get(rib, peer, p->path_id,
333 			    &addr, prefixlen);
334 			if (rp) {
335 				asp = prefix_aspath(rp);
336 				if (asp && asp->pftableid)
337 					rde_pftable_del(asp->pftableid, rp);
338 
339 				prefix_destroy(rp);
340 				rde_update_log("flush", i, peer, NULL,
341 				    &addr, prefixlen);
342 			}
343 		}
344 
345 		prefix_destroy(p);
346 		peer->stats.prefix_cnt--;
347 	}
348 }
349 
350 /*
351  * Session got established, bring peer up, load RIBs do initial table dump.
352  */
353 void
peer_up(struct rde_peer * peer,struct session_up * sup)354 peer_up(struct rde_peer *peer, struct session_up *sup)
355 {
356 	uint8_t	 i;
357 	int force_sync = 1;
358 
359 	if (peer->state == PEER_ERR) {
360 		/*
361 		 * There is a race condition when doing PEER_ERR -> PEER_DOWN.
362 		 * So just do a full reset of the peer here.
363 		 */
364 		rib_dump_terminate(peer);
365 		peer_imsg_flush(peer);
366 		peer_flush(peer, AID_UNSPEC, 0);
367 		peer->stats.prefix_cnt = 0;
368 		peer->state = PEER_DOWN;
369 	}
370 
371 	/*
372 	 * Check if no value changed during flap to decide if the RIB
373 	 * is in sync. The capa check is maybe too strict but it should
374 	 * not matter for normal operation.
375 	 */
376 	if (memcmp(&peer->remote_addr, &sup->remote_addr,
377 	    sizeof(sup->remote_addr)) == 0 &&
378 	    memcmp(&peer->local_v4_addr, &sup->local_v4_addr,
379 	    sizeof(sup->local_v4_addr)) == 0 &&
380 	    memcmp(&peer->local_v6_addr, &sup->local_v6_addr,
381 	    sizeof(sup->local_v6_addr)) == 0 &&
382 	    memcmp(&peer->capa, &sup->capa, sizeof(sup->capa)) == 0)
383 		force_sync = 0;
384 
385 	peer->remote_addr = sup->remote_addr;
386 	peer->local_v4_addr = sup->local_v4_addr;
387 	peer->local_v6_addr = sup->local_v6_addr;
388 	memcpy(&peer->capa, &sup->capa, sizeof(sup->capa));
389 	/* the Adj-RIB-Out does not depend on those */
390 	peer->remote_bgpid = sup->remote_bgpid;
391 	peer->local_if_scope = sup->if_scope;
392 	peer->short_as = sup->short_as;
393 
394 	/* clear eor markers depending on GR flags */
395 	if (peer->capa.grestart.restart) {
396 		peer->sent_eor = 0;
397 		peer->recv_eor = 0;
398 	} else {
399 		/* no EOR expected */
400 		peer->sent_eor = ~0;
401 		peer->recv_eor = ~0;
402 	}
403 	peer->state = PEER_UP;
404 
405 	if (!force_sync) {
406 		for (i = AID_MIN; i < AID_MAX; i++) {
407 			if (peer->capa.mp[i])
408 				peer_blast(peer, i);
409 		}
410 	} else {
411 		for (i = AID_MIN; i < AID_MAX; i++) {
412 			if (peer->capa.mp[i])
413 				peer_dump(peer, i);
414 		}
415 	}
416 }
417 
418 /*
419  * Session dropped and no graceful restart is done. Stop everything for
420  * this peer and clean up.
421  */
422 void
peer_down(struct rde_peer * peer)423 peer_down(struct rde_peer *peer)
424 {
425 	peer->remote_bgpid = 0;
426 	peer->state = PEER_DOWN;
427 	/*
428 	 * stop all pending dumps which may depend on this peer
429 	 * and flush all pending imsg from the SE.
430 	 */
431 	rib_dump_terminate(peer);
432 	prefix_adjout_flush_pending(peer);
433 	peer_imsg_flush(peer);
434 
435 	/* flush Adj-RIB-In */
436 	peer_flush(peer, AID_UNSPEC, 0);
437 	peer->stats.prefix_cnt = 0;
438 }
439 
440 void
peer_delete(struct rde_peer * peer)441 peer_delete(struct rde_peer *peer)
442 {
443 	if (peer->state != PEER_DOWN)
444 		peer_down(peer);
445 
446 	/* free filters */
447 	filterlist_free(peer->out_rules);
448 
449 	RB_REMOVE(peer_tree, &peertable, peer);
450 	while (RB_INSERT(peer_tree, &zombietable, peer) != NULL) {
451 		log_warnx("zombie peer conflict");
452 		peer->conf.id = arc4random();
453 	}
454 
455 	/* start reaping the zombie */
456 	peer_reaper(peer);
457 }
458 
459 /*
460  * Flush all routes older then staletime. If staletime is 0 all routes will
461  * be flushed.
462  */
463 void
peer_flush(struct rde_peer * peer,uint8_t aid,time_t staletime)464 peer_flush(struct rde_peer *peer, uint8_t aid, time_t staletime)
465 {
466 	struct peer_flush pf = { peer, staletime };
467 
468 	/* this dump must run synchronous, too much depends on that right now */
469 	if (rib_dump_new(RIB_ADJ_IN, aid, 0, &pf, peer_flush_upcall,
470 	    NULL, NULL) == -1)
471 		fatal("%s: rib_dump_new", __func__);
472 
473 	/* every route is gone so reset staletime */
474 	if (aid == AID_UNSPEC) {
475 		uint8_t i;
476 		for (i = AID_MIN; i < AID_MAX; i++)
477 			peer->staletime[i] = 0;
478 	} else {
479 		peer->staletime[aid] = 0;
480 	}
481 }
482 
483 /*
484  * During graceful restart mark a peer as stale if the session goes down.
485  * For the specified AID the Adj-RIB-Out is marked stale and the staletime
486  * is set to the current timestamp for identifying stale routes in Adj-RIB-In.
487  */
488 void
peer_stale(struct rde_peer * peer,uint8_t aid,int flushall)489 peer_stale(struct rde_peer *peer, uint8_t aid, int flushall)
490 {
491 	time_t now;
492 
493 	/* flush the now even staler routes out */
494 	if (peer->staletime[aid])
495 		peer_flush(peer, aid, peer->staletime[aid]);
496 
497 	peer->staletime[aid] = now = getmonotime();
498 	peer->state = PEER_DOWN;
499 
500 	/*
501 	 * stop all pending dumps which may depend on this peer
502 	 * and flush all pending imsg from the SE.
503 	 */
504 	rib_dump_terminate(peer);
505 	prefix_adjout_flush_pending(peer);
506 	peer_imsg_flush(peer);
507 
508 	if (flushall)
509 		peer_flush(peer, aid, 0);
510 
511 	/* make sure new prefixes start on a higher timestamp */
512 	while (now >= getmonotime())
513 		sleep(1);
514 }
515 
516 /*
517  * RIB walker callback for peer_blast.
518  * Enqueue a prefix onto the update queue so it can be sent out.
519  */
520 static void
peer_blast_upcall(struct prefix * p,void * ptr)521 peer_blast_upcall(struct prefix *p, void *ptr)
522 {
523 	if (p->flags & PREFIX_FLAG_DEAD) {
524 		/* ignore dead prefixes, they will go away soon */
525 	} else if ((p->flags & PREFIX_FLAG_MASK) == 0) {
526 		/* put entries on the update queue if not already on a queue */
527 		p->flags |= PREFIX_FLAG_UPDATE;
528 		if (RB_INSERT(prefix_tree, &prefix_peer(p)->updates[p->pt->aid],
529 		    p) != NULL)
530 			fatalx("%s: RB tree invariant violated", __func__);
531 	}
532 }
533 
534 /*
535  * Called after all prefixes are put onto the update queue and we are
536  * ready to blast out updates to the peer.
537  */
538 static void
peer_blast_done(void * ptr,uint8_t aid)539 peer_blast_done(void *ptr, uint8_t aid)
540 {
541 	struct rde_peer		*peer = ptr;
542 
543 	/* Adj-RIB-Out ready, unthrottle peer and inject EOR */
544 	peer->throttled = 0;
545 	if (peer->capa.grestart.restart)
546 		prefix_add_eor(peer, aid);
547 }
548 
549 /*
550  * Send out the full Adj-RIB-Out by putting all prefixes onto the update
551  * queue.
552  */
553 void
peer_blast(struct rde_peer * peer,uint8_t aid)554 peer_blast(struct rde_peer *peer, uint8_t aid)
555 {
556 	if (peer->capa.enhanced_rr && (peer->sent_eor & (1 << aid)))
557 		rde_peer_send_rrefresh(peer, aid, ROUTE_REFRESH_BEGIN_RR);
558 
559 	/* force out all updates from the Adj-RIB-Out */
560 	if (prefix_dump_new(peer, aid, 0, peer, peer_blast_upcall,
561 	    peer_blast_done, NULL) == -1)
562 		fatal("%s: prefix_dump_new", __func__);
563 }
564 
565 /* RIB walker callbacks for peer_dump. */
566 static void
peer_dump_upcall(struct rib_entry * re,void * ptr)567 peer_dump_upcall(struct rib_entry *re, void *ptr)
568 {
569 	struct rde_peer		*peer = ptr;
570 	struct prefix		*p;
571 
572 	if ((p = prefix_best(re)) == NULL)
573 		/* no eligible prefix, not even for 'evaluate all' */
574 		return;
575 
576 	peer_generate_update(peer, re, NULL, NULL, 0);
577 }
578 
579 static void
peer_dump_done(void * ptr,uint8_t aid)580 peer_dump_done(void *ptr, uint8_t aid)
581 {
582 	struct rde_peer		*peer = ptr;
583 
584 	/* Adj-RIB-Out is ready, blast it out */
585 	peer_blast(peer, aid);
586 }
587 
588 /*
589  * Load the Adj-RIB-Out of a peer normally called when a session comes up
590  * for the first time. Once the Adj-RIB-Out is ready it will blast the
591  * updates out.
592  */
593 void
peer_dump(struct rde_peer * peer,uint8_t aid)594 peer_dump(struct rde_peer *peer, uint8_t aid)
595 {
596 	/* throttle peer until dump is done */
597 	peer->throttled = 1;
598 
599 	if (peer->export_type == EXPORT_NONE) {
600 		peer_blast(peer, aid);
601 	} else if (peer->export_type == EXPORT_DEFAULT_ROUTE) {
602 		up_generate_default(peer, aid);
603 		peer_blast(peer, aid);
604 	} else if (aid == AID_FLOWSPECv4 || aid == AID_FLOWSPECv6) {
605 		prefix_flowspec_dump(aid, peer, peer_dump_upcall,
606 		    peer_dump_done);
607 	} else {
608 		if (rib_dump_new(peer->loc_rib_id, aid, RDE_RUNNER_ROUNDS, peer,
609 		    peer_dump_upcall, peer_dump_done, NULL) == -1)
610 			fatal("%s: rib_dump_new", __func__);
611 	}
612 }
613 
614 /*
615  * Start of an enhanced route refresh. Mark all routes as stale.
616  * Once the route refresh ends a End of Route Refresh message is sent
617  * which calls peer_flush() to remove all stale routes.
618  */
619 void
peer_begin_rrefresh(struct rde_peer * peer,uint8_t aid)620 peer_begin_rrefresh(struct rde_peer *peer, uint8_t aid)
621 {
622 	time_t now;
623 
624 	/* flush the now even staler routes out */
625 	if (peer->staletime[aid])
626 		peer_flush(peer, aid, peer->staletime[aid]);
627 
628 	peer->staletime[aid] = now = getmonotime();
629 
630 	/* make sure new prefixes start on a higher timestamp */
631 	while (now >= getmonotime())
632 		sleep(1);
633 }
634 
635 void
peer_reaper(struct rde_peer * peer)636 peer_reaper(struct rde_peer *peer)
637 {
638 	if (peer == NULL)
639 		peer = RB_ROOT(&zombietable);
640 	if (peer == NULL)
641 		return;
642 
643 	if (!prefix_adjout_reaper(peer))
644 		return;
645 
646 	RB_REMOVE(peer_tree, &zombietable, peer);
647 	free(peer);
648 }
649 
650 /*
651  * Check if any imsg are pending or any zombie peers are around.
652  * Return 0 if no work is pending.
653  */
654 int
peer_work_pending(void)655 peer_work_pending(void)
656 {
657 	if (!RB_EMPTY(&zombietable))
658 		return 1;
659 	return imsg_pending != 0;
660 }
661 
662 /*
663  * move an imsg from src to dst, disconnecting any dynamic memory from src.
664  */
665 static void
imsg_move(struct imsg * dst,struct imsg * src)666 imsg_move(struct imsg *dst, struct imsg *src)
667 {
668 	*dst = *src;
669 	memset(src, 0, sizeof(*src));
670 }
671 
672 /*
673  * push an imsg onto the peer imsg queue.
674  */
675 void
peer_imsg_push(struct rde_peer * peer,struct imsg * imsg)676 peer_imsg_push(struct rde_peer *peer, struct imsg *imsg)
677 {
678 	struct iq *iq;
679 
680 	if ((iq = calloc(1, sizeof(*iq))) == NULL)
681 		fatal(NULL);
682 	imsg_move(&iq->imsg, imsg);
683 	SIMPLEQ_INSERT_TAIL(&peer->imsg_queue, iq, entry);
684 	imsg_pending++;
685 }
686 
687 /*
688  * pop first imsg from peer imsg queue and move it into imsg argument.
689  * Returns 1 if an element is returned else 0.
690  */
691 int
peer_imsg_pop(struct rde_peer * peer,struct imsg * imsg)692 peer_imsg_pop(struct rde_peer *peer, struct imsg *imsg)
693 {
694 	struct iq *iq;
695 
696 	iq = SIMPLEQ_FIRST(&peer->imsg_queue);
697 	if (iq == NULL)
698 		return 0;
699 
700 	imsg_move(imsg, &iq->imsg);
701 
702 	SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
703 	free(iq);
704 	imsg_pending--;
705 
706 	return 1;
707 }
708 
709 /*
710  * flush all imsg queued for a peer.
711  */
712 void
peer_imsg_flush(struct rde_peer * peer)713 peer_imsg_flush(struct rde_peer *peer)
714 {
715 	struct iq *iq;
716 
717 	while ((iq = SIMPLEQ_FIRST(&peer->imsg_queue)) != NULL) {
718 		SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
719 		free(iq);
720 		imsg_pending--;
721 	}
722 }
723