/*	$OpenBSD: scheduler_ramqueue.c,v 1.49 2024/09/03 18:27:04 op Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
};

struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;
	SPLAY_ENTRY(rq_envelope) t_entry;

	uint64_t		 evpid;
	uint64_t		 holdq;
	enum delivery_type	 type;

#define	RQ_EVPSTATE_PENDING	 0
#define	RQ_EVPSTATE_SCHEDULED	 1
#define	RQ_EVPSTATE_INFLIGHT	 2
#define	RQ_EVPSTATE_HELD	 3
	uint8_t			 state;

#define	RQ_ENVELOPE_EXPIRED	 0x01
#define	RQ_ENVELOPE_REMOVED	 0x02
#define	RQ_ENVELOPE_SUSPEND	 0x04
#define	RQ_ENVELOPE_UPDATE	 0x08
#define	RQ_ENVELOPE_OVERFLOW	 0x10
	uint8_t			 flags;

	time_t			 ctime;
	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};

struct rq_holdq {
	struct evplist		 q;
	size_t			 count;
};

struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;
	SPLAY_HEAD(prioqtree, rq_envelope) q_priotree;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct evplist		 q_mta;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_update;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};

static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);

SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);
static int scheduler_ram_init(const char *);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);
static int scheduler_ram_query(uint64_t);

static void sorted_insert(struct rq_queue *, struct rq_envelope *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,
	scheduler_ram_hold,
	scheduler_ram_release,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
	scheduler_ram_query,
};

static struct rq_queue	ramqueue;
static struct tree	updates;
static struct tree	holdqs[3]; /* delivery type */

static time_t		currtime;

#define BACKOFF_TRANSFER	400
#define BACKOFF_DELIVERY	10
#define BACKOFF_OVERFLOW	3

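/*
 * Retry delays grow quadratically: an envelope created at t0 is due
 * base * step * step seconds later.  With BACKOFF_TRANSFER (400), for
 * example, an MTA envelope is tried at +0s, +400s, +1600s, +3600s, ...
 * after its creation time.
 */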
static time_t
scheduler_backoff(time_t t0, time_t base, uint32_t step)
{
	return (t0 + base * step * step);
}

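/*
 * Advance the backoff step until the retry time falls strictly in
 * the future.
 */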
static time_t
scheduler_next(time_t t0, time_t base, uint32_t step)
{
	time_t	t;

	/* XXX be more efficient */
	while ((t = scheduler_backoff(t0, base, step)) <= currtime)
		step++;

	return (t);
}

static int
scheduler_ram_init(const char *arg)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);
	tree_init(&holdqs[D_MDA]);
	tree_init(&holdqs[D_MTA]);
	tree_init(&holdqs[D_BOUNCE]);

	return (1);
}

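/*
 * Envelope insertions are staged in a per-message "update" queue and
 * only merged into the live ramqueue on commit, so that an aborted
 * transaction can be rolled back without touching the main queue.
 */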
static int
scheduler_ram_insert(struct scheduler_info *si)
{
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update);
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the msgtree message in ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message);
		message->msgid = msgid;
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope);
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->ctime = si->creation;
	envelope->expire = si->creation + si->ttl;
	envelope->sched = scheduler_backoff(si->creation,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->state = RQ_EVPSTATE_PENDING;
	TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);

	si->nexttry = envelope->sched;

	return (1);
}

static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

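/*
 * Reschedule an in-flight envelope after a delivery attempt: compute
 * its next retry time from its creation time and retry count, then
 * put it back in the pending queue.
 */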
static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", si->evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/*
	 * If the envelope was removed while inflight, schedule it for
	 * removal immediately.
	 */
	if (evp->flags & RQ_ENVELOPE_REMOVED) {
		TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
		evp->state = RQ_EVPSTATE_SCHEDULED;
		evp->t_scheduled = currtime;
		return (1);
	}

	evp->sched = scheduler_next(evp->ctime,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);

	evp->state = RQ_EVPSTATE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue, evp);

	si->nexttry = evp->sched;

	return (1);
}

static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	rq_envelope_delete(&ramqueue, evp);

	return (1);
}

#define HOLDQ_MAXSIZE	1000

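/*
 * Hold queues park envelopes under an opaque 64-bit identifier chosen
 * by the caller until they are explicitly released.  A holdq is capped
 * at HOLDQ_MAXSIZE envelopes; past that point an envelope is sent back
 * to the pending queue with the UPDATE and OVERFLOW flags set, which
 * amounts to a temporary failure.
 */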
static int
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
{
	struct rq_holdq		*hq;
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		fatalx("evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/* If the envelope is suspended, just mark it as pending */
	if (evp->flags & RQ_ENVELOPE_SUSPEND) {
		evp->state = RQ_EVPSTATE_PENDING;
		return (0);
	}

	hq = tree_get(&holdqs[evp->type], holdq);
	if (hq == NULL) {
		hq = xcalloc(1, sizeof(*hq));
		TAILQ_INIT(&hq->q);
		tree_xset(&holdqs[evp->type], holdq, hq);
		stat_increment("scheduler.ramqueue.holdq", 1);
	}

	/* If the holdq is full, just "tempfail" the envelope */
	if (hq->count >= HOLDQ_MAXSIZE) {
		evp->state = RQ_EVPSTATE_PENDING;
		evp->flags |= RQ_ENVELOPE_UPDATE;
		evp->flags |= RQ_ENVELOPE_OVERFLOW;
		sorted_insert(&ramqueue, evp);
		stat_increment("scheduler.ramqueue.hold-overflow", 1);
		return (0);
	}

	evp->state = RQ_EVPSTATE_HELD;
	evp->holdq = holdq;
	/*
	 * This is an optimization: upon release, the envelopes will be
	 * inserted in the pending queue from the first element to the last.
	 * Since elements already in the queue were received first, they
	 * were scheduled first, so they will be reinserted before the
	 * current element.
	 */
	TAILQ_INSERT_HEAD(&hq->q, evp, entry);
	hq->count += 1;
	stat_increment("scheduler.ramqueue.hold", 1);

	return (1);
}

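/*
 * Release up to n envelopes from a holdq back to the pending queue.
 * With n == -1, release them all and flag each one for update.
 */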
static int
scheduler_ram_release(int type, uint64_t holdq, int n)
{
	struct rq_holdq		*hq;
	struct rq_envelope	*evp;
	int			 i, update;

	currtime = time(NULL);

	hq = tree_get(&holdqs[type], holdq);
	if (hq == NULL)
		return (0);

	if (n == -1) {
		n = 0;
		update = 1;
	}
	else
		update = 0;

	for (i = 0; n == 0 || i < n; i++) {
		evp = TAILQ_FIRST(&hq->q);
		if (evp == NULL)
			break;

		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		evp->holdq = 0;

		/*
		 * When released, all envelopes are put in the pending queue
		 * and will be rescheduled immediately.  As an optimization,
		 * we could just schedule them directly.
		 */
		evp->state = RQ_EVPSTATE_PENDING;
		if (update)
			evp->flags |= RQ_ENVELOPE_UPDATE;
		sorted_insert(&ramqueue, evp);
	}

	if (TAILQ_EMPTY(&hq->q)) {
		tree_xpop(&holdqs[type], holdq);
		free(hq);
		stat_decrement("scheduler.ramqueue.holdq", 1);
	}
	stat_decrement("scheduler.ramqueue.hold", i);

	return (i);
}

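/*
 * Fill the evpids/types arrays with at most *count schedulable
 * envelopes, taking one envelope per queue per round so that no queue
 * starves the others.  Returns 1 if anything was picked; otherwise
 * *delay is set to the time until the next pending envelope is due,
 * or to -1 if the pending queue is empty.
 */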
static int
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids,
    int *types)
{
	struct rq_envelope	*evp;
	size_t			 i, n;
	time_t			 t;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");

	i = 0;
	n = 0;

	for (;;) {

		if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
			TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
			types[i] = SCHED_REMOVE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
			TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
			types[i] = SCHED_EXPIRE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) {
			TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
			types[i] = SCHED_UPDATE;
			evpids[i] = evp->evpid;

			if (evp->flags & RQ_ENVELOPE_OVERFLOW)
				t = BACKOFF_OVERFLOW;
			else if (evp->type == D_MTA)
				t = BACKOFF_TRANSFER;
			else
				t = BACKOFF_DELIVERY;

			evp->sched = scheduler_next(evp->ctime, t, 0);
			evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
			evp->state = RQ_EVPSTATE_PENDING;
			if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
				sorted_insert(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
			TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
			types[i] = SCHED_BOUNCE;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
			TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
			types[i] = SCHED_MDA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
			TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
			types[i] = SCHED_MTA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		/* nothing seen this round */
		if (i == n)
			break;

		n = i;
	}

	if (i) {
		*count = i;
		return (1);
	}

	if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		if (evp->sched < evp->expire)
			t = evp->sched;
		else
			t = evp->expire;
		*delay = (t < currtime) ? 0 : (t - currtime);
	}
	else
		*delay = -1;

	return (0);
}

static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t	 id;
	size_t		 n;
	void		*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		dst[n] = id;
	}

	return (n);
}

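/*
 * Report the state of up to "size" envelopes of the message containing
 * "from", skipping envelopes already marked as removed or expired.
 */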
static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].evpid = evp->evpid;
		dst[n].flags = 0;
		dst[n].retry = 0;
		dst[n].time = 0;

		if (evp->state == RQ_EVPSTATE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		else if (evp->state == RQ_EVPSTATE_HELD) {
			/* same as scheduled */
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
			dst[n].flags |= EF_HOLD;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;

		n++;
	}

	return (n);
}

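/*
 * Values above 0xffffffff are envelope ids; smaller values are message
 * ids, in which case the operation applies to every envelope of the
 * message.  The same convention holds for remove, suspend and resume
 * below.
 */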
static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->state == RQ_EVPSTATE_INFLIGHT)
			return (0);
		rq_envelope_schedule(&ramqueue, evp);
		return (1);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
			if (evp->state == RQ_EVPSTATE_INFLIGHT)
				continue;
			rq_envelope_schedule(&ramqueue, evp);
			r++;
		}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_query(uint64_t evpid)
{
	uint32_t	msgid;

	if (evpid > 0xffffffff)
		msgid = evpid_to_msgid(evpid);
	else
		msgid = evpid;

	if (tree_get(&ramqueue.messages, msgid) == NULL)
		return (0);

	return (1);
}

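/*
 * Insert the envelope in the priority tree, then splice it into the
 * pending list right before its in-order successor, keeping q_pending
 * sorted by rq_envelope_cmp() order: earliest of sched/expire first,
 * evpid as tie-breaker.
 */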
static void
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_envelope	*evp2;

	SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
	evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
	if (evp2)
		TAILQ_INSERT_BEFORE(evp2, evp, entry);
	else
		TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}

static void
rq_queue_init(struct rq_queue *rq)
{
	memset(rq, 0, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mta);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_update);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
	SPLAY_INIT(&rq->q_priotree);
}

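/*
 * Fold an update queue into the live queue: messages unknown to the
 * live queue are moved over wholesale; otherwise their envelopes are
 * re-linked to the existing message and the envelope trees merged.
 * All pending envelopes are then sorted-inserted into the live queue.
 */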
static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist. reuse structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	/* Sorted insert in the pending queue */
	while ((envelope = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, envelope, entry);
		sorted_insert(rq, envelope);
	}

	rq->evpcount += update->evpcount;
}

#define SCHEDULEMAX	1024

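/*
 * Move envelopes that are due (or expired) from the pending queue to
 * the appropriate scheduled queue, at most SCHEDULEMAX per pass so a
 * huge backlog cannot stall the scheduler.
 */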
static void
rq_queue_schedule(struct rq_queue *rq)
{
	struct rq_envelope	*evp;
	size_t			 n;

	n = 0;
	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
		if (evp->sched > currtime && evp->expire > currtime)
			break;

		if (n == SCHEDULEMAX)
			break;

		if (evp->state != RQ_EVPSTATE_PENDING)
			fatalx("evp:%016" PRIx64 " flags=0x%x", evp->evpid,
			    evp->flags);

		if (evp->expire <= currtime) {
			TAILQ_REMOVE(&rq->q_pending, evp, entry);
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
			evp->state = RQ_EVPSTATE_SCHEDULED;
			evp->flags |= RQ_ENVELOPE_EXPIRED;
			evp->t_scheduled = currtime;
			continue;
		}
		rq_envelope_schedule(rq, evp);
		n += 1;
	}
}

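/*
 * Map an envelope to the list it currently sits on, or NULL for held
 * envelopes, which live on their holdq instead.
 */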
static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
	switch (evp->state) {
	case RQ_EVPSTATE_PENDING:
		return &rq->q_pending;

	case RQ_EVPSTATE_SCHEDULED:
		if (evp->flags & RQ_ENVELOPE_EXPIRED)
			return &rq->q_expired;
		if (evp->flags & RQ_ENVELOPE_REMOVED)
			return &rq->q_removed;
		if (evp->flags & RQ_ENVELOPE_UPDATE)
			return &rq->q_update;
		if (evp->type == D_MTA)
			return &rq->q_mta;
		if (evp->type == D_MDA)
			return &rq->q_mda;
		if (evp->type == D_BOUNCE)
			return &rq->q_bounce;
		fatalx("%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);

	case RQ_EVPSTATE_INFLIGHT:
		return &rq->q_inflight;

	case RQ_EVPSTATE_HELD:
		return (NULL);
	}

	fatalx("%016" PRIx64 " bad state %d", evp->evpid, evp->state);
	return (NULL);
}

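/*
 * Move an envelope to the scheduled queue matching its type (or to
 * q_update if an update is pending), pulling it out of its holdq or
 * out of the pending queue and priority tree first.
 */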
static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq		*hq;
	struct evplist		*q = NULL;

	switch (evp->type) {
	case D_MTA:
		q = &rq->q_mta;
		break;
	case D_MDA:
		q = &rq->q_mda;
		break;
	case D_BOUNCE:
		q = &rq->q_bounce;
		break;
	}

	if (evp->flags & RQ_ENVELOPE_UPDATE)
		q = &rq->q_update;

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		TAILQ_REMOVE(&rq->q_pending, evp, entry);
		SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(q, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->t_scheduled = currtime;
}

static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq		*hq;
	struct evplist		*evl;

	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
		return (0);
	/*
	 * If the envelope is in-flight, just mark it for removal.
	 */
	if (evp->state == RQ_EVPSTATE_INFLIGHT) {
		evp->flags |= RQ_ENVELOPE_REMOVED;
		return (1);
	}

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->t_scheduled = currtime;

	return (1);
}

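/*
 * Take the envelope off whatever queue it sits on; a held envelope
 * reverts to the pending state.  An in-flight envelope is only
 * flagged: the suspension takes effect when the delivery attempt
 * ends.
 */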
static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq		*hq;
	struct evplist		*evl;

	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		evp->state = RQ_EVPSTATE_PENDING;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*evl;

	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		if (evl == &rq->q_pending)
			sorted_insert(rq, evp);
		else
			TAILQ_INSERT_TAIL(evl, evp, entry);
	}

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;

	return (1);
}

static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}

static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	buf[256];
	char		t[64];

	(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		(void)strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		(void)strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		(void)strlcat(buf, "mta", sizeof buf);

	(void)snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	(void)strlcat(buf, t, sizeof buf);

	switch (e->state) {
	case RQ_EVPSTATE_PENDING:
		(void)snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_SCHEDULED:
		(void)snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_INFLIGHT:
		(void)snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_HELD:
		(void)snprintf(t, sizeof t, ",held=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;
	default:
		fatalx("%016" PRIx64 " bad state %d", e->evpid, e->state);
	}

	if (e->flags & RQ_ENVELOPE_REMOVED)
		(void)strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		(void)strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		(void)strlcat(buf, ",suspended", sizeof buf);

	(void)strlcat(buf, "]", sizeof buf);

	return (buf);
}

static void
rq_queue_dump(struct rq_queue *rq, const char *name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: |   %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}

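/*
 * Order envelopes by the earliest of their next-try and expiry times,
 * falling back to the evpid to make the order total.
 */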
static int
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
{
	time_t	ref1, ref2;

	ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
	ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
	if (ref1 != ref2)
		return (ref1 < ref2) ? -1 : 1;

	if (e1->evpid != e2->evpid)
		return (e1->evpid < e2->evpid) ? -1 : 1;

	return 0;
}

SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);