xref: /openbsd/usr.sbin/bgpd/rde_peer.c (revision bcd6516b)
1 /*	$OpenBSD: rde_peer.c,v 1.37 2024/05/22 08:41:14 claudio Exp $ */
2 
3 /*
4  * Copyright (c) 2019 Claudio Jeker <claudio@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 #include <sys/types.h>
19 #include <sys/queue.h>
20 
21 #include <stdlib.h>
22 #include <stdio.h>
23 #include <string.h>
24 #include <unistd.h>
25 
26 #include "bgpd.h"
27 #include "rde.h"
28 
29 struct peer_tree	 peertable;
30 struct rde_peer		*peerself;
31 static long		 imsg_pending;
32 
33 CTASSERT(sizeof(peerself->recv_eor) * 8 > AID_MAX);
34 CTASSERT(sizeof(peerself->sent_eor) * 8 > AID_MAX);
35 
36 struct iq {
37 	SIMPLEQ_ENTRY(iq)	entry;
38 	struct imsg		imsg;
39 };
40 
41 int
peer_has_as4byte(struct rde_peer * peer)42 peer_has_as4byte(struct rde_peer *peer)
43 {
44 	return (peer->capa.as4byte);
45 }
46 
47 /*
48  * Check if ADD_PATH is enabled for aid and mode (rx / tx). If aid is
49  * AID_UNSPEC then the function returns true if any aid has mode enabled.
50  */
51 int
peer_has_add_path(struct rde_peer * peer,uint8_t aid,int mode)52 peer_has_add_path(struct rde_peer *peer, uint8_t aid, int mode)
53 {
54 	if (aid >= AID_MAX)
55 		return 0;
56 	return (peer->capa.add_path[aid] & mode);
57 }
58 
59 int
peer_accept_no_as_set(struct rde_peer * peer)60 peer_accept_no_as_set(struct rde_peer *peer)
61 {
62 	return (peer->flags & PEERFLAG_NO_AS_SET);
63 }
64 
65 void
peer_init(struct filter_head * rules)66 peer_init(struct filter_head *rules)
67 {
68 	struct peer_config pc;
69 
70 	RB_INIT(&peertable);
71 
72 	memset(&pc, 0, sizeof(pc));
73 	snprintf(pc.descr, sizeof(pc.descr), "LOCAL");
74 	pc.id = PEER_ID_SELF;
75 
76 	peerself = peer_add(PEER_ID_SELF, &pc, rules);
77 	peerself->state = PEER_UP;
78 }
79 
80 void
peer_shutdown(void)81 peer_shutdown(void)
82 {
83 	if (!RB_EMPTY(&peertable))
84 		log_warnx("%s: free non-free table", __func__);
85 }
86 
87 /*
88  * Traverse all peers calling callback for each peer.
89  */
90 void
peer_foreach(void (* callback)(struct rde_peer *,void *),void * arg)91 peer_foreach(void (*callback)(struct rde_peer *, void *), void *arg)
92 {
93 	struct rde_peer *peer, *np;
94 
95 	RB_FOREACH_SAFE(peer, peer_tree, &peertable, np)
96 		callback(peer, arg);
97 }
98 
99 /*
100  * Lookup a peer by peer_id, return NULL if not found.
101  */
102 struct rde_peer *
peer_get(uint32_t id)103 peer_get(uint32_t id)
104 {
105 	struct rde_peer	needle;
106 
107 	needle.conf.id = id;
108 	return RB_FIND(peer_tree, &peertable, &needle);
109 }
110 
111 /*
112  * Find next peer that matches neighbor options in *n.
113  * If peerid was set then pickup the lookup after that peer.
114  * Returns NULL if no more peers match.
115  */
116 struct rde_peer *
peer_match(struct ctl_neighbor * n,uint32_t peerid)117 peer_match(struct ctl_neighbor *n, uint32_t peerid)
118 {
119 	struct rde_peer		*peer;
120 
121 	if (peerid != 0) {
122 		peer = peer_get(peerid);
123 		if (peer)
124 			peer = RB_NEXT(peer_tree, &peertable, peer);
125 	} else
126 		peer = RB_MIN(peer_tree, &peertable);
127 
128 	for (; peer != NULL; peer = RB_NEXT(peer_tree, &peertable, peer)) {
129 		if (rde_match_peer(peer, n))
130 			return peer;
131 	}
132 	return NULL;
133 }
134 
135 struct rde_peer *
peer_add(uint32_t id,struct peer_config * p_conf,struct filter_head * rules)136 peer_add(uint32_t id, struct peer_config *p_conf, struct filter_head *rules)
137 {
138 	struct rde_peer		*peer;
139 	int			 conflict;
140 
141 	if ((peer = peer_get(id))) {
142 		memcpy(&peer->conf, p_conf, sizeof(struct peer_config));
143 		return (peer);
144 	}
145 
146 	peer = calloc(1, sizeof(struct rde_peer));
147 	if (peer == NULL)
148 		fatal("peer_add");
149 
150 	memcpy(&peer->conf, p_conf, sizeof(struct peer_config));
151 	peer->remote_bgpid = 0;
152 	peer->loc_rib_id = rib_find(peer->conf.rib);
153 	if (peer->loc_rib_id == RIB_NOTFOUND)
154 		fatalx("King Bula's new peer met an unknown RIB");
155 	peer->state = PEER_NONE;
156 	peer->eval = peer->conf.eval;
157 	peer->role = peer->conf.role;
158 	peer->export_type = peer->conf.export_type;
159 	peer->flags = peer->conf.flags;
160 	SIMPLEQ_INIT(&peer->imsg_queue);
161 
162 	peer_apply_out_filter(peer, rules);
163 
164 	/*
165 	 * Assign an even random unique transmit path id.
166 	 * Odd path_id_tx numbers are for peers using add-path recv.
167 	 */
168 	do {
169 		struct rde_peer *p;
170 
171 		conflict = 0;
172 		peer->path_id_tx = arc4random() << 1;
173 		RB_FOREACH(p, peer_tree, &peertable) {
174 			if (p->path_id_tx == peer->path_id_tx) {
175 				conflict = 1;
176 				break;
177 			}
178 		}
179 	} while (conflict);
180 
181 	if (RB_INSERT(peer_tree, &peertable, peer) != NULL)
182 		fatalx("rde peer table corrupted");
183 
184 	return (peer);
185 }
186 
187 struct filter_head *
peer_apply_out_filter(struct rde_peer * peer,struct filter_head * rules)188 peer_apply_out_filter(struct rde_peer *peer, struct filter_head *rules)
189 {
190 	struct filter_head *old;
191 	struct filter_rule *fr, *new;
192 
193 	old = peer->out_rules;
194 	if ((peer->out_rules = malloc(sizeof(*peer->out_rules))) == NULL)
195 		fatal(NULL);
196 	TAILQ_INIT(peer->out_rules);
197 
198 	TAILQ_FOREACH(fr, rules, entry) {
199 		if (rde_filter_skip_rule(peer, fr))
200 			continue;
201 
202 		if ((new = malloc(sizeof(*new))) == NULL)
203 			fatal(NULL);
204 		memcpy(new, fr, sizeof(*new));
205 		filterset_copy(&fr->set, &new->set);
206 
207 		TAILQ_INSERT_TAIL(peer->out_rules, new, entry);
208 	}
209 
210 	return old;
211 }
212 
213 static inline int
peer_cmp(struct rde_peer * a,struct rde_peer * b)214 peer_cmp(struct rde_peer *a, struct rde_peer *b)
215 {
216 	if (a->conf.id > b->conf.id)
217 		return 1;
218 	if (a->conf.id < b->conf.id)
219 		return -1;
220 	return 0;
221 }
222 
223 RB_GENERATE(peer_tree, rde_peer, entry, peer_cmp);
224 
225 static void
peer_generate_update(struct rde_peer * peer,struct rib_entry * re,struct prefix * newpath,struct prefix * oldpath,enum eval_mode mode)226 peer_generate_update(struct rde_peer *peer, struct rib_entry *re,
227     struct prefix *newpath, struct prefix *oldpath,
228     enum eval_mode mode)
229 {
230 	uint8_t		 aid;
231 
232 	aid = re->prefix->aid;
233 
234 	/* skip ourself */
235 	if (peer == peerself)
236 		return;
237 	if (peer->state != PEER_UP)
238 		return;
239 	/* skip peers using a different rib */
240 	if (peer->loc_rib_id != re->rib_id)
241 		return;
242 	/* check if peer actually supports the address family */
243 	if (peer->capa.mp[aid] == 0)
244 		return;
245 	/* skip peers with special export types */
246 	if (peer->export_type == EXPORT_NONE ||
247 	    peer->export_type == EXPORT_DEFAULT_ROUTE)
248 		return;
249 
250 	/* if reconf skip peers which don't need to reconfigure */
251 	if (mode == EVAL_RECONF && peer->reconf_out == 0)
252 		return;
253 
254 	/* handle peers with add-path */
255 	if (peer_has_add_path(peer, aid, CAPA_AP_SEND)) {
256 		if (peer->eval.mode == ADDPATH_EVAL_ALL)
257 			up_generate_addpath_all(peer, re, newpath, oldpath);
258 		else
259 			up_generate_addpath(peer, re);
260 		return;
261 	}
262 
263 	/* skip regular peers if the best path didn't change */
264 	if (mode == EVAL_ALL && (peer->flags & PEERFLAG_EVALUATE_ALL) == 0)
265 		return;
266 	up_generate_updates(peer, re);
267 }
268 
269 void
rde_generate_updates(struct rib_entry * re,struct prefix * newpath,struct prefix * oldpath,enum eval_mode mode)270 rde_generate_updates(struct rib_entry *re, struct prefix *newpath,
271     struct prefix *oldpath, enum eval_mode mode)
272 {
273 	struct rde_peer	*peer;
274 
275 	RB_FOREACH(peer, peer_tree, &peertable)
276 		peer_generate_update(peer, re, newpath, oldpath, mode);
277 }
278 
279 /*
280  * Various RIB walker callbacks.
281  */
282 static void
peer_adjout_clear_upcall(struct prefix * p,void * arg)283 peer_adjout_clear_upcall(struct prefix *p, void *arg)
284 {
285 	prefix_adjout_destroy(p);
286 }
287 
288 static void
peer_adjout_stale_upcall(struct prefix * p,void * arg)289 peer_adjout_stale_upcall(struct prefix *p, void *arg)
290 {
291 	if (p->flags & PREFIX_FLAG_DEAD) {
292 		return;
293 	} else if (p->flags & PREFIX_FLAG_WITHDRAW) {
294 		/* no need to keep stale withdraws, they miss all attributes */
295 		prefix_adjout_destroy(p);
296 		return;
297 	} else if (p->flags & PREFIX_FLAG_UPDATE) {
298 		RB_REMOVE(prefix_tree, &prefix_peer(p)->updates[p->pt->aid], p);
299 		p->flags &= ~PREFIX_FLAG_UPDATE;
300 	}
301 	p->flags |= PREFIX_FLAG_STALE;
302 }
303 
304 struct peer_flush {
305 	struct rde_peer *peer;
306 	time_t		 staletime;
307 };
308 
309 static void
peer_flush_upcall(struct rib_entry * re,void * arg)310 peer_flush_upcall(struct rib_entry *re, void *arg)
311 {
312 	struct rde_peer *peer = ((struct peer_flush *)arg)->peer;
313 	struct rde_aspath *asp;
314 	struct bgpd_addr addr;
315 	struct prefix *p, *np, *rp;
316 	time_t staletime = ((struct peer_flush *)arg)->staletime;
317 	uint32_t i;
318 	uint8_t prefixlen;
319 
320 	pt_getaddr(re->prefix, &addr);
321 	prefixlen = re->prefix->prefixlen;
322 	TAILQ_FOREACH_SAFE(p, &re->prefix_h, entry.list.rib, np) {
323 		if (peer != prefix_peer(p))
324 			continue;
325 		if (staletime && p->lastchange > staletime)
326 			continue;
327 
328 		for (i = RIB_LOC_START; i < rib_size; i++) {
329 			struct rib *rib = rib_byid(i);
330 			if (rib == NULL)
331 				continue;
332 			rp = prefix_get(rib, peer, p->path_id,
333 			    &addr, prefixlen);
334 			if (rp) {
335 				asp = prefix_aspath(rp);
336 				if (asp && asp->pftableid)
337 					rde_pftable_del(asp->pftableid, rp);
338 
339 				prefix_destroy(rp);
340 				rde_update_log("flush", i, peer, NULL,
341 				    &addr, prefixlen);
342 			}
343 		}
344 
345 		prefix_destroy(p);
346 		peer->stats.prefix_cnt--;
347 	}
348 }
349 
350 static void
rde_up_adjout_force_upcall(struct prefix * p,void * ptr)351 rde_up_adjout_force_upcall(struct prefix *p, void *ptr)
352 {
353 	if (p->flags & PREFIX_FLAG_STALE) {
354 		/* remove stale entries */
355 		prefix_adjout_destroy(p);
356 	} else if (p->flags & PREFIX_FLAG_DEAD) {
357 		/* ignore dead prefixes, they will go away soon */
358 	} else if ((p->flags & PREFIX_FLAG_MASK) == 0) {
359 		/* put entries on the update queue if not allready on a queue */
360 		p->flags |= PREFIX_FLAG_UPDATE;
361 		if (RB_INSERT(prefix_tree, &prefix_peer(p)->updates[p->pt->aid],
362 		    p) != NULL)
363 			fatalx("%s: RB tree invariant violated", __func__);
364 	}
365 }
366 
367 static void
rde_up_adjout_force_done(void * ptr,uint8_t aid)368 rde_up_adjout_force_done(void *ptr, uint8_t aid)
369 {
370 	struct rde_peer		*peer = ptr;
371 
372 	/* Adj-RIB-Out ready, unthrottle peer and inject EOR */
373 	peer->throttled = 0;
374 	if (peer->capa.grestart.restart)
375 		prefix_add_eor(peer, aid);
376 }
377 
378 static void
rde_up_dump_upcall(struct rib_entry * re,void * ptr)379 rde_up_dump_upcall(struct rib_entry *re, void *ptr)
380 {
381 	struct rde_peer		*peer = ptr;
382 	struct prefix		*p;
383 
384 	if ((p = prefix_best(re)) == NULL)
385 		/* no eligible prefix, not even for 'evaluate all' */
386 		return;
387 
388 	peer_generate_update(peer, re, NULL, NULL, 0);
389 }
390 
391 static void
rde_up_dump_done(void * ptr,uint8_t aid)392 rde_up_dump_done(void *ptr, uint8_t aid)
393 {
394 	struct rde_peer		*peer = ptr;
395 
396 	/* force out all updates of Adj-RIB-Out for this peer */
397 	if (prefix_dump_new(peer, aid, 0, peer, rde_up_adjout_force_upcall,
398 	    rde_up_adjout_force_done, NULL) == -1)
399 		fatal("%s: prefix_dump_new", __func__);
400 }
401 
402 /*
403  * Session got established, bring peer up, load RIBs do initial table dump.
404  */
405 void
peer_up(struct rde_peer * peer,struct session_up * sup)406 peer_up(struct rde_peer *peer, struct session_up *sup)
407 {
408 	uint8_t	 i;
409 
410 	if (peer->state == PEER_ERR) {
411 		/*
412 		 * There is a race condition when doing PEER_ERR -> PEER_DOWN.
413 		 * So just do a full reset of the peer here.
414 		 */
415 		rib_dump_terminate(peer);
416 		peer_imsg_flush(peer);
417 		if (prefix_dump_new(peer, AID_UNSPEC, 0, NULL,
418 		    peer_adjout_clear_upcall, NULL, NULL) == -1)
419 			fatal("%s: prefix_dump_new", __func__);
420 		peer_flush(peer, AID_UNSPEC, 0);
421 		peer->stats.prefix_cnt = 0;
422 		peer->stats.prefix_out_cnt = 0;
423 		peer->state = PEER_DOWN;
424 	}
425 	peer->remote_bgpid = sup->remote_bgpid;
426 	peer->short_as = sup->short_as;
427 	peer->remote_addr = sup->remote_addr;
428 	peer->local_v4_addr = sup->local_v4_addr;
429 	peer->local_v6_addr = sup->local_v6_addr;
430 	peer->local_if_scope = sup->if_scope;
431 	memcpy(&peer->capa, &sup->capa, sizeof(peer->capa));
432 
433 	/* clear eor markers depending on GR flags */
434 	if (peer->capa.grestart.restart) {
435 		peer->sent_eor = 0;
436 		peer->recv_eor = 0;
437 	} else {
438 		/* no EOR expected */
439 		peer->sent_eor = ~0;
440 		peer->recv_eor = ~0;
441 	}
442 	peer->state = PEER_UP;
443 
444 	for (i = AID_MIN; i < AID_MAX; i++) {
445 		if (peer->capa.mp[i])
446 			peer_dump(peer, i);
447 	}
448 }
449 
450 /*
451  * Session dropped and no graceful restart is done. Stop everything for
452  * this peer and clean up.
453  */
454 void
peer_down(struct rde_peer * peer,void * bula)455 peer_down(struct rde_peer *peer, void *bula)
456 {
457 	peer->remote_bgpid = 0;
458 	peer->state = PEER_DOWN;
459 	/*
460 	 * stop all pending dumps which may depend on this peer
461 	 * and flush all pending imsg from the SE.
462 	 */
463 	rib_dump_terminate(peer);
464 	peer_imsg_flush(peer);
465 
466 	/* flush Adj-RIB-Out */
467 	if (prefix_dump_new(peer, AID_UNSPEC, 0, NULL,
468 	    peer_adjout_clear_upcall, NULL, NULL) == -1)
469 		fatal("%s: prefix_dump_new", __func__);
470 
471 	/* flush Adj-RIB-In */
472 	peer_flush(peer, AID_UNSPEC, 0);
473 	peer->stats.prefix_cnt = 0;
474 	peer->stats.prefix_out_cnt = 0;
475 
476 	/* free filters */
477 	filterlist_free(peer->out_rules);
478 
479 	RB_REMOVE(peer_tree, &peertable, peer);
480 	free(peer);
481 }
482 
483 /*
484  * Flush all routes older then staletime. If staletime is 0 all routes will
485  * be flushed.
486  */
487 void
peer_flush(struct rde_peer * peer,uint8_t aid,time_t staletime)488 peer_flush(struct rde_peer *peer, uint8_t aid, time_t staletime)
489 {
490 	struct peer_flush pf = { peer, staletime };
491 
492 	/* this dump must run synchronous, too much depends on that right now */
493 	if (rib_dump_new(RIB_ADJ_IN, aid, 0, &pf, peer_flush_upcall,
494 	    NULL, NULL) == -1)
495 		fatal("%s: rib_dump_new", __func__);
496 
497 	/* every route is gone so reset staletime */
498 	if (aid == AID_UNSPEC) {
499 		uint8_t i;
500 		for (i = AID_MIN; i < AID_MAX; i++)
501 			peer->staletime[i] = 0;
502 	} else {
503 		peer->staletime[aid] = 0;
504 	}
505 }
506 
507 /*
508  * During graceful restart mark a peer as stale if the session goes down.
509  * For the specified AID the Adj-RIB-Out is marked stale and the staletime
510  * is set to the current timestamp for identifying stale routes in Adj-RIB-In.
511  */
512 void
peer_stale(struct rde_peer * peer,uint8_t aid,int flushall)513 peer_stale(struct rde_peer *peer, uint8_t aid, int flushall)
514 {
515 	time_t now;
516 
517 	/* flush the now even staler routes out */
518 	if (peer->staletime[aid])
519 		peer_flush(peer, aid, peer->staletime[aid]);
520 
521 	peer->staletime[aid] = now = getmonotime();
522 	peer->state = PEER_DOWN;
523 
524 	/*
525 	 * stop all pending dumps which may depend on this peer
526 	 * and flush all pending imsg from the SE.
527 	 */
528 	rib_dump_terminate(peer);
529 	peer_imsg_flush(peer);
530 
531 	if (flushall)
532 		peer_flush(peer, aid, 0);
533 
534 	/* XXX this is not quite correct */
535 	/* mark Adj-RIB-Out stale for this peer */
536 	if (prefix_dump_new(peer, aid, 0, NULL,
537 	    peer_adjout_stale_upcall, NULL, NULL) == -1)
538 		fatal("%s: prefix_dump_new", __func__);
539 
540 	/* make sure new prefixes start on a higher timestamp */
541 	while (now >= getmonotime())
542 		sleep(1);
543 }
544 
545 /*
546  * Load the Adj-RIB-Out of a peer normally called when a session is established.
547  * Once the Adj-RIB-Out is ready stale routes are removed from the Adj-RIB-Out
548  * and all routes are put on the update queue so they will be sent out.
549  */
550 void
peer_dump(struct rde_peer * peer,uint8_t aid)551 peer_dump(struct rde_peer *peer, uint8_t aid)
552 {
553 	if (peer->capa.enhanced_rr && (peer->sent_eor & (1 << aid)))
554 		rde_peer_send_rrefresh(peer, aid, ROUTE_REFRESH_BEGIN_RR);
555 
556 	if (peer->export_type == EXPORT_NONE) {
557 		/* nothing to send apart from the marker */
558 		if (peer->capa.grestart.restart)
559 			prefix_add_eor(peer, aid);
560 	} else if (peer->export_type == EXPORT_DEFAULT_ROUTE) {
561 		up_generate_default(peer, aid);
562 		rde_up_dump_done(peer, aid);
563 	} else if (aid == AID_FLOWSPECv4 || aid == AID_FLOWSPECv6) {
564 		prefix_flowspec_dump(aid, peer, rde_up_dump_upcall,
565 		    rde_up_dump_done);
566 	} else {
567 		if (rib_dump_new(peer->loc_rib_id, aid, RDE_RUNNER_ROUNDS, peer,
568 		    rde_up_dump_upcall, rde_up_dump_done, NULL) == -1)
569 			fatal("%s: rib_dump_new", __func__);
570 		/* throttle peer until dump is done */
571 		peer->throttled = 1;
572 	}
573 }
574 
575 /*
576  * Start of an enhanced route refresh. Mark all routes as stale.
577  * Once the route refresh ends a End of Route Refresh message is sent
578  * which calls peer_flush() to remove all stale routes.
579  */
580 void
peer_begin_rrefresh(struct rde_peer * peer,uint8_t aid)581 peer_begin_rrefresh(struct rde_peer *peer, uint8_t aid)
582 {
583 	time_t now;
584 
585 	/* flush the now even staler routes out */
586 	if (peer->staletime[aid])
587 		peer_flush(peer, aid, peer->staletime[aid]);
588 
589 	peer->staletime[aid] = now = getmonotime();
590 
591 	/* make sure new prefixes start on a higher timestamp */
592 	while (now >= getmonotime())
593 		sleep(1);
594 }
595 
596 /*
597  * move an imsg from src to dst, disconnecting any dynamic memory from src.
598  */
599 static void
imsg_move(struct imsg * dst,struct imsg * src)600 imsg_move(struct imsg *dst, struct imsg *src)
601 {
602 	*dst = *src;
603 	memset(src, 0, sizeof(*src));
604 }
605 
606 /*
607  * push an imsg onto the peer imsg queue.
608  */
609 void
peer_imsg_push(struct rde_peer * peer,struct imsg * imsg)610 peer_imsg_push(struct rde_peer *peer, struct imsg *imsg)
611 {
612 	struct iq *iq;
613 
614 	if ((iq = calloc(1, sizeof(*iq))) == NULL)
615 		fatal(NULL);
616 	imsg_move(&iq->imsg, imsg);
617 	SIMPLEQ_INSERT_TAIL(&peer->imsg_queue, iq, entry);
618 	imsg_pending++;
619 }
620 
621 /*
622  * pop first imsg from peer imsg queue and move it into imsg argument.
623  * Returns 1 if an element is returned else 0.
624  */
625 int
peer_imsg_pop(struct rde_peer * peer,struct imsg * imsg)626 peer_imsg_pop(struct rde_peer *peer, struct imsg *imsg)
627 {
628 	struct iq *iq;
629 
630 	iq = SIMPLEQ_FIRST(&peer->imsg_queue);
631 	if (iq == NULL)
632 		return 0;
633 
634 	imsg_move(imsg, &iq->imsg);
635 
636 	SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
637 	free(iq);
638 	imsg_pending--;
639 
640 	return 1;
641 }
642 
643 /*
644  * Check if any imsg are pending, return 0 if none are pending
645  */
646 int
peer_imsg_pending(void)647 peer_imsg_pending(void)
648 {
649 	return imsg_pending != 0;
650 }
651 
652 /*
653  * flush all imsg queued for a peer.
654  */
655 void
peer_imsg_flush(struct rde_peer * peer)656 peer_imsg_flush(struct rde_peer *peer)
657 {
658 	struct iq *iq;
659 
660 	while ((iq = SIMPLEQ_FIRST(&peer->imsg_queue)) != NULL) {
661 		SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
662 		free(iq);
663 		imsg_pending--;
664 	}
665 }
666