/* $OpenBSD: rde_peer.c,v 1.46 2025/01/27 15:22:11 claudio Exp $ */

/*
 * Copyright (c) 2019 Claudio Jeker <claudio@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */
#include <sys/types.h>
#include <sys/queue.h>

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include "bgpd.h"
#include "rde.h"

struct peer_tree	 peertable = RB_INITIALIZER(&peertable);
struct peer_tree	 zombietable = RB_INITIALIZER(&zombietable);
struct rde_peer		*peerself;
static long		 imsg_pending;

CTASSERT(sizeof(peerself->recv_eor) * 8 >= AID_MAX);
CTASSERT(sizeof(peerself->sent_eor) * 8 >= AID_MAX);

struct iq {
	SIMPLEQ_ENTRY(iq)	entry;
	struct imsg		imsg;
};

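/*
 * Return true if the 4-byte AS number capability was negotiated
 * with this peer.
 */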
int
peer_has_as4byte(struct rde_peer *peer)
{
	return peer->capa.as4byte;
}

/*
 * Check if ADD_PATH is enabled for aid and mode (rx / tx). If aid is
 * AID_UNSPEC then the function returns true if any aid has mode enabled.
 */
int
peer_has_add_path(struct rde_peer *peer, uint8_t aid, int mode)
{
	if (aid >= AID_MAX)
		return 0;
	return peer->capa.add_path[aid] & mode;
}

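/*
 * Return true if the extended message capability is enabled for this peer.
 */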
int
peer_has_ext_msg(struct rde_peer *peer)
{
	return peer->capa.ext_msg;
}

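/*
 * Return true if the extended nexthop capability is enabled for aid.
 */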
int
peer_has_ext_nexthop(struct rde_peer *peer, uint8_t aid)
{
	if (aid >= AID_MAX)
		return 0;
	return peer->capa.ext_nh[aid];
}

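/*
 * Return true if AS paths containing AS_SET segments are permitted
 * for this peer.
 */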
int
peer_permit_as_set(struct rde_peer *peer)
{
	return peer->flags & PEERFLAG_PERMIT_AS_SET;
}

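/*
 * Initialize the peer subsystem and create the internal "LOCAL" peer
 * (peerself) which represents the local system.
 */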
void
peer_init(struct filter_head *rules)
{
	struct peer_config pc;

	memset(&pc, 0, sizeof(pc));
	snprintf(pc.descr, sizeof(pc.descr), "LOCAL");
	pc.id = PEER_ID_SELF;

	peerself = peer_add(PEER_ID_SELF, &pc, rules);
	peerself->state = PEER_UP;
}

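/*
 * Tear down all peers, including zombie peers, on RDE shutdown.
 */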
void
peer_shutdown(void)
{
	struct rde_peer *peer, *np;

	RB_FOREACH_SAFE(peer, peer_tree, &peertable, np)
		peer_delete(peer);

	while (!RB_EMPTY(&zombietable))
		peer_reaper(NULL);

	if (!RB_EMPTY(&peertable))
		log_warnx("%s: free non-free table", __func__);
}

/*
 * Traverse all peers calling callback for each peer.
 */
void
peer_foreach(void (*callback)(struct rde_peer *, void *), void *arg)
{
	struct rde_peer *peer, *np;

	RB_FOREACH_SAFE(peer, peer_tree, &peertable, np)
		callback(peer, arg);
}

/*
 * Lookup a peer by peer_id, return NULL if not found.
 */
struct rde_peer *
peer_get(uint32_t id)
{
	struct rde_peer needle;

	needle.conf.id = id;
	return RB_FIND(peer_tree, &peertable, &needle);
}

/*
 * Find the next peer that matches the neighbor options in *n.
 * If peerid is set, pick up the lookup after that peer.
 * Returns NULL if no more peers match.
 */
struct rde_peer *
peer_match(struct ctl_neighbor *n, uint32_t peerid)
{
	struct rde_peer *peer;

	if (peerid != 0) {
		peer = peer_get(peerid);
		if (peer)
			peer = RB_NEXT(peer_tree, &peertable, peer);
	} else
		peer = RB_MIN(peer_tree, &peertable);

	for (; peer != NULL; peer = RB_NEXT(peer_tree, &peertable, peer)) {
		if (rde_match_peer(peer, n))
			return peer;
	}
	return NULL;
}

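/*
 * Create a new peer for the given id or update the config of an already
 * existing one. New peers get a unique, even path_id_tx assigned and are
 * inserted into the peer table.
 */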
struct rde_peer *
peer_add(uint32_t id, struct peer_config *p_conf, struct filter_head *rules)
{
	struct rde_peer *peer;
	int conflict;

	if ((peer = peer_get(id))) {
		memcpy(&peer->conf, p_conf, sizeof(struct peer_config));
		return peer;
	}

	peer = calloc(1, sizeof(struct rde_peer));
	if (peer == NULL)
		fatal("peer_add");

	memcpy(&peer->conf, p_conf, sizeof(struct peer_config));
	peer->remote_bgpid = 0;
	peer->loc_rib_id = rib_find(peer->conf.rib);
	if (peer->loc_rib_id == RIB_NOTFOUND)
		fatalx("King Bula's new peer met an unknown RIB");
	peer->state = PEER_NONE;
	peer->eval = peer->conf.eval;
	peer->role = peer->conf.role;
	peer->export_type = peer->conf.export_type;
	peer->flags = peer->conf.flags;
	SIMPLEQ_INIT(&peer->imsg_queue);

	peer_apply_out_filter(peer, rules);

	/*
	 * Assign an even random unique transmit path id.
	 * Odd path_id_tx numbers are for peers using add-path recv.
	 */
	do {
		struct rde_peer *p;

		conflict = 0;
		peer->path_id_tx = arc4random() << 1;
		RB_FOREACH(p, peer_tree, &peertable) {
			if (p->path_id_tx == peer->path_id_tx) {
				conflict = 1;
				break;
			}
		}
	} while (conflict);

	if (RB_INSERT(peer_tree, &peertable, peer) != NULL)
		fatalx("rde peer table corrupted");

	return peer;
}

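/*
 * Build the peer specific copy of the outbound filter rules, skipping
 * rules which can never match this peer. Returns the previous rule set
 * so the caller can free it.
 */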
struct filter_head *
peer_apply_out_filter(struct rde_peer *peer, struct filter_head *rules)
{
	struct filter_head *old;
	struct filter_rule *fr, *new;

	old = peer->out_rules;
	if ((peer->out_rules = malloc(sizeof(*peer->out_rules))) == NULL)
		fatal(NULL);
	TAILQ_INIT(peer->out_rules);

	TAILQ_FOREACH(fr, rules, entry) {
		if (rde_filter_skip_rule(peer, fr))
			continue;

		if ((new = malloc(sizeof(*new))) == NULL)
			fatal(NULL);
		memcpy(new, fr, sizeof(*new));
		filterset_copy(&fr->set, &new->set);

		TAILQ_INSERT_TAIL(peer->out_rules, new, entry);
	}

	return old;
}

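/*
 * Compare two peers by their config id, used to order the peer RB tree.
 */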
static inline int
peer_cmp(struct rde_peer *a, struct rde_peer *b)
{
	if (a->conf.id > b->conf.id)
		return 1;
	if (a->conf.id < b->conf.id)
		return -1;
	return 0;
}

RB_GENERATE(peer_tree, rde_peer, entry, peer_cmp);

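/*
 * Check if an update needs to be generated for this peer and rib_entry
 * and if so dispatch to the matching up_generate function.
 */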
static void
peer_generate_update(struct rde_peer *peer, struct rib_entry *re,
    struct prefix *newpath, struct prefix *oldpath,
    enum eval_mode mode)
{
	uint8_t aid;

	aid = re->prefix->aid;

	/* skip ourself */
	if (peer == peerself)
		return;
	/* skip peers that never had a session open */
	if (peer->state == PEER_NONE)
		return;
	/* skip peers using a different rib */
	if (peer->loc_rib_id != re->rib_id)
		return;
	/* check if peer actually supports the address family */
	if (peer->capa.mp[aid] == 0)
		return;
	/* skip peers with special export types */
	if (peer->export_type == EXPORT_NONE ||
	    peer->export_type == EXPORT_DEFAULT_ROUTE)
		return;

	/* if reconf skip peers which don't need to reconfigure */
	if (mode == EVAL_RECONF && peer->reconf_out == 0)
		return;

	/* handle peers with add-path */
	if (peer_has_add_path(peer, aid, CAPA_AP_SEND)) {
		if (peer->eval.mode == ADDPATH_EVAL_ALL)
			up_generate_addpath_all(peer, re, newpath, oldpath);
		else
			up_generate_addpath(peer, re);
		return;
	}

	/* skip regular peers if the best path didn't change */
	if (mode == EVAL_ALL && (peer->flags & PEERFLAG_EVALUATE_ALL) == 0)
		return;
	up_generate_updates(peer, re);
}

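/*
 * Generate the necessary updates for all peers after a change to the
 * given rib_entry.
 */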
void
rde_generate_updates(struct rib_entry *re, struct prefix *newpath,
    struct prefix *oldpath, enum eval_mode mode)
{
	struct rde_peer *peer;

	RB_FOREACH(peer, peer_tree, &peertable)
		peer_generate_update(peer, re, newpath, oldpath, mode);
}

/*
 * Various RIB walker callbacks.
 */
struct peer_flush {
	struct rde_peer *peer;
	time_t staletime;
};

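/*
 * Remove all prefixes of the peer that are older than staletime from the
 * Adj-RIB-In and from all local RIBs.
 */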
static void
peer_flush_upcall(struct rib_entry *re, void *arg)
{
	struct rde_peer *peer = ((struct peer_flush *)arg)->peer;
	struct rde_aspath *asp;
	struct bgpd_addr addr;
	struct prefix *p, *np, *rp;
	time_t staletime = ((struct peer_flush *)arg)->staletime;
	uint32_t i;
	uint8_t prefixlen;

	pt_getaddr(re->prefix, &addr);
	prefixlen = re->prefix->prefixlen;
	TAILQ_FOREACH_SAFE(p, &re->prefix_h, entry.list.rib, np) {
		if (peer != prefix_peer(p))
			continue;
		if (staletime && p->lastchange > staletime)
			continue;

		for (i = RIB_LOC_START; i < rib_size; i++) {
			struct rib *rib = rib_byid(i);
			if (rib == NULL)
				continue;
			rp = prefix_get(rib, peer, p->path_id,
			    &addr, prefixlen);
			if (rp) {
				asp = prefix_aspath(rp);
				if (asp && asp->pftableid)
					rde_pftable_del(asp->pftableid, rp);

				prefix_destroy(rp);
				rde_update_log("flush", i, peer, NULL,
				    &addr, prefixlen);
			}
		}

		prefix_destroy(p);
		peer->stats.prefix_cnt--;
	}
}

/*
 * Session got established, bring the peer up, load the RIBs and do the
 * initial table dump.
 */
void
peer_up(struct rde_peer *peer, struct session_up *sup)
{
	uint8_t i;
	int force_sync = 1;

	if (peer->state == PEER_ERR) {
		/*
		 * There is a race condition when doing PEER_ERR -> PEER_DOWN.
		 * So just do a full reset of the peer here.
		 */
		rib_dump_terminate(peer);
		peer_imsg_flush(peer);
		peer_flush(peer, AID_UNSPEC, 0);
		peer->stats.prefix_cnt = 0;
		peer->state = PEER_DOWN;
	}

	/*
	 * Check if no value changed during flap to decide if the RIB
	 * is in sync. The capa check is maybe too strict but it should
	 * not matter for normal operation.
	 */
	if (memcmp(&peer->remote_addr, &sup->remote_addr,
	    sizeof(sup->remote_addr)) == 0 &&
	    memcmp(&peer->local_v4_addr, &sup->local_v4_addr,
	    sizeof(sup->local_v4_addr)) == 0 &&
	    memcmp(&peer->local_v6_addr, &sup->local_v6_addr,
	    sizeof(sup->local_v6_addr)) == 0 &&
	    memcmp(&peer->capa, &sup->capa, sizeof(sup->capa)) == 0)
		force_sync = 0;

	peer->remote_addr = sup->remote_addr;
	peer->local_v4_addr = sup->local_v4_addr;
	peer->local_v6_addr = sup->local_v6_addr;
	memcpy(&peer->capa, &sup->capa, sizeof(sup->capa));
	/* the Adj-RIB-Out does not depend on those */
	peer->remote_bgpid = sup->remote_bgpid;
	peer->local_if_scope = sup->if_scope;
	peer->short_as = sup->short_as;

	/* clear eor markers depending on GR flags */
	if (peer->capa.grestart.restart) {
		peer->sent_eor = 0;
		peer->recv_eor = 0;
	} else {
		/* no EOR expected */
		peer->sent_eor = ~0;
		peer->recv_eor = ~0;
	}
	peer->state = PEER_UP;

	if (!force_sync) {
		for (i = AID_MIN; i < AID_MAX; i++) {
			if (peer->capa.mp[i])
				peer_blast(peer, i);
		}
	} else {
		for (i = AID_MIN; i < AID_MAX; i++) {
			if (peer->capa.mp[i])
				peer_dump(peer, i);
		}
	}
}

/*
 * Session dropped and no graceful restart is done. Stop everything for
 * this peer and clean up.
 */
void
peer_down(struct rde_peer *peer)
{
	peer->remote_bgpid = 0;
	peer->state = PEER_DOWN;
	/*
	 * stop all pending dumps which may depend on this peer
	 * and flush all pending imsg from the SE.
	 */
	rib_dump_terminate(peer);
	prefix_adjout_flush_pending(peer);
	peer_imsg_flush(peer);

	/* flush Adj-RIB-In */
	peer_flush(peer, AID_UNSPEC, 0);
	peer->stats.prefix_cnt = 0;
}

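/*
 * Remove a peer from the peer table and move it to the zombie table.
 * The remaining state is then freed incrementally by peer_reaper().
 */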
void
peer_delete(struct rde_peer *peer)
{
	if (peer->state != PEER_DOWN)
		peer_down(peer);

	/* free filters */
	filterlist_free(peer->out_rules);

	RB_REMOVE(peer_tree, &peertable, peer);
	while (RB_INSERT(peer_tree, &zombietable, peer) != NULL) {
		log_warnx("zombie peer conflict");
		peer->conf.id = arc4random();
	}

	/* start reaping the zombie */
	peer_reaper(peer);
}

/*
 * Flush all routes older than staletime. If staletime is 0 all routes will
 * be flushed.
 */
void
peer_flush(struct rde_peer *peer, uint8_t aid, time_t staletime)
{
	struct peer_flush pf = { peer, staletime };

	/* this dump must run synchronous, too much depends on that right now */
	if (rib_dump_new(RIB_ADJ_IN, aid, 0, &pf, peer_flush_upcall,
	    NULL, NULL) == -1)
		fatal("%s: rib_dump_new", __func__);

	/* every route is gone so reset staletime */
	if (aid == AID_UNSPEC) {
		uint8_t i;
		for (i = AID_MIN; i < AID_MAX; i++)
			peer->staletime[i] = 0;
	} else {
		peer->staletime[aid] = 0;
	}
}

/*
 * During graceful restart mark a peer as stale if the session goes down.
 * For the specified AID the Adj-RIB-Out is marked stale and the staletime
 * is set to the current timestamp for identifying stale routes in Adj-RIB-In.
 */
void
peer_stale(struct rde_peer *peer, uint8_t aid, int flushall)
{
	time_t now;

	/* flush the now even staler routes out */
	if (peer->staletime[aid])
		peer_flush(peer, aid, peer->staletime[aid]);

	peer->staletime[aid] = now = getmonotime();
	peer->state = PEER_DOWN;

	/*
	 * stop all pending dumps which may depend on this peer
	 * and flush all pending imsg from the SE.
	 */
	rib_dump_terminate(peer);
	prefix_adjout_flush_pending(peer);
	peer_imsg_flush(peer);

	if (flushall)
		peer_flush(peer, aid, 0);

	/* make sure new prefixes start on a higher timestamp */
	while (now >= getmonotime())
		sleep(1);
}

/*
 * RIB walker callback for peer_blast.
 * Enqueue a prefix onto the update queue so it can be sent out.
 */
static void
peer_blast_upcall(struct prefix *p, void *ptr)
{
	if (p->flags & PREFIX_FLAG_DEAD) {
		/* ignore dead prefixes, they will go away soon */
	} else if ((p->flags & PREFIX_FLAG_MASK) == 0) {
		/* put entries on the update queue if not already on a queue */
		p->flags |= PREFIX_FLAG_UPDATE;
		if (RB_INSERT(prefix_tree, &prefix_peer(p)->updates[p->pt->aid],
		    p) != NULL)
			fatalx("%s: RB tree invariant violated", __func__);
	}
}

/*
 * Called after all prefixes are put onto the update queue and we are
 * ready to blast out updates to the peer.
 */
static void
peer_blast_done(void *ptr, uint8_t aid)
{
	struct rde_peer *peer = ptr;

	/* Adj-RIB-Out ready, unthrottle peer and inject EOR */
	peer->throttled = 0;
	if (peer->capa.grestart.restart)
		prefix_add_eor(peer, aid);
}

/*
 * Send out the full Adj-RIB-Out by putting all prefixes onto the update
 * queue.
 */
void
peer_blast(struct rde_peer *peer, uint8_t aid)
{
	if (peer->capa.enhanced_rr && (peer->sent_eor & (1 << aid)))
		rde_peer_send_rrefresh(peer, aid, ROUTE_REFRESH_BEGIN_RR);

	/* force out all updates from the Adj-RIB-Out */
	if (prefix_dump_new(peer, aid, 0, peer, peer_blast_upcall,
	    peer_blast_done, NULL) == -1)
		fatal("%s: prefix_dump_new", __func__);
}

/* RIB walker callbacks for peer_dump. */
static void
peer_dump_upcall(struct rib_entry *re, void *ptr)
{
	struct rde_peer *peer = ptr;
	struct prefix *p;

	if ((p = prefix_best(re)) == NULL)
		/* no eligible prefix, not even for 'evaluate all' */
		return;

	peer_generate_update(peer, re, NULL, NULL, 0);
}

static void
peer_dump_done(void *ptr, uint8_t aid)
{
	struct rde_peer *peer = ptr;

	/* Adj-RIB-Out is ready, blast it out */
	peer_blast(peer, aid);
}

/*
 * Load the Adj-RIB-Out of a peer; normally called when a session comes up
 * for the first time. Once the Adj-RIB-Out is ready it will blast the
 * updates out.
 */
void
peer_dump(struct rde_peer *peer, uint8_t aid)
{
	/* throttle peer until dump is done */
	peer->throttled = 1;

	if (peer->export_type == EXPORT_NONE) {
		peer_blast(peer, aid);
	} else if (peer->export_type == EXPORT_DEFAULT_ROUTE) {
		up_generate_default(peer, aid);
		peer_blast(peer, aid);
	} else if (aid == AID_FLOWSPECv4 || aid == AID_FLOWSPECv6) {
		prefix_flowspec_dump(aid, peer, peer_dump_upcall,
		    peer_dump_done);
	} else {
		if (rib_dump_new(peer->loc_rib_id, aid, RDE_RUNNER_ROUNDS, peer,
		    peer_dump_upcall, peer_dump_done, NULL) == -1)
			fatal("%s: rib_dump_new", __func__);
	}
}

/*
 * Start of an enhanced route refresh. Mark all routes as stale.
 * Once the route refresh ends an End of Route Refresh message is sent
 * which calls peer_flush() to remove all stale routes.
 */
void
peer_begin_rrefresh(struct rde_peer *peer, uint8_t aid)
{
	time_t now;

	/* flush the now even staler routes out */
	if (peer->staletime[aid])
		peer_flush(peer, aid, peer->staletime[aid]);

	peer->staletime[aid] = now = getmonotime();

	/* make sure new prefixes start on a higher timestamp */
	while (now >= getmonotime())
		sleep(1);
}

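/*
 * Incrementally free the Adj-RIB-Out of a zombie peer. Once everything is
 * reaped the peer is removed from the zombie table and freed.
 */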
void
peer_reaper(struct rde_peer *peer)
{
	if (peer == NULL)
		peer = RB_ROOT(&zombietable);
	if (peer == NULL)
		return;

	if (!prefix_adjout_reaper(peer))
		return;

	RB_REMOVE(peer_tree, &zombietable, peer);
	free(peer);
}

/*
 * Check if any imsg are pending or any zombie peers are around.
 * Return 0 if no work is pending.
 */
int
peer_work_pending(void)
{
	if (!RB_EMPTY(&zombietable))
		return 1;
	return imsg_pending != 0;
}

/*
 * move an imsg from src to dst, disconnecting any dynamic memory from src.
 */
static void
imsg_move(struct imsg *dst, struct imsg *src)
{
	*dst = *src;
	memset(src, 0, sizeof(*src));
}

/*
 * push an imsg onto the peer imsg queue.
 */
void
peer_imsg_push(struct rde_peer *peer, struct imsg *imsg)
{
	struct iq *iq;

	if ((iq = calloc(1, sizeof(*iq))) == NULL)
		fatal(NULL);
	imsg_move(&iq->imsg, imsg);
	SIMPLEQ_INSERT_TAIL(&peer->imsg_queue, iq, entry);
	imsg_pending++;
}

/*
 * pop first imsg from peer imsg queue and move it into imsg argument.
 * Returns 1 if an element is returned else 0.
 */
int
peer_imsg_pop(struct rde_peer *peer, struct imsg *imsg)
{
	struct iq *iq;

	iq = SIMPLEQ_FIRST(&peer->imsg_queue);
	if (iq == NULL)
		return 0;

	imsg_move(imsg, &iq->imsg);

	SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
	free(iq);
	imsg_pending--;

	return 1;
}

/*
 * flush all imsg queued for a peer.
 */
void
peer_imsg_flush(struct rde_peer *peer)
{
	struct iq *iq;

	while ((iq = SIMPLEQ_FIRST(&peer->imsg_queue)) != NULL) {
		SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
		free(iq);
		imsg_pending--;
	}
}