/*	$OpenBSD: scheduler_ramqueue.c,v 1.48 2023/05/31 16:51:46 op Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
};

struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;
	SPLAY_ENTRY(rq_envelope) t_entry;

	uint64_t		 evpid;
	uint64_t		 holdq;
	enum delivery_type	 type;

#define	RQ_EVPSTATE_PENDING	 0
#define	RQ_EVPSTATE_SCHEDULED	 1
#define	RQ_EVPSTATE_INFLIGHT	 2
#define	RQ_EVPSTATE_HELD	 3
	uint8_t			 state;

#define	RQ_ENVELOPE_EXPIRED	 0x01
#define	RQ_ENVELOPE_REMOVED	 0x02
#define	RQ_ENVELOPE_SUSPEND	 0x04
#define	RQ_ENVELOPE_UPDATE	 0x08
#define	RQ_ENVELOPE_OVERFLOW	 0x10
	uint8_t			 flags;

	time_t			 ctime;
	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};

struct rq_holdq {
	struct evplist		 q;
	size_t			 count;
};

struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;
	SPLAY_HEAD(prioqtree, rq_envelope)	q_priotree;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct evplist		 q_mta;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_update;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};

static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);

SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);
static int scheduler_ram_init(const char *);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);
static int scheduler_ram_query(uint64_t);

static void sorted_insert(struct rq_queue *, struct rq_envelope *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,
	scheduler_ram_hold,
	scheduler_ram_release,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
	scheduler_ram_query,
};

static struct rq_queue	ramqueue;
static struct tree	updates;
static struct tree	holdqs[3]; /* delivery type */

static time_t		currtime;

#define BACKOFF_TRANSFER	400
#define BACKOFF_DELIVERY	10
#define BACKOFF_OVERFLOW	3

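/*
 * Retries are spread with a quadratic backoff: attempt "step" of an
 * envelope created at t0 is scheduled at t0 + base * step * step, where
 * base is BACKOFF_TRANSFER for MTA envelopes and BACKOFF_DELIVERY for
 * local deliveries.  scheduler_next() returns the first such point in
 * time that is still in the future.
 */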
static time_t
scheduler_backoff(time_t t0, time_t base, uint32_t step)
{
	return (t0 + base * step * step);
}

static time_t
scheduler_next(time_t t0, time_t base, uint32_t step)
{
	time_t t;

	/* XXX be more efficient */
	while ((t = scheduler_backoff(t0, base, step)) <= currtime)
		step++;

	return (t);
}

static int
scheduler_ram_init(const char *arg)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);
	tree_init(&holdqs[D_MDA]);
	tree_init(&holdqs[D_MTA]);
	tree_init(&holdqs[D_BOUNCE]);

	return (1);
}

static int
scheduler_ram_insert(struct scheduler_info *si)
{
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update);
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the msgtree message in ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message);
		message->msgid = msgid;
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope);
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->ctime = si->creation;
	envelope->expire = si->creation + si->ttl;
	envelope->sched = scheduler_backoff(si->creation,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->state = RQ_EVPSTATE_PENDING;
	TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);

	si->nexttry = envelope->sched;

	return (1);
}

static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", si->evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/*
	 * If the envelope was removed while inflight, schedule it for
	 * removal immediately.
	 */
	if (evp->flags & RQ_ENVELOPE_REMOVED) {
		TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
		evp->state = RQ_EVPSTATE_SCHEDULED;
		evp->t_scheduled = currtime;
		return (1);
	}

	evp->sched = scheduler_next(evp->ctime,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);

	evp->state = RQ_EVPSTATE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue, evp);

	si->nexttry = evp->sched;

	return (1);
}

static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	rq_envelope_delete(&ramqueue, evp);

	return (1);
}

#define HOLDQ_MAXSIZE	1000

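/*
 * Park an in-flight envelope on the hold queue identified by "holdq" for
 * its delivery type.  A hold queue is capped at HOLDQ_MAXSIZE entries;
 * on overflow the envelope is flagged for update and put back in the
 * pending queue instead, so it gets retried with the overflow backoff.
 */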
static int
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
{
	struct rq_holdq		*hq;
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/* If the envelope is suspended, just mark it as pending */
	if (evp->flags & RQ_ENVELOPE_SUSPEND) {
		evp->state = RQ_EVPSTATE_PENDING;
		return (0);
	}

	hq = tree_get(&holdqs[evp->type], holdq);
	if (hq == NULL) {
		hq = xcalloc(1, sizeof(*hq));
		TAILQ_INIT(&hq->q);
		tree_xset(&holdqs[evp->type], holdq, hq);
		stat_increment("scheduler.ramqueue.holdq", 1);
	}

	/* If the holdq is full, just "tempfail" the envelope */
	if (hq->count >= HOLDQ_MAXSIZE) {
		evp->state = RQ_EVPSTATE_PENDING;
		evp->flags |= RQ_ENVELOPE_UPDATE;
		evp->flags |= RQ_ENVELOPE_OVERFLOW;
		sorted_insert(&ramqueue, evp);
		stat_increment("scheduler.ramqueue.hold-overflow", 1);
		return (0);
	}

	evp->state = RQ_EVPSTATE_HELD;
	evp->holdq = holdq;
	/* This is an optimization: upon release, the envelopes will be
	 * inserted in the pending queue from the first element to the last.
	 * Since elements already in the queue were received first, they
	 * were scheduled first, so they will be reinserted before the
	 * current element.
	 */
	TAILQ_INSERT_HEAD(&hq->q, evp, entry);
	hq->count += 1;
	stat_increment("scheduler.ramqueue.hold", 1);

	return (1);
}

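/*
 * Move up to "n" envelopes from the given hold queue back to the pending
 * queue.  A count of -1 releases every envelope and flags each of them
 * for update, so the next batch recomputes its schedule.  Returns the
 * number of envelopes released.
 */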
static int
scheduler_ram_release(int type, uint64_t holdq, int n)
{
	struct rq_holdq		*hq;
	struct rq_envelope	*evp;
	int			 i, update;

	currtime = time(NULL);

	hq = tree_get(&holdqs[type], holdq);
	if (hq == NULL)
		return (0);

	if (n == -1) {
		n = 0;
		update = 1;
	}
	else
		update = 0;

	for (i = 0; n == 0 || i < n; i++) {
		evp = TAILQ_FIRST(&hq->q);
		if (evp == NULL)
			break;

		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		evp->holdq = 0;

		/* When released, all envelopes are put in the pending queue
		 * and will be rescheduled immediately.  As an optimization,
		 * we could just schedule them directly.
		 */
		evp->state = RQ_EVPSTATE_PENDING;
		if (update)
			evp->flags |= RQ_ENVELOPE_UPDATE;
		sorted_insert(&ramqueue, evp);
	}

	if (TAILQ_EMPTY(&hq->q)) {
		tree_xpop(&holdqs[type], holdq);
		free(hq);
		stat_decrement("scheduler.ramqueue.holdq", 1);
	}
	stat_decrement("scheduler.ramqueue.hold", i);

	return (i);
}

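/*
 * Build a batch of at most *count envelopes for the queue process.  The
 * scheduled queues selected by "mask" (removed, expired, update, bounce,
 * mda, mta) are drained round-robin into the evpids/types arrays.  If no
 * envelope is schedulable, *delay is set to the number of seconds until
 * the next pending envelope becomes due, or -1 if the pending queue is
 * empty.
 */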
static int
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids, int *types)
{
	struct rq_envelope	*evp;
	size_t			 i, n;
	time_t			 t;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");

	i = 0;
	n = 0;

	for (;;) {

		if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
			TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
			types[i] = SCHED_REMOVE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
			TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
			types[i] = SCHED_EXPIRE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) {
			TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
			types[i] = SCHED_UPDATE;
			evpids[i] = evp->evpid;

			if (evp->flags & RQ_ENVELOPE_OVERFLOW)
				t = BACKOFF_OVERFLOW;
			else if (evp->type == D_MTA)
				t = BACKOFF_TRANSFER;
			else
				t = BACKOFF_DELIVERY;

			evp->sched = scheduler_next(evp->ctime, t, 0);
			evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
			evp->state = RQ_EVPSTATE_PENDING;
			if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
				sorted_insert(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
			TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
			types[i] = SCHED_BOUNCE;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
			TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
			types[i] = SCHED_MDA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
			TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
			types[i] = SCHED_MTA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		/* nothing seen this round */
		if (i == n)
			break;

		n = i;
	}

	if (i) {
		*count = i;
		return (1);
	}

	if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		if (evp->sched < evp->expire)
			t = evp->sched;
		else
			t = evp->expire;
		*delay = (t < currtime) ? 0 : (t - currtime);
	}
	else
		*delay = -1;

	return (0);
}

static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t id;
	size_t	 n;
	void	*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		dst[n] = id;
	}

	return (n);
}

static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].evpid = evp->evpid;
		dst[n].flags = 0;
		dst[n].retry = 0;
		dst[n].time = 0;

		if (evp->state == RQ_EVPSTATE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		else if (evp->state == RQ_EVPSTATE_HELD) {
			/* same as scheduled */
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
			dst[n].flags |= EF_HOLD;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;

		n++;
	}

	return (n);
}

static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->state == RQ_EVPSTATE_INFLIGHT)
			return (0);
		rq_envelope_schedule(&ramqueue, evp);
		return (1);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
			if (evp->state == RQ_EVPSTATE_INFLIGHT)
				continue;
			rq_envelope_schedule(&ramqueue, evp);
			r++;
		}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_query(uint64_t evpid)
{
	uint32_t msgid;

	if (evpid > 0xffffffff)
		msgid = evpid_to_msgid(evpid);
	else
		msgid = evpid;

	if (tree_get(&ramqueue.messages, msgid) == NULL)
		return (0);

	return (1);
}

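/*
 * Insert an envelope in the pending queue, keeping it sorted according
 * to rq_envelope_cmp().  The splay tree gives the successor, which is
 * the insertion point in the TAILQ; without a successor the envelope
 * goes to the tail.
 */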
static void
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_envelope	*evp2;

	SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
	evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
	if (evp2)
		TAILQ_INSERT_BEFORE(evp2, evp, entry);
	else
		TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}

static void
rq_queue_init(struct rq_queue *rq)
{
	memset(rq, 0, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mta);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_update);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
	SPLAY_INIT(&rq->q_priotree);
}

static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist. re-use structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	/* Sorted insert in the pending queue */
	while ((envelope = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, envelope, entry);
		sorted_insert(rq, envelope);
	}

	rq->evpcount += update->evpcount;
}

#define SCHEDULEMAX	1024

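/*
 * Walk the pending queue from the head (earliest event first), moving
 * expired envelopes to the expired queue and scheduling the others.
 * Stop at the first envelope that is not due yet, or after SCHEDULEMAX
 * envelopes have been scheduled in this pass.
 */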
static void
rq_queue_schedule(struct rq_queue *rq)
{
	struct rq_envelope	*evp;
	size_t			 n;

	n = 0;
	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
		if (evp->sched > currtime && evp->expire > currtime)
			break;

		if (n == SCHEDULEMAX)
			break;

		if (evp->state != RQ_EVPSTATE_PENDING)
			fatalx("evp:%016" PRIx64 " flags=0x%x", evp->evpid,
			    evp->flags);

		if (evp->expire <= currtime) {
			TAILQ_REMOVE(&rq->q_pending, evp, entry);
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
			evp->state = RQ_EVPSTATE_SCHEDULED;
			evp->flags |= RQ_ENVELOPE_EXPIRED;
			evp->t_scheduled = currtime;
			continue;
		}
		rq_envelope_schedule(rq, evp);
		n += 1;
	}
}

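/*
 * Return the list an envelope currently sits on, based on its state and
 * flags.  Held envelopes live on a hold queue rather than on one of the
 * rq_queue lists, so NULL is returned for them.
 */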
static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
	switch (evp->state) {
	case RQ_EVPSTATE_PENDING:
		return &rq->q_pending;

	case RQ_EVPSTATE_SCHEDULED:
		if (evp->flags & RQ_ENVELOPE_EXPIRED)
			return &rq->q_expired;
		if (evp->flags & RQ_ENVELOPE_REMOVED)
			return &rq->q_removed;
		if (evp->flags & RQ_ENVELOPE_UPDATE)
			return &rq->q_update;
		if (evp->type == D_MTA)
			return &rq->q_mta;
		if (evp->type == D_MDA)
			return &rq->q_mda;
		if (evp->type == D_BOUNCE)
			return &rq->q_bounce;
		fatalx("%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);

	case RQ_EVPSTATE_INFLIGHT:
		return &rq->q_inflight;

	case RQ_EVPSTATE_HELD:
		return (NULL);
	}

	fatalx("%016" PRIx64 " bad state %d", evp->evpid, evp->state);
	return (NULL);
}

static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*q = NULL;

	switch (evp->type) {
	case D_MTA:
		q = &rq->q_mta;
		break;
	case D_MDA:
		q = &rq->q_mda;
		break;
	case D_BOUNCE:
		q = &rq->q_bounce;
		break;
	}

	if (evp->flags & RQ_ENVELOPE_UPDATE)
		q = &rq->q_update;

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		TAILQ_REMOVE(&rq->q_pending, evp, entry);
		SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(q, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->t_scheduled = currtime;
}

static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
		return (0);
	/*
	 * If the envelope is inflight, mark it for removal.
	 */
	if (evp->state == RQ_EVPSTATE_INFLIGHT) {
		evp->flags |= RQ_ENVELOPE_REMOVED;
		return (1);
	}

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->t_scheduled = currtime;

	return (1);
}

static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		evp->state = RQ_EVPSTATE_PENDING;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*evl;

	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		if (evl == &rq->q_pending)
			sorted_insert(rq, evp);
		else
			TAILQ_INSERT_TAIL(evl, evp, entry);
	}

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;

	return (1);
}

static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}

static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	buf[256];
	char		t[64];

	(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		(void)strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		(void)strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		(void)strlcat(buf, "mta", sizeof buf);

	(void)snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	(void)strlcat(buf, t, sizeof buf);

	switch (e->state) {
	case RQ_EVPSTATE_PENDING:
		(void)snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_SCHEDULED:
		(void)snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_INFLIGHT:
		(void)snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_HELD:
		(void)snprintf(t, sizeof t, ",held=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;
	default:
		fatalx("%016" PRIx64 " bad state %d", e->evpid, e->state);
	}

	if (e->flags & RQ_ENVELOPE_REMOVED)
		(void)strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		(void)strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		(void)strlcat(buf, ",suspended", sizeof buf);

	(void)strlcat(buf, "]", sizeof buf);

	return (buf);
}

static void
rq_queue_dump(struct rq_queue *rq, const char *name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: |   %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}

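/*
 * Order envelopes by their next event: the earlier of their scheduling
 * time and their expiry time.  The evpid is used as a tie-breaker so the
 * comparison never declares two distinct envelopes equal.
 */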
static int
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
{
	time_t	ref1, ref2;

	ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
	ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
	if (ref1 != ref2)
		return (ref1 < ref2) ? -1 : 1;

	if (e1->evpid != e2->evpid)
		return (e1->evpid < e2->evpid) ? -1 : 1;

	return 0;
}

SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);